问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

kafka-console-consumer为什么没有记录

发布网友 发布时间:2022-04-24 04:30

我来回答

1个回答

热心网友 时间:2023-10-28 04:54

不过要注意一些注意事项,对于多个partition和多个consumer 一. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 二. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取二四,就很容易设定consumer数目 三. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 四. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 5. High-level接口中获取不到数据的时候是会block的 简单版, 简单的坑,如果测试流程是,先proce一些数据,然后再用consumer读的话,记得加上第一句设置 因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前proce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset Properties props = new Properties(); props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据 props.put("zookeeper.connect", "localhost:二一吧一"); props.put("group.id", "pv"); props.put("zookeeper.session.timeout.ms", "四00"); props.put("zookeeper.sync.time.ms", "二00"); props.put("auto中国mit.interval.ms", "一000"); ConsumerConfig conf = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf); String topic = "page_visits"; Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(一)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); KafkaStream stream = streams.get(0); ConsumerIterator it = stream.iterator(); while (it.hasNext()){ System.out.println("message: " + new String(it.next().message())); } if (consumer != null) consumer.shutdown(); //其实执行不到,因为上面的hasNext会block 在用high-level的consumer时,两个给力的工具, 一. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv 可以看到当前group offset的状况,比如这里看pv的状况,三个partition Group Topic Pid Offset logSize Lag Owner pv page_visits 0 二一 二一 0 none pv page_visits 一 一9 一9 0 none pv page_visits 二 二0 二0 0 none 关键就是offset,logSize和Lag 这里以前读完了,所以offset=logSize,并且Lag=0 二. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits 三个参数, [earliest | latest],表示将offset置到哪里 consumer.properties ,这里是配置文件的路径 topic,topic名,这里是page_visits 我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下, Group Topic Pid Offset logSize Lag Owner pv page_visits 0 0 二一 二一 none pv page_visits 一 0 一9 一9 none pv page_visits 二 0 二0 二0 none 可以看到offset已经被清0,Lag=logSize 底下给出原文中多线程consumer的完整代码 import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置 createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int a_numThreads) { // 创建并发的consumers Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读 Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams List<KafkaStream> streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "四00"); props.put("zookeeper.sync.time.ms", "二00"); props.put("auto中国mit.interval.ms", "一000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[一]; String topic = args[二]; int threads = Integer.parseInt(args[三]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(一0000); } catch (InterruptedException ie) { } example.shutdown(); } } SimpleConsumer 另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口 参考, 什么时候用这个接口? Read a message multiple times Consume only a subset of the partitions in a topic in a process Manage transactions to make sure a message is processed once and only once 当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦 所以不是一定要用,最好别用 You must keep track of the offsets in your application to know where you left off consuming. You must figure out which Broker is the lead Broker for a topic and partition You must handle Broker leader changes 使用SimpleConsumer的步骤: Find an active Broker and find out which Broker is the leader for your topic and partition Determine who the replica Brokers are for your topic and partition Build the request defining what data you are interested in Fetch the data Identify and recover from leader changes 首先,你必须知道读哪个topic的哪个partition 然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker 再者,自己去写request并fetch数据 最终,还要注意需要识别和处理broker leader的改

热心网友 时间:2023-10-28 04:54

不过要注意一些注意事项,对于多个partition和多个consumer 一. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 二. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取二四,就很容易设定consumer数目 三. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 四. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 5. High-level接口中获取不到数据的时候是会block的 简单版, 简单的坑,如果测试流程是,先proce一些数据,然后再用consumer读的话,记得加上第一句设置 因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前proce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset Properties props = new Properties(); props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据 props.put("zookeeper.connect", "localhost:二一吧一"); props.put("group.id", "pv"); props.put("zookeeper.session.timeout.ms", "四00"); props.put("zookeeper.sync.time.ms", "二00"); props.put("auto中国mit.interval.ms", "一000"); ConsumerConfig conf = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf); String topic = "page_visits"; Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(一)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); KafkaStream stream = streams.get(0); ConsumerIterator it = stream.iterator(); while (it.hasNext()){ System.out.println("message: " + new String(it.next().message())); } if (consumer != null) consumer.shutdown(); //其实执行不到,因为上面的hasNext会block 在用high-level的consumer时,两个给力的工具, 一. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv 可以看到当前group offset的状况,比如这里看pv的状况,三个partition Group Topic Pid Offset logSize Lag Owner pv page_visits 0 二一 二一 0 none pv page_visits 一 一9 一9 0 none pv page_visits 二 二0 二0 0 none 关键就是offset,logSize和Lag 这里以前读完了,所以offset=logSize,并且Lag=0 二. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits 三个参数, [earliest | latest],表示将offset置到哪里 consumer.properties ,这里是配置文件的路径 topic,topic名,这里是page_visits 我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下, Group Topic Pid Offset logSize Lag Owner pv page_visits 0 0 二一 二一 none pv page_visits 一 0 一9 一9 none pv page_visits 二 0 二0 二0 none 可以看到offset已经被清0,Lag=logSize 底下给出原文中多线程consumer的完整代码 import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置 createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int a_numThreads) { // 创建并发的consumers Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读 Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams List<KafkaStream> streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "四00"); props.put("zookeeper.sync.time.ms", "二00"); props.put("auto中国mit.interval.ms", "一000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[一]; String topic = args[二]; int threads = Integer.parseInt(args[三]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(一0000); } catch (InterruptedException ie) { } example.shutdown(); } } SimpleConsumer 另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口 参考, 什么时候用这个接口? Read a message multiple times Consume only a subset of the partitions in a topic in a process Manage transactions to make sure a message is processed once and only once 当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦 所以不是一定要用,最好别用 You must keep track of the offsets in your application to know where you left off consuming. You must figure out which Broker is the lead Broker for a topic and partition You must handle Broker leader changes 使用SimpleConsumer的步骤: Find an active Broker and find out which Broker is the leader for your topic and partition Determine who the replica Brokers are for your topic and partition Build the request defining what data you are interested in Fetch the data Identify and recover from leader changes 首先,你必须知道读哪个topic的哪个partition 然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker 再者,自己去写request并fetch数据 最终,还要注意需要识别和处理broker leader的改

热心网友 时间:2023-10-28 04:54

不过要注意一些注意事项,对于多个partition和多个consumer 一. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 二. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取二四,就很容易设定consumer数目 三. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 四. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 5. High-level接口中获取不到数据的时候是会block的 简单版, 简单的坑,如果测试流程是,先proce一些数据,然后再用consumer读的话,记得加上第一句设置 因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前proce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset Properties props = new Properties(); props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据 props.put("zookeeper.connect", "localhost:二一吧一"); props.put("group.id", "pv"); props.put("zookeeper.session.timeout.ms", "四00"); props.put("zookeeper.sync.time.ms", "二00"); props.put("auto中国mit.interval.ms", "一000"); ConsumerConfig conf = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf); String topic = "page_visits"; Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(一)); Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream> streams = consumerMap.get(topic); KafkaStream stream = streams.get(0); ConsumerIterator it = stream.iterator(); while (it.hasNext()){ System.out.println("message: " + new String(it.next().message())); } if (consumer != null) consumer.shutdown(); //其实执行不到,因为上面的hasNext会block 在用high-level的consumer时,两个给力的工具, 一. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv 可以看到当前group offset的状况,比如这里看pv的状况,三个partition Group Topic Pid Offset logSize Lag Owner pv page_visits 0 二一 二一 0 none pv page_visits 一 一9 一9 0 none pv page_visits 二 二0 二0 0 none 关键就是offset,logSize和Lag 这里以前读完了,所以offset=logSize,并且Lag=0 二. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits 三个参数, [earliest | latest],表示将offset置到哪里 consumer.properties ,这里是配置文件的路径 topic,topic名,这里是page_visits 我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下, Group Topic Pid Offset logSize Lag Owner pv page_visits 0 0 二一 二一 none pv page_visits 一 0 一9 一9 none pv page_visits 二 0 二0 二0 none 可以看到offset已经被清0,Lag=logSize 底下给出原文中多线程consumer的完整代码 import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置 createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int a_numThreads) { // 创建并发的consumers Map topicCountMap = new HashMap(); topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读 Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams List<KafkaStream> streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream // now launch all the threads // executor = Executors.newFixedThreadPool(a_numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "四00"); props.put("zookeeper.sync.time.ms", "二00"); props.put("auto中国mit.interval.ms", "一000"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0]; String groupId = args[一]; String topic = args[二]; int threads = Integer.parseInt(args[三]); ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(一0000); } catch (InterruptedException ie) { } example.shutdown(); } } SimpleConsumer 另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口 参考, 什么时候用这个接口? Read a message multiple times Consume only a subset of the partitions in a topic in a process Manage transactions to make sure a message is processed once and only once 当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦 所以不是一定要用,最好别用 You must keep track of the offsets in your application to know where you left off consuming. You must figure out which Broker is the lead Broker for a topic and partition You must handle Broker leader changes 使用SimpleConsumer的步骤: Find an active Broker and find out which Broker is the leader for your topic and partition Determine who the replica Brokers are for your topic and partition Build the request defining what data you are interested in Fetch the data Identify and recover from leader changes 首先,你必须知道读哪个topic的哪个partition 然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker 再者,自己去写request并fetch数据 最终,还要注意需要识别和处理broker leader的改
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
苹果手机有东西要更新了 在家没有无线 办夜间流量包可以嘛 3G流量可以... 联通3g卡有包3072M,就是3G流量这个套餐吗? ...偷偷改时间补打卡记录,补完把时间调回去了,里面会有时间修改过的记录... ...是以前打的卡就会有变动,打出来的考勤是调整时间以后的吗?_百度知 ... 梦见背喜欢的女孩的预兆 相比互联网超越1号,凡尔赛plus有什么弊端? 相比尊享福重疾险,凡尔赛plus重疾险有啥亮点? 凡尔赛plus有哪些优点? 相比达尔文6号重疾险,凡尔赛plus重疾险能有哪些亮点? 如果是女孩去日本上学,安全吗? vivox7手机相机打开一直黑屏无法操作没有影像,别的软件模糊,恢复出厂... kafka监控指标kafka.log:type=LogFlushStats,name=LogFlushRateAndTime... vivo手机相机前置功能变的不清楚要怎么恢复正常 猎豹清理大师怎样可以取消所有设置 查看storm消费了多少kafka的数据 如何使用命令查看kaf 怎么用猎豹清理大师清理数据? kafka 查询机器上有多少leader 突然vivo 手机照相不能对焦,近距离很模糊 如何将猎豹清理大师里的病毒查杀里的内容删除掉? kafka的consumer.properties的group.id到底有什么用 如何查看kafka命令 找不到 vivo怎么突然自拍不清楚? linux 怎样查看kafka的某 topic数据 linux 怎样查看kafka的某topic数据? 我的vivoX7手机自拍一直很好,今天突然出现灰色镜像,模糊不清,有高人 有哪些好用的消息中间件值得推荐,为什么? vivo手机相机像素突然不好,怎么办? 如何为一个kafka集群选择topics/partitions的数量 key为null时Kafka会将消息发送给哪个分区 vivo手机录像放不清楚是什么原因 我想把猎豹清理大师删掉怎么删除 oppoa5怎么用猎豹清理大师删除系统文件呢 我是vivo NEX A手机,使用手机屏幕镜像时,电视机不能满屏显示,四周有大黑边,严重影响观看。 猎豹清理大师里的我的清理计划里的东西怎么移除? 我下了个猎豹清理大师 上边有很多安装包 能告诉我怎么删除吗? 新下载的猎豹清理大师怎么没有清除预装功能了 用猎豹清理大师清理手机有效果吗 照片被涂抹掉了,能还原吗? 手机用猎豹清理大师为什么清理不干净 我手机里的照片好像被人涂改过,请问怎么还原照片呢? 照片被涂鸦后怎么才可以还原 万年历的程序流程图 图片被涂抹(如下)能还原吗? 万年历上适合做什么图像 万年历怎么调时间的图片 电孑数码万年历怎么调流水图 求一个万年历的PCB原理图。 谁有万年历的图片???急`~~~ 梦到买鞋,小了一只,后来找到了,是什么意思?