2023-03-02
1. sinks hdfs
一、Flume 的基础配置
要使用 Flume 来采集数据,我们要做的第一件事情就是制定采集方案。在采集方案中主要要制定的是三部分:
●针对不同的数据源,制定对应的 Source
●针对不同的数据去向,制定对应的 Sink
●针对不同的场景,制定对应的 Channel
# list the sources, sinks and channels for the agent
.sources =
.channels =
.sinks =
# set channel for source
.sources..channels =...
# set channel for sink
.sinks..channel =
例如:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.channels = mem-channel-1
agent_foo.sinks = hdfs-sink-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
无论是 Source、Channel、Sink,每一个组件都有自己的一些属性。我们也可以在采集的配置文件中定义这些组件的属性,来丰富功能。
# properties for sources
.sources.. =
# properties for channels
.channel..=
# properties for sinks
.sources..=
例如:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
二、常见的采集案例
2.1. 案例演示:Avro+Memory+Logger
Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger
采集方案
[root@qianfeng01 flume-1.9.0]# mkdir flumeconf
[root@qianfeng01 flume-1.9.0]# cd flumeconf
[root@qianfeng01 flumeconf]# vi avro-logger.conf
#定义各个组件的名字
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1
#定义sources组件的相关属性
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=qianfeng01
a1.sources.avro-sour1.port=9999
#定义channels组件的相关属性
a1.channels.mem-chan1.type=memory
#定义sinks组件的相关属性
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100
#组件之间进行绑定
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1
启动 Agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
测试数据
[root@qianfeng01 ~]# mkdir flumedata
[root@qianfeng01 ~]# cd flumedata/
[root@qianfeng01 flumedata]#
[root@qianfeng01 flumedata]# date >> test.data
[root@qianfeng01 flumedata]# cat test.data
2019年 11月 21日 星期四 21:22:36 CST
[root@qianfeng01 flumedata]# ping qianfeng01 >> test.data
[root@qianfeng01 flumedata]# cat test.data
....省略....
[root@qianfeng01 flumedata]# flume-ng avro-client -c /usr/local/flume-1.9.0/conf/ -H qianfeng01 -p 9999 -F ./test.data
2.2. 案例演示 Taildir+Memory+HDFS
采集方案
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/flume-1.9.0/flumeconf/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/flume-1.9.0/flumedata/tails/.*log.*
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://qianfeng01:9820/flume/taildir/
a1.sinks.s1.hdfs.filePrefix=flume-hdfs
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
启动 Agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./taildir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
测试数据
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world123" >>a1.log
[root@qianfeng01 tails]# echo "hello world123" >>a2.log
[root@qianfeng01 tails]# echo "hello world123" >>a3.log
[root@qianfeng01 tails]# echo "hello world123" >>a3.csv
[root@qianfeng01 tails]# echo "hello world123" >>a3.log
上一篇:数据采集工具之Flume 的介绍
下一篇:SpringMVC源码解析(一)
开班时间:2021-04-12(深圳)
开班盛况开班时间:2021-05-17(北京)
开班盛况开班时间:2021-03-22(杭州)
开班盛况开班时间:2021-04-26(北京)
开班盛况开班时间:2021-05-10(北京)
开班盛况开班时间:2021-02-22(北京)
开班盛况开班时间:2021-07-12(北京)
预约报名开班时间:2020-09-21(上海)
开班盛况开班时间:2021-07-12(北京)
预约报名开班时间:2019-07-22(北京)
开班盛况Copyright 2011-2023 北京千锋互联科技有限公司 .All Right 京ICP备12003911号-5 京公网安备 11010802035720号