发布网友 发布时间:2022-04-09 16:38
共2个回答
懂视网 时间:2022-04-09 21:00
采用exec source,指定执行命令行为python xxx.py,在xxx.py代码中处理日志,并按照跟flume-ng-mongodb-sink的约定,print出json格式的数据,如果update类操作必须带着_id字段,print出来的日志被当作Event的Body,我再通过interceptors给它加上自定义Event Header;static interceptors用于为Event Header添加信息,这里我为它加上了db=cswuyg_test、collection=cswuyg_test、op=upsert,这三个key是跟flume-ng-mongodb-sink 约定的,用于指定mongodb中的db、collection名以及操作类型为update。
my_agent.channels.my_channel_1.type = file my_agent.channels.my_channel_1.checkpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint my_agent.channels.my_channel_1.useDualCheckpoints = true my_agent.channels.my_channel_1.backupCheckpointDir = /home/work/flume/file-channel/my_channel_1/checkPoint2 my_agent.channels.my_channel_1.dataDirs = /home/work/flume/file-channel/my_channel_1/data my_agent.channels.my_channel_1.transactionCapacity = 10000 my_agent.channels.my_channel_1.checkpointInterval = 30000 my_agent.channels.my_channel_1.maxFileSize = 4292870142 my_agent.channels.my_channel_1.minimumRequiredSpace = 524288000 my_agent.channels.my_channel_1.capacity = 100000
sink配置:
my_agent.sinks.my_mongo_1.type = org.riderzen.flume.sink.MongoSink my_agent.sinks.my_mongo_1.host = xxxhost my_agent.sinks.my_mongo_1.port = yyyport my_agent.sinks.my_mongo_1.model = DYNAMIC/SINGLE ---查看源码仅支持此二种方式,并且必须大小
my_agent.sinks.my_mongo_1.db = XXX --mongo表名,默认名称为events
my_agent.sinks.my_mongo_1.username = XXX --mongo用户名
my_agent.sinks.my_mongo_1.password = YYY --mongo密码
my_agent.sinks.my_mongo_1.collecion = log my_agent.sinks.my_mongo_1.batch = 10 my_agent.sinks.my_mongo_1.channel = my_channel_1 my_agent.sinks.my_mongo_1.timestampField = _S
参见:http://www.cnblogs.com/cswuyg/p/4498804.html
Flume-ng-mongodb-sink
标签:ati 用户名 pch events header action 注意 变量 bug
热心网友 时间:2022-04-09 18:08
需要实时收集多台服务器的nginx日志到一台机器。收集完成结果存放需要按天生成文件夹,按每5分钟生成文件,比如2012年12月29日12点26分的日志,需要放到/data/log/20121229/log-1225-对应的文件中。自己实现了类似flume-og和flume-ng的hdfs-sink的文件sink。