当前位置: 首页 / 技术干货 / 正文
数据采集工具之Flume的基础使用

2023-03-02

   1. sinks hdfs

  一、Flume 的基础配置

  要使用 Flume 来采集数据,我们要做的第一件事情就是制定采集方案。在采集方案中主要要制定的是三部分:

  ●针对不同的数据源,制定对应的 Source

  ●针对不同的数据去向,制定对应的 Sink

  ●针对不同的场景,制定对应的 Channel

  # list the sources, sinks and channels for the agent

  .sources =

  .channels =

  .sinks =

  # set channel for source

  .sources..channels =...

  # set channel for sink

  .sinks..channel = 

  例如:

  # list the sources, sinks and channels for the agent

  agent_foo.sources = avro-appserver-src-1

  agent_foo.channels = mem-channel-1

  agent_foo.sinks = hdfs-sink-1

  # set channel for source

  agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

  # set channel for sink

  agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

  无论是 Source、Channel、Sink,每一个组件都有自己的一些属性。我们也可以在采集的配置文件中定义这些组件的属性,来丰富功能。

  # properties for sources

  .sources.. =

  # properties for channels

  .channel..=

  # properties for sinks

  .sources..= 

  例如:

  agent_foo.sources = avro-AppSrv-source

  agent_foo.sinks = hdfs-Cluster1-sink

  agent_foo.channels = mem-channel-1

  # set channel for sources, sinks

  # properties of avro-AppSrv-source

  agent_foo.sources.avro-AppSrv-source.type = avro

  agent_foo.sources.avro-AppSrv-source.bind = localhost

  agent_foo.sources.avro-AppSrv-source.port = 10000

  # properties of mem-channel-1

  agent_foo.channels.mem-channel-1.type = memory

  agent_foo.channels.mem-channel-1.capacity = 1000

  agent_foo.channels.mem-channel-1.transactionCapacity = 100

  # properties of hdfs-Cluster1-sink

  agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs

  agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

  #...

  二、常见的采集案例

  2.1. 案例演示:Avro+Memory+Logger

  Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger

  采集方案

  [root@qianfeng01 flume-1.9.0]# mkdir flumeconf

  [root@qianfeng01 flume-1.9.0]# cd flumeconf

  [root@qianfeng01 flumeconf]# vi avro-logger.conf

  #定义各个组件的名字

  a1.sources=avro-sour1

  a1.channels=mem-chan1

  a1.sinks=logger-sink1

  #定义sources组件的相关属性

  a1.sources.avro-sour1.type=avro

  a1.sources.avro-sour1.bind=qianfeng01

  a1.sources.avro-sour1.port=9999

  #定义channels组件的相关属性

  a1.channels.mem-chan1.type=memory

  #定义sinks组件的相关属性

  a1.sinks.logger-sink1.type=logger

  a1.sinks.logger-sink1.maxBytesToLog=100

  #组件之间进行绑定

  a1.sources.avro-sour1.channels=mem-chan1

  a1.sinks.logger-sink1.channel=mem-chan1

  启动 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

  测试数据

  [root@qianfeng01 ~]# mkdir flumedata

  [root@qianfeng01 ~]# cd flumedata/

  [root@qianfeng01 flumedata]#

  [root@qianfeng01 flumedata]# date >> test.data

  [root@qianfeng01 flumedata]# cat test.data

  2019年 11月 21日 星期四 21:22:36 CST

  [root@qianfeng01 flumedata]# ping qianfeng01 >> test.data

  [root@qianfeng01 flumedata]# cat test.data

  ....省略....

  [root@qianfeng01 flumedata]# flume-ng avro-client -c /usr/local/flume-1.9.0/conf/ -H qianfeng01 -p 9999 -F ./test.data

  2.2. 案例演示 Taildir+Memory+HDFS

  采集方案

  a1.sources = r1

  a1.channels = c1

  a1.sinks = s1

  a1.sources.r1.type = TAILDIR

  a1.sources.r1.positionFile = /usr/local/flume-1.9.0/flumeconf/taildir_position.json

  a1.sources.r1.filegroups = f1

  a1.sources.r1.filegroups.f1 = /usr/local/flume-1.9.0/flumedata/tails/.*log.*

  a1.sources.r1.fileHeader = true

  a1.sources.ri.maxBatchCount = 1000

  a1.channels.c1.type=memory

  a1.channels.c1.capacity=1000

  a1.channels.c1.transactionCapacity=100

  a1.sinks.s1.type=hdfs

  a1.sinks.s1.hdfs.path=hdfs://qianfeng01:9820/flume/taildir/

  a1.sinks.s1.hdfs.filePrefix=flume-hdfs

  a1.sinks.s1.hdfs.fileSuffix=.log

  a1.sinks.s1.hdfs.inUseSuffix=.tmp

  a1.sinks.s1.hdfs.rollInterval=60

  a1.sinks.s1.hdfs.rollSize=1024

  a1.sinks.s1.hdfs.rollCount=10

  a1.sinks.s1.hdfs.idleTimeout=0

  a1.sinks.s1.hdfs.batchSize=100

  a1.sinks.s1.hdfs.fileType=DataStream

  a1.sinks.s1.hdfs.writeFormat=Text

  a1.sinks.s1.hdfs.round=true

  a1.sinks.s1.hdfs.roundValue=1

  a1.sinks.s1.hdfs.roundUnit=second

  a1.sinks.s1.hdfs.useLocalTimeStamp=true

  a1.sources.r1.channels=c1

  a1.sinks.s1.channel=c1

  启动 Agent

  [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./taildir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console

  测试数据

  [root@qianfeng01 tails]# echo "hello world" >>a1.log

  [root@qianfeng01 tails]# echo "hello world" >>a1.log

  [root@qianfeng01 tails]# echo "hello world" >>a1.log

  [root@qianfeng01 tails]# echo "hello world" >>a1.log

  [root@qianfeng01 tails]# echo "hello world123" >>a1.log

  [root@qianfeng01 tails]# echo "hello world123" >>a2.log

  [root@qianfeng01 tails]# echo "hello world123" >>a3.log

  [root@qianfeng01 tails]# echo "hello world123" >>a3.csv

  [root@qianfeng01 tails]# echo "hello world123" >>a3.log

好程序员公众号

  • · 剖析行业发展趋势
  • · 汇聚企业项目源码

好程序员开班动态

More+
  • HTML5大前端 <高端班>

    开班时间:2021-04-12(深圳)

    开班盛况

    开班时间:2021-05-17(北京)

    开班盛况
  • 大数据+人工智能 <高端班>

    开班时间:2021-03-22(杭州)

    开班盛况

    开班时间:2021-04-26(北京)

    开班盛况
  • JavaEE分布式开发 <高端班>

    开班时间:2021-05-10(北京)

    开班盛况

    开班时间:2021-02-22(北京)

    开班盛况
  • Python人工智能+数据分析 <高端班>

    开班时间:2021-07-12(北京)

    预约报名

    开班时间:2020-09-21(上海)

    开班盛况
  • 云计算开发 <高端班>

    开班时间:2021-07-12(北京)

    预约报名

    开班时间:2019-07-22(北京)

    开班盛况
IT培训IT培训
在线咨询
IT培训IT培训
试听
IT培训IT培训
入学教程
IT培训IT培训
立即报名
IT培训

Copyright 2011-2023 北京千锋互联科技有限公司 .All Right 京ICP备12003911号-5 京公网安备 11010802035720号