flume与kafka集成配置
发布网友
发布时间:2022-12-22 22:45
我来回答
共1个回答
热心网友
时间:2023-09-20 23:02
简介
Flume代理配置存储在本地配置文件中。这是遵循Javaproperties文件格式的文本文件。可以在同一配置文件中指定一个或多个代理的配置。配置文件包括代理中每个source,sink和channel的属性,以及它们如何连接在一起以形成数据流。
流中的每个组件(source,sink和channel)都有一个名称,类型和特定于该类型和实例化的属性集。例如,一个Avro源需要一个主机名(或IP地址)和一个端口号来接收数据。内存通道可以具有最大队列大小(“capacity”),并且HDFS的sink需要知道文件系统URI,创建文件的路径,文件rotation的frequency(“hdfs.rollInterval”)等。组件的所有此类属性需要在hosting Flume代理的属性文件中进行设置。
代理需要知道要加载哪些单个组件以及如何连接它们才能构成流程。通过列出代理中每个source,sink和channel的名称,然后为每个sink和source指定channel来完成此操作。例如,代理通过称为文件通道的文件通道将事件从名为avroWeb的Avro源流到HDFS接收器hdfs-cluster1。配置文件将包含这些组件的名称和文件通道,作为avroWebsource和hdfs-cluster1sink的共享通道。
使用称为flume-ng的shell脚本启动代理,该脚本位于Flume发行版的bin目录中。您需要在命令行上指定代理名称,配置目录和配置文件:
$ bin/flume-ng agent -n $agent_name -c conf-f conf/flume-conf.properties.template
然后,代理将开始运行在给定属性文件中配置的source,sink和channel。
示例
在这里,我们提供了一个示例配置文件,描述了单节点Flume部署。通过此配置,用户可以生成事件,然后将其记录到控制台。
#example.conf:单节点Flume配置
#在此代理上命名组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#描述/配置源
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#描述接收器
a1.sinks.k1.type = logger
#使用通道将事件缓存在内存
a1.channels.c1.type = memory
a1.channels中.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#将源和接收器绑定到通道
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
此配置定义了一个名为a1的代理。a1具有侦听端口44444上的数据的source,在内存中缓冲事件数据的通道以及将事件数据记录到控制台的sink。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的Flume进程时,会传递一个标志,告诉它要显示哪个命名的代理。
有了这个配置文件,我们可以如下启动Flume:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
请注意,在完整部署中,我们通常会包含一个选项:-- conf
在.bash_profile中加入flume环境变量
PATH=/usr/flume/bin:$PATH:$HOME/bin
source .bash_profile刷新
使用shell将大量文件分发到不同sources中
定时任务* * * * * sh cp.sh
cp.sh
#!/bin/bash
source ~/.bash_profile
time=`date +%Y%m%d%H%M -d -2min`
echo `date '+%Y-%m-%d %H:%M:%S'`":$time cp start"
for file in `ls *xxxx*`
do
file_name=`basename $file`
cp $file /data/1/$file_name.tmp
mv /data/1/$file_name.tmp /data/1/$file_name
done &
创建.conf文件
例:
agent1.sources = s1
agent1.channels = c1
agent1.sinks = k1 k1_1
agent1.sources.s1.type = spooldir
agent1.sources.s1.fileSuffix = .comp
agent1.sources.s1.deletePolicy = immediate
agent1.sources.s1.spoolDir=/data/1/
agent1.sources.s1.fileHeader= false
agent1.sources.s1.channels = c1
agent1.sources.s1.trackerDir = /data/flumespool/s1
agent1.sources.s1.ignorePattern = (.)*.\.tmp
agent1.channels.c1.type = memory
agent1.channels.c1.keep-alive = 10
agent1.channels.c1.capacity = 5000
agent1.channels.c1.transactionCapacity = 1000
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = topic
agent1.sinks.k1.brokerList = kafka_1:9092,kafka_2:9092,kafka_3:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 500
agent1.sinks.k1.kafka.receive.buffer.bytes = 200000
agent1.sinks.k1.kafka.send.buffer.bytes = 300000
agent1.sinks.k1.channel = c1
agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1_1.topic = topic
agent1.sinks.k1_1.brokerList =kafka_1:9092,kafka_2:9092,kafka_3:9092
agent1.sinks.k1_1.requiredAcks = 1
agent1.sinks.k1_1.batchSize = 500
agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000
agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000
agent1.sinks.k1_1.channel = c1
sinks中k1与k1_1实现双线程
使用1个flume连接2个kafka,即同时向2个kafka中录入数据可以在同一agent下配置2个channels和2个sinks,source共用一个
示例:
xxx.conf
agent1.sources = s1
agent1.channels = c1 cx1
agent1.sinks = k1 k1_1 kx1 kx1_1
agent1.sources.s1.type = spooldir
agent1.sources.s1.fileSuffix = .comp
agent1.sources.s1.deletePolicy = immediate
agent1.sources.s1.spoolDir=/data/1/
agent1.sources.s1.fileHeader= false
agent1.sources.s1.channels = c1 cx1
agent1.sources.s1.trackerDir = /data/flumespool/s1
agent1.sources.s1.ignorePattern = (.)*.\.tmp
agent1.channels.c1.type = memory
agent1.channels.c1.keep-alive = 10
agent1.channels.c1.capacity = 5000
agent1.channels.c1.transactionCapacity = 1000
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = topic1
agent1.sinks.k1.brokerList =kafka1_1:9092,kafka1_2:9092,kafka1_3:9092
agent1.sinks.k1.requiredAcks = 1
agent1.sinks.k1.batchSize = 500
agent1.sinks.k1.kafka.receive.buffer.bytes = 200000
agent1.sinks.k1.kafka.send.buffer.bytes = 300000
agent1.sinks.k1.channel = c1
agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1_1.topic = topic1
agent1.sinks.k1_1.brokerList = kafka1_1:9092,kafka1_2:9092,kafka1_3:9092
agent1.sinks.k1_1.requiredAcks = 1
agent1.sinks.k1_1.batchSize = 500
agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000
agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000
agent1.sinks.k1_1.channel = c1
agent1.channels.cx1.type = memory
agent1.channels.cx1.keep-alive = 10
agent1.channels.cx1.capacity = 5000
agent1.channels.cx1.transactionCapacity = 1000
agent1.sinks.kx1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kx1.topic = topic2
agent1.sinks.kx1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092
agent1.sinks.kx1.requiredAcks = 1
agent1.sinks.kx1.batchSize = 500
agent1.sinks.kx1.kafka.receive.buffer.bytes = 200000
agent1.sinks.kx1.kafka.send.buffer.bytes = 300000
agent1.sinks.kx1.channel = cx1
agent1.sinks.kx1_1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kx1_1.topic = topic2
agent1.sinks.kx1_1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092
agent1.sinks.kx1_1.requiredAcks = 1
agent1.sinks.kx1_1.batchSize = 500
agent1.sinks.kx1_1.kafka.receive.buffer.bytes = 200000
agent1.sinks.kx1_1.kafka.send.buffer.bytes = 300000
agent1.sinks.kx1_1.channel = cx1
启动命令如下:
flume-ng agent -c /usr/flume/conf/ -f xxx.conf -n agent1 -Dflume.root.logger=INFO,console >log/1`date +%Y%m%d`.log 2>&1 &