Flume
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume 安装配置
wget http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
cd /home/meizhangzheng
1. 解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C ~/data
cd data
mv apache-flume-1.9.0-bin flume-1.9.0
2. 添加环境变量配置信息:
– 2020.05.09 add flume path
export FLUME_HOME=/home/meizhangzheng/data/flume-1.9.0
export PATH=$FLUME_HOME/bin:$PATH
3.flum 配置信息:
cd /home/meizhangzheng/data/flume-1.9.0/conf
touch test.properties
vi test.properties
3.1 add config
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
agent.sources.s1.channels = c1
agent.sources.s1.positionFile = /home/meizhangzheng/data/flume-1.9.0/test/taildir_position.json
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1 = /home/meizhangzheng/data/testlog/.*.log
agent.sources.s1.batchSize = 4000
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /home/meizhangzheng/data/flume-1.9.0/test/checkpoint
agent.channels.c1.dataDirs = /home/meizhangzheng/data/flume_data
agent.channels.c1.keep-alive = 6
agent.sinks.k1.channel = c1
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.kafka.topic=test
agent.sinks.k1.kafka.bootstrap.servers = 192.168.1.10:9092,192.168.1.20:9092,192.168.1.30:9092
agent.sinks.k1.kafka.producer.acks = 1
agent.sinks.k1.kafka.flumeBatchSize = 1000
3.2 创建测试目录
需要在/home/meizhangzheng/data/flume-1.9.0 创建test目录
cd /home/meizhangzheng/data/flume-1.9.0
mkdir test
3.3 创建日志目录
在data 目录创建flume 日志目录
cd /home/meizhangzheng/data
mkdir testlog
4. agent启动flume
4.1 需要启动zookeeper 、kafka 集群
zkServer.sh start
kafka-server-start.sh ~/data/kafka-2.5.0/config/server.properties
4.2 启动 主题为 test的kafka消费
kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic test --from-beginning
4.3 启动flume
sh flume-ng agent -c conf -f ~/data/flume-1.9.0/conf/test.properties --name agent -Dflume.root.logger=WARN,console
4.4 进入testlog目录,然后写test.log文件,同时查看该文件的信息是否被消费
cd /home/meizhangzheng/data/testlog
echo "hello" >> test.log
4.5 打开 监听 test主题的进程: 查看结果
[meizhangzheng@master ~]$ kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic test --from-beginning
a
b
d
e
hello
c
mzz
Sat May 9 07:51:32 CST 2020
Sat May 9 07:52:28 CST 2020
source:console
source:test.log file