问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

请教kafka 启用多台consumer 问题

发布网友 发布时间:2022-04-24 01:10

我来回答

1个回答

热心网友 时间:2023-10-17 12:49

public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

Map<String, Integer> map = new HashMap<String, Integer>();
map.put("fans", 1);

// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<Message> it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}

});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}

热心网友 时间:2023-10-17 12:49

public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

Map<String, Integer> map = new HashMap<String, Integer>();
map.put("fans", 1);

// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<Message> it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}

});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
玉米仁子饭产自哪里 中国期货交易所的交易品种有哪些? 历史要怎么读,有啥诀窍 高中历史诀窍 年终会活动策划方案 深度解析:第一财经回放,探索财经新风向 逆水寒手游庄园怎么邀请好友同住 逆水寒手游 逆水寒不同区可以一起组队吗? 逆水寒手游 逆水寒怎么进入好友世界? 逆水寒手游 逆水寒怎么去别人的庄园? 电脑总是蓝屏是怎么回事啊? spring cloud kafka运行报错怎么解决啊??? 网友找你借钱,能借给他吗? 如何用Java向kafka发送json数据 电脑开机后蓝屏,是什么坏了? 你会借钱给网友吗? 免费的手机邮箱内存及附件有多大? 学生可以学习哪些理财知识呢? spark接kafka怎么区分topic 老公网恋借钱给别人我能报案吗 为什么电脑总是自动蓝屏,是什么地方坏了 使用java实现kafka consumer时报错 主动给网友借钱是否还能要回来?? 借钱给网友2万元,对方不能一次还清,提出每个月还一部分,我不同意,让律师处理,有用吗? 如何确定Kafka的分区数,key和consumer线程数 电脑蓝屏怎么办 是什么地方坏了 微信借钱给网友,除了,其他信息都没有,对方也不把我拉黑,也没有说不给一直拖,怎么办? Java通过zk连接kafka,程序未报错,但是取不到数据。将程序在另一台主机 电脑蓝屏是那里坏了? 借钱给网友,结果不还钱,报警需要什么证据 电脑为什么会出现蓝屏是什么坏了吗 联通有免费手机邮箱吗? kafka connect到底会不会重写/丢失数据 普通人如何学习理财 电脑蓝屏是哪里坏了? 中国联通的手机邮箱免费吗??? 电脑运行的时候会突然变成蓝屏,什么坏了? 免费手机邮箱? 电脑开机蓝屏是不是主板坏了? 有哪些渠道可以学习理财知识? 手机邮箱要钱吗 中国移动的手机邮箱有没有免费的 免费邮箱(189邮箱)好用吗? 如何用手机注册免费邮箱? 黑峡谷机械键盘怎么样? 手机邮箱6元版 是不是付费的?如何退订? 为什么都推荐黑峡谷机械键盘 黑峡谷的机械键盘质量怎么样? 黑峡谷键盘怎么调键盘灯 想入手黑峡谷x3,哪个轴比较适合fps游戏,主要《csgo》?