2023-03-16
channel 事件 寫入
說明
Flume中的Channel選擇器作用于source階段 ,是決定Source接受的特定事件寫入到哪個Channel的組件,他們告訴Channel處理器,然后由其將事件寫入到Channel。
Agent中各個組件的交互
由于Flume不是兩階段提交,事件被寫入到一個Channel,然后事件在寫入下一個Channel之前提交,如果寫入一個Channel出現異常,那么之前已經寫入到其他Channel的相同事件不能被回滾。當這樣的異常發生時,Channel處理器拋出ChannelException異常,事務失敗,如果Source試圖再次寫入相同的事件(大多數情況下,會再次寫入,只有Syslog,Exec等Source不能重試,因為沒有辦法生成相同的數據),重復的事件將寫入到Channel中,而先前的提交是成功的,這樣在Flume中就發生了重復。
Channel選擇器的配置是通過Channel處理器完成的,Channel選擇器可以指定一組Channel是必須的,另一組的可選的。
Flume分類兩種選擇器,如果Source配置中沒有指定選擇器,那么會自動使用復制Channel選擇器.
●replicating:該選擇器復制每個事件到通過Source的Channels參數指定的所有Channel中。
●multiplexing:是一種專門用于動態路由事件的Channel選擇器,通過選擇事件應該寫入到哪個Channel,基于一個特定的事件頭的值進行路由
案例演示:replicating selector
配置方案
[root@qianfeng01 flumeconf]# vi rep.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type=syslogtcp
a1.sources.r1.host = qianfeng01
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=replicating
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/rep
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
啟動agent的服務
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./rep.conf -n a1 -Dflume.root.logger=INFO,console
測試
[root@qianfeng01 ~]# echo "hello world hello qianfeng" | nc qianfeng01 6666
案例演示:Multiplexing selector
配置方案
[root@qianfeng01 flumeconf]# vi mul.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = s1 s2
a1.sources.r1.type=http
a1.sources.r1.bind = qianfeng01
a1.sources.r1.port = 6666
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.USER = c1
a1.sources.r1.selector.mapping.ORDER = c2
a1.sources.r1.selector.default = c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c1.keep-alive=3
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
a1.sinks.s1.type=hdfs
a1.sinks.s1.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul
a1.sinks.s1.hdfs.filePrefix=s1sink
a1.sinks.s1.hdfs.fileSuffix=.log
a1.sinks.s1.hdfs.inUseSuffix=.tmp
a1.sinks.s1.hdfs.rollInterval=60
a1.sinks.s1.hdfs.rollSize=1024
a1.sinks.s1.hdfs.rollCount=10
a1.sinks.s1.hdfs.idleTimeout=0
a1.sinks.s1.hdfs.batchSize=100
a1.sinks.s1.hdfs.fileType=DataStream
a1.sinks.s1.hdfs.writeFormat=Text
a1.sinks.s1.hdfs.round=true
a1.sinks.s1.hdfs.roundValue=1
a1.sinks.s1.hdfs.roundUnit=second
a1.sinks.s1.hdfs.useLocalTimeStamp=true
a1.sinks.s2.type=hdfs
a1.sinks.s2.hdfs.path=hdfs://qianfeng01:8020/flume/%Y/%m/%d/mul
a1.sinks.s2.hdfs.filePrefix=s2sink
a1.sinks.s2.hdfs.fileSuffix=.log
a1.sinks.s2.hdfs.inUseSuffix=.tmp
a1.sinks.s2.hdfs.rollInterval=60
a1.sinks.s2.hdfs.rollSize=1024
a1.sinks.s2.hdfs.rollCount=10
a1.sinks.s2.hdfs.idleTimeout=0
a1.sinks.s2.hdfs.batchSize=100
a1.sinks.s2.hdfs.fileType=DataStream
a1.sinks.s2.hdfs.writeFormat=Text
a1.sinks.s2.hdfs.round=true
a1.sinks.s2.hdfs.roundValue=1
a1.sinks.s2.hdfs.roundUnit=second
a1.sinks.s2.hdfs.useLocalTimeStamp=true
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
啟動Agent的服務
[root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./mul.conf -n a1 -Dflume.root.logger=INFO,console
測試
[root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this my multiplex to c2"}]' http://qianfeng01:6666
[root@qianfeng01 ~]# curl -X POST -d '[{"headers":{"state":"ORDER"},"body":"this is my content"}]' http://qianfeng01:6666
開班時間:2021-04-12(深圳)
開班盛況開班時間:2021-05-17(北京)
開班盛況開班時間:2021-03-22(杭州)
開班盛況開班時間:2021-04-26(北京)
開班盛況開班時間:2021-05-10(北京)
開班盛況開班時間:2021-02-22(北京)
開班盛況開班時間:2021-07-12(北京)
預約報名開班時間:2020-09-21(上海)
開班盛況開班時間:2021-07-12(北京)
預約報名開班時間:2019-07-22(北京)
開班盛況Copyright 2011-2023 北京千鋒互聯科技有限公司 .All Right 京ICP備12003911號-5 京公網安備 11010802035720號