2023-03-02
1. sinks hdfs
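I. Basic Flume Configuration
To collect data with Flume, the first thing to do is define a collection plan. A plan mainly specifies three parts:
●A Source for each kind of data origin
●A Sink for each kind of data destination
●A Channel suited to each scenario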
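# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.channels = <Channel1> <Channel2>
<Agent>.sinks = <Sink>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>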
For example:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.channels = mem-channel-1
agent_foo.sinks = hdfs-sink-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
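The binding rules above (a source may list several channels, a sink reads from exactly one, and every referenced channel must be declared) can be checked mechanically. The following is a minimal Python sketch under those assumptions; parse_properties and check_wiring are hypothetical helper names, not part of Flume, and the parser only handles simple "key = value" lines.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems found in the source/sink channel bindings."""
    declared = {kind: props.get(f"{agent}.{kind}", "").split()
                for kind in ("sources", "channels", "sinks")}
    problems = []
    for src in declared["sources"]:
        # A source may fan out to several channels (space-separated list).
        chans = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not chans:
            problems.append(f"source {src} has no channels")
        problems += [f"source {src} uses undeclared channel {c}"
                     for c in chans if c not in declared["channels"]]
    for sink in declared["sinks"]:
        # A sink reads from exactly one channel.
        chan = props.get(f"{agent}.sinks.{sink}.channel", "")
        if not chan:
            problems.append(f"sink {sink} has no channel")
        elif chan not in declared["channels"]:
            problems.append(f"sink {sink} uses undeclared channel {chan}")
    return problems


EXAMPLE = """
agent_foo.sources = avro-appserver-src-1
agent_foo.channels = mem-channel-1
agent_foo.sinks = hdfs-sink-1
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
"""

if __name__ == "__main__":
    # An empty list means every binding points at a declared channel.
    print(check_wiring(parse_properties(EXAMPLE), "agent_foo"))
```

Running such a check before starting the agent is one way to catch a misspelled channel name early, instead of finding it in the agent log.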
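Every component, whether Source, Channel, or Sink, has its own properties. These can also be set in the collection configuration file to extend what the component does.
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>
For example: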
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
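Each property key in the listing above has the shape agent.kind.component.property, where the property part may itself contain dots (as in hdfs.path). A small Python sketch of how such keys break down is shown here; group_properties is a hypothetical helper written for illustration, and it simply skips keys that do not follow this four-part shape.

```python
from collections import defaultdict

KINDS = {"sources", "channels", "sinks"}


def group_properties(lines, agent):
    """Map (kind, component) -> {property: value} for one agent."""
    grouped = defaultdict(dict)
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        # Split into at most four parts so 'hdfs.path' stays in one piece.
        parts = key.strip().split(".", 3)
        if len(parts) < 4 or parts[0] != agent or parts[1] not in KINDS:
            continue  # e.g. the 'agent_foo.sources = ...' declaration lines
        _, kind, component, prop = parts
        grouped[(kind, component)][prop] = value.strip()
    return dict(grouped)


CONFIG = """
agent_foo.sources = avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.port = 10000
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
""".splitlines()

if __name__ == "__main__":
    for (kind, name), props in group_properties(CONFIG, "agent_foo").items():
        print(kind, name, props)
```

Grouping the keys this way makes it easy to see which settings belong to which component when a configuration file grows.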
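II. Common Collection Examples
2.1. Example: Avro+Memory+Logger
Avro Source: listens on a specified Avro port and receives the events that Avro clients send to it. Whenever an application sends a file through that port, the source reads the file's contents. In this example the output goes to a Logger sink.
Collection plan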
[root@qianfeng01 flume-1.9.0]# mkdir flumeconf
[root@qianfeng01 flume-1.9.0]# cd flumeconf
[root@qianfeng01 flumeconf]# vi avro-logger.conf
# name the components of this agent
a1.sources=avro-sour1
a1.channels=mem-chan1
a1.sinks=logger-sink1
# properties of the source
a1.sources.avro-sour1.type=avro
a1.sources.avro-sour1.bind=qianfeng01
a1.sources.avro-sour1.port=9999
# properties of the channel
a1.channels.mem-chan1.type=memory
# properties of the sink
a1.sinks.logger-sink1.type=logger
a1.sinks.logger-sink1.maxBytesToLog=100
# bind the source and the sink to the channel
a1.sources.avro-sour1.channels=mem-chan1
a1.sinks.logger-sink1.channel=mem-chan1
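The maxBytesToLog setting above limits how much of each event body the Logger sink writes to the log; Flume's documented default is 16 bytes, which is why this plan raises it to 100. The effect can be pictured with a small Python sketch. It models only the truncation, not the sink's real output format, and logged_body and the sample body are made up for illustration.

```python
def logged_body(body: bytes, max_bytes_to_log: int = 16) -> bytes:
    """Return the leading part of an event body that would be logged."""
    return body[:max_bytes_to_log]


if __name__ == "__main__":
    body = b"hello flume, this is a test event body"  # hypothetical event body
    print(logged_body(body))        # cut off after the default 16 bytes
    print(logged_body(body, 100))   # short enough to be logged in full
```

With the limit at 100, lines as short as the test data used in this example should be visible in full on the console.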
Start the agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
Test data
[root@qianfeng01 ~]# mkdir flumedata
[root@qianfeng01 ~]# cd flumedata/
[root@qianfeng01 flumedata]#
[root@qianfeng01 flumedata]# date >> test.data
[root@qianfeng01 flumedata]# cat test.data
2019年 11月 21日 星期四 21:22:36 CST
[root@qianfeng01 flumedata]# ping qianfeng01 >> test.data
[root@qianfeng01 flumedata]# cat test.data
....(output omitted)....
[root@qianfeng01 flumedata]# flume-ng avro-client -c /usr/local/flume-1.9.0/conf/ -H qianfeng01 -p 9999 -F ./test.data
2.2. Example: Taildir+Memory+HDFS
Collection plan
a1.sources = r1
a1.channels = c1
a1.sinks = s1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/flume-1.9.0/flumeconf/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/flume-1.9.0/flumedata/tails/.*log.*
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://qianfeng01:9820/flume/taildir/
a1.sinks.s1.hdfs.filePrefix=flume-hdfs
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
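The rollInterval, rollSize and rollCount settings above decide when the HDFS sink closes the current file and starts a new one. As a rough mental model (a simplified sketch, not Flume's actual implementation), a file is rolled as soon as any enabled threshold is reached, and a value of 0 disables that trigger. The function and parameter names below are made up for illustration; the defaults mirror the values in this plan.

```python
def should_roll(event_count, bytes_written, age_seconds,
                roll_count=10, roll_size=1024, roll_interval=60):
    """Simplified model: roll when any enabled threshold is reached.

    A threshold of 0 means that trigger is switched off.
    """
    by_count = roll_count > 0 and event_count >= roll_count
    by_size = roll_size > 0 and bytes_written >= roll_size
    by_time = roll_interval > 0 and age_seconds >= roll_interval
    return by_count or by_size or by_time


if __name__ == "__main__":
    print(should_roll(event_count=4, bytes_written=60, age_seconds=5))    # below every threshold
    print(should_roll(event_count=10, bytes_written=120, age_seconds=5))  # 10 events reached
    print(should_roll(event_count=4, bytes_written=60, age_seconds=60))   # 60 seconds reached
```

With thresholds this small, even a handful of short test lines is enough to produce several files under /flume/taildir/, which is convenient for a demo but would create many tiny files under real traffic.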
Start the agent
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./taildir-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
Test data
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world" >>a1.log
[root@qianfeng01 tails]# echo "hello world123" >>a1.log
[root@qianfeng01 tails]# echo "hello world123" >>a2.log
[root@qianfeng01 tails]# echo "hello world123" >>a3.log
[root@qianfeng01 tails]# echo "hello world123" >>a3.csv
[root@qianfeng01 tails]# echo "hello world123" >>a3.log
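The test above writes to a1.log, a2.log, a3.log and a3.csv. In a Taildir filegroup only the file-name part of the path is treated as a regular expression, and it has to match the whole name. A minimal Python sketch of that matching for the .*log.* pattern in this plan follows; Python's re module stands in for Java's regex engine here, an assumption that holds for this simple pattern, and is_tailed is a hypothetical helper name.

```python
import re

# File-name pattern taken from a1.sources.r1.filegroups.f1 (directory part removed).
FILENAME_PATTERN = re.compile(r".*log.*")


def is_tailed(filename: str) -> bool:
    """True if the whole file name matches the filegroup pattern."""
    return FILENAME_PATTERN.fullmatch(filename) is not None


if __name__ == "__main__":
    names = ["a1.log", "a2.log", "a3.log", "a3.csv"]
    print([n for n in names if is_tailed(n)])
```

Under this model a3.csv is not picked up, so the line appended to it should not appear in HDFS, while the lines appended to the three .log files should.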