问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

flume与kafka集成配置

发布网友 发布时间:2022-12-22 22:45

我来回答

1个回答

热心网友 时间:2023-09-20 23:02

简介

Flume代理配置存储在本地配置文件中。这是遵循Javaproperties文件格式的文本文件。可以在同一配置文件中指定一个或多个代理的配置。配置文件包括代理中每个source,sink和channel的属性,以及它们如何连接在一起以形成数据流。

流中的每个组件(source,sink和channel)都有一个名称,类型和特定于该类型和实例化的属性集。例如,一个Avro源需要一个主机名(或IP地址)和一个端口号来接收数据。内存通道可以具有最大队列大小(“capacity”),并且HDFS的sink需要知道文件系统URI,创建文件的路径,文件rotation的frequency(“hdfs.rollInterval”)等。组件的所有此类属性需要在hosting Flume代理的属性文件中进行设置。

代理需要知道要加载哪些单个组件以及如何连接它们才能构成流程。通过列出代理中每个source,sink和channel的名称,然后为每个sink和source指定channel来完成此操作。例如,代理通过称为文件通道的文件通道将事件从名为avroWeb的Avro源流到HDFS接收器hdfs-cluster1。配置文件将包含这些组件的名称和文件通道,作为avroWebsource和hdfs-cluster1sink的共享通道。

使用称为flume-ng的shell脚本启动代理,该脚本位于Flume发行版的bin目录中。您需要在命令行上指定代理名称,配置目录和配置文件:

$ bin/flume-ng agent -n $agent_name -c conf-f conf/flume-conf.properties.template

然后,代理将开始运行在给定属性文件中配置的source,sink和channel。

示例

在这里,我们提供了一个示例配置文件,描述了单节点Flume部署。通过此配置,用户可以生成事件,然后将其记录到控制台。

#example.conf:单节点Flume配置

#在此代理上命名组件

a1.sources   =  r1

a1.sinks   =  k1

a1.channels   =  c1

#描述/配置源

a1.sources.r1.type   =  netcat

a1.sources.r1.bind   =  localhost

a1.sources.r1.port   =  44444

#描述接收器

a1.sinks.k1.type   =  logger

#使用通道将事件缓存在内存

a1.channels.c1.type   =  memory

a1.channels中.c1.capacity   =  1000

a1.channels.c1.transactionCapacity   = 100

#将源和接收器绑定到通道

a1.sources.r1.channels   =  c1

a1.sinks.k1.channel   =  c1

此配置定义了一个名为a1的代理。a1具有侦听端口44444上的数据的source,在内存中缓冲事件数据的通道以及将事件数据记录到控制台的sink。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的Flume进程时,会传递一个标志,告诉它要显示哪个命名的代理。

有了这个配置文件,我们可以如下启动Flume:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

请注意,在完整部署中,我们通常会包含一个选项:-- conf

在.bash_profile中加入flume环境变量

PATH=/usr/flume/bin:$PATH:$HOME/bin

source .bash_profile刷新

使用shell将大量文件分发到不同sources中

定时任务* * * * * sh cp.sh

cp.sh

#!/bin/bash

source ~/.bash_profile

time=`date +%Y%m%d%H%M -d -2min`

echo `date '+%Y-%m-%d %H:%M:%S'`":$time cp start"

for file in `ls *xxxx*`

do

 file_name=`basename $file`

 cp $file /data/1/$file_name.tmp

 mv /data/1/$file_name.tmp  /data/1/$file_name

done &

创建.conf文件

例:

agent1.sources = s1

agent1.channels = c1

agent1.sinks = k1 k1_1

agent1.sources.s1.type = spooldir

agent1.sources.s1.fileSuffix = .comp

agent1.sources.s1.deletePolicy = immediate

agent1.sources.s1.spoolDir=/data/1/

agent1.sources.s1.fileHeader= false

agent1.sources.s1.channels = c1

agent1.sources.s1.trackerDir = /data/flumespool/s1

agent1.sources.s1.ignorePattern = (.)*.\.tmp

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 10

agent1.channels.c1.capacity = 5000

agent1.channels.c1.transactionCapacity = 1000

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = topic

agent1.sinks.k1.brokerList = kafka_1:9092,kafka_2:9092,kafka_3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 500

agent1.sinks.k1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1.channel = c1

agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1_1.topic = topic

agent1.sinks.k1_1.brokerList =kafka_1:9092,kafka_2:9092,kafka_3:9092

agent1.sinks.k1_1.requiredAcks = 1

agent1.sinks.k1_1.batchSize = 500

agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1_1.channel = c1

sinks中k1与k1_1实现双线程

使用1个flume连接2个kafka,即同时向2个kafka中录入数据可以在同一agent下配置2个channels和2个sinks,source共用一个

示例:

xxx.conf

agent1.sources = s1

agent1.channels = c1 cx1

agent1.sinks = k1 k1_1 kx1 kx1_1

agent1.sources.s1.type = spooldir

agent1.sources.s1.fileSuffix = .comp

agent1.sources.s1.deletePolicy = immediate

agent1.sources.s1.spoolDir=/data/1/

agent1.sources.s1.fileHeader= false

agent1.sources.s1.channels = c1 cx1

agent1.sources.s1.trackerDir = /data/flumespool/s1

agent1.sources.s1.ignorePattern = (.)*.\.tmp

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 10

agent1.channels.c1.capacity = 5000

agent1.channels.c1.transactionCapacity = 1000

agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1.topic = topic1

agent1.sinks.k1.brokerList =kafka1_1:9092,kafka1_2:9092,kafka1_3:9092

agent1.sinks.k1.requiredAcks = 1

agent1.sinks.k1.batchSize = 500

agent1.sinks.k1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1.channel = c1

agent1.sinks.k1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.k1_1.topic = topic1

agent1.sinks.k1_1.brokerList = kafka1_1:9092,kafka1_2:9092,kafka1_3:9092

agent1.sinks.k1_1.requiredAcks = 1

agent1.sinks.k1_1.batchSize = 500

agent1.sinks.k1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.k1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.k1_1.channel = c1

agent1.channels.cx1.type = memory

agent1.channels.cx1.keep-alive = 10

agent1.channels.cx1.capacity = 5000

agent1.channels.cx1.transactionCapacity = 1000

agent1.sinks.kx1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kx1.topic = topic2

agent1.sinks.kx1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092

agent1.sinks.kx1.requiredAcks = 1

agent1.sinks.kx1.batchSize = 500

agent1.sinks.kx1.kafka.receive.buffer.bytes = 200000

agent1.sinks.kx1.kafka.send.buffer.bytes = 300000

agent1.sinks.kx1.channel = cx1

agent1.sinks.kx1_1.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kx1_1.topic = topic2

agent1.sinks.kx1_1.brokerList = kafka2_1:9092,kafka2_2:9092,kafka2_3:9092

agent1.sinks.kx1_1.requiredAcks = 1

agent1.sinks.kx1_1.batchSize = 500

agent1.sinks.kx1_1.kafka.receive.buffer.bytes = 200000

agent1.sinks.kx1_1.kafka.send.buffer.bytes = 300000

agent1.sinks.kx1_1.channel = cx1

启动命令如下:

flume-ng agent -c /usr/flume/conf/ -f xxx.conf -n agent1 -Dflume.root.logger=INFO,console >log/1`date +%Y%m%d`.log 2>&1 &
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
手机壁纸花卉红色手机壁纸花卉 辽宁智能五芯防水连接器 军工产品浅谈——航空插头 xp电脑系统修复XP系统怎么恢复出厂设置 ...表编辑器后打不开任何文件,无需重装系统,电脑盲都能用 粼基本信息 粼五笔怎样打? 【微科普】想变成光?先搞清楚光是什么吧! 手机被拉黑后打电话是什么提示? 物流管理专业对数学的要求高么 怎么关闭微信快递服务通知? vivox70pro怎样删除快递服务 国产操作系统排名 apache flume Alan walker是什么梗 花在植物体的结构层次上属于什么 建筑工程企业资质标准 之桃对应的男名 向珊的签名字怎么写 以前女生有多长名字 向珊质料 保税区仓库里主要是什么货物啊 仓储保税货物的介绍 我做核酸都二十个小时了现在还没出结果呢? 情侣去游泳,为什么男孩在游泳的时候,抓了一把像海带的东西就上来了... 嵌入式c语言中float下溢出会复位吗 密度钢和碳钢的区别 德国康立雅锅蒸笼是多大尺寸的 此所谓有功而见疑者也什么意思? 七年级语文上册第30课后俩文言文的解释 故之所谓隐士者,非伏其身而弗见也,非闭其言而不出也,非藏其知而不发... 跑滴滴一个月能赚多少钱? 课外活动专题:有趣的人做有趣的事 中科健康国际专卖店 江宁店怎么样 为什么选择南京中科健康产业集团 后宫嫔妃众多,她们争宠真的是因为爱皇帝吗? 华为智慧屏电视机上钉钉怎么完成打卡任务 2022年什么时候立冬 故人归不归的上一句 深圳市善之能科技有限公司是不是培训机构 深圳善之能科技招聘是真的吗 善之能科技有限公司是培训么 善之能招学徒是真的吗 战国时期儒家学派代表人物孟子的思想主张是? 关于“愿望”的题目……急啊……记住要用修辞手法哦 洗车泡沫壶里装什么 爱上了梦里的男孩,无法自拔该怎么办? “我吓得汗毛都竖起来了”在英语里有没有 围棋的具体规则是什么? 你家在那里歌词 你家在哪里第几版本中