问答文章1 问答文章501 问答文章1001 问答文章1501 问答文章2001 问答文章2501 问答文章3001 问答文章3501 问答文章4001 问答文章4501 问答文章5001 问答文章5501 问答文章6001 问答文章6501 问答文章7001 问答文章7501 问答文章8001 问答文章8501 问答文章9001 问答文章9501

kafka的consumer.properties的group.id到底有什么用

发布网友 发布时间:2022-04-24 04:30

我来回答

3个回答

热心网友 时间:2022-04-14 06:01

kafka的consumer.properties的group.id到底有什么用,

在kafka分布式集群部署时,消费者的group.id,是否需要和consumer.properties配置的group.id一致。

我两个不同的topic,分别使用两个consumer消费。

其中一个consumer必须设置group.id和consumer.properties配置的group.id一致,才能消费消息。

另一个consumer必须设置group.id和consumer.properties配置的group.id不一致,才能消费消息。

《Apache Kafka源码剖析》以Kafka 0.10.0版本源码为基础,针对Kafka的架构设计到实现细节进行详细阐述。《Apache Kafka源码剖析》共5章,从Kafka的应用场景、源码环境搭建开始逐步深入,不仅介绍Kafka的核心概念,而且对Kafka生产者、消费者、服务端的源码进行深入的剖析,最后介绍Kafka常用的管理脚本实现,让读者不仅从宏观设计上了解Kafka,而且能够深入到Kafka的细节设计之中。

在源码分析的过程中,还穿插了笔者工作积累的经验和对Kafka设计的理解,希望读者可以举一反三,不仅知其然,而且知其所以然。

热心网友 时间:2022-04-14 07:19

要注意些注意事项于partitionconsumer
1. consumer比partition浪费kafka设计partition允许并发所consumer数要于partition数
2. consumer比partition少consumer应于partitions主要合理配consumer数partition数否则导致partition面数据取均匀
partiton数目consumer数目整数倍所partition数目重要比取24容易设定consumer数目
3. consumerpartition读数据保证数据间顺序性kafka保证partition数据序partition根据读顺序同
4. 增减consumerbrokerpartition导致rebalance所rebalanceconsumer应partition发变化
5. High-level接口获取数据候block
简单版
简单坑测试流程先proce些数据再用consumer读记加第句设置
初始offset默认非设置意思offset非何修offset默认largest即新所加配置读前proce数据且候再加smallest配置没用offset合再修需要手工或用工具改重置offset

Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); //必须要加要读旧数据
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "pv");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
String topic = "page_visits";
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream> streams = consumerMap.get(topic);

KafkaStream stream = streams.get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext()){
System.out.println("message: " + new String(it.next().message()));
}

if (consumer != null) consumer.shutdown(); //其实执行面hasNextblock

用high-levelconsumer两给力工具
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
看前group offset状况比看pv状况3partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键offsetlogSizeLag
前读完所offset=logSize并且Lag=0
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
3参数
[earliest | latest]表示offset置哪
consumer.properties 配置文件路径
topictopic名page_visits
我面pv group执行完操作再check group offset状况结
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
看offset已经清0Lag=logSize

底给原文线程consumer完整代码

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector注意面conf配置
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}

public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}

public void run(int a_numThreads) { // 创建并发consumers
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪topic需要几线程读
Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream> streams = consumerMap.get(topic); // 每线程应于KafkaStream

// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber)); // 启consumer thread
threadNumber++;
}
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);
}

public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);

try {
Thread.sleep(10000);
} catch (InterruptedException ie) {

}
example.shutdown();
}
}

SimpleConsumer
另种SimpleConsumer名字起简单接口其实low-level consumer更复杂接口
参考
候用接口?
Read a message multiple times
Consume only a subset of the partitions in a topic in a process
Manage transactions to make sure a message is processed once and only once

用接口代价即partition,broker,offset再透明需要自管理些并且要handle broker leader切换麻烦
所定要用别用
You must keep track of the offsets in your application to know where you left off consuming.
You must figure out which Broker is the lead Broker for a topic and partition
You must handle Broker leader changes
使用SimpleConsumer步骤:
Find an active Broker and find out which Broker is the leader for your topic and partition
Determine who the replica Brokers are for your topic and partition
Build the request defining what data you are interested in
Fetch the data
Identify and recover from leader changes
首先必须知道读哪topic哪partition
找负责该partitionbroker leader找存该partition副本broker
再者自写request并fetch数据
终要注意需要识别处理broker leader改变

热心网友 时间:2022-04-14 08:53

刚开始学习这一块的内容。请指教
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com
苹果手机有东西要更新了 在家没有无线 办夜间流量包可以嘛 3G流量可以... 联通3g卡有包3072M,就是3G流量这个套餐吗? ...偷偷改时间补打卡记录,补完把时间调回去了,里面会有时间修改过的记录... ...是以前打的卡就会有变动,打出来的考勤是调整时间以后的吗?_百度知 ... 梦见背喜欢的女孩的预兆 相比互联网超越1号,凡尔赛plus有什么弊端? 相比尊享福重疾险,凡尔赛plus重疾险有啥亮点? 凡尔赛plus有哪些优点? 相比达尔文6号重疾险,凡尔赛plus重疾险能有哪些亮点? 如果是女孩去日本上学,安全吗? 如何查看kafka命令 找不到 vivo怎么突然自拍不清楚? linux 怎样查看kafka的某 topic数据 linux 怎样查看kafka的某topic数据? 我的vivoX7手机自拍一直很好,今天突然出现灰色镜像,模糊不清,有高人 有哪些好用的消息中间件值得推荐,为什么? vivo手机相机像素突然不好,怎么办? 如何为一个kafka集群选择topics/partitions的数量 key为null时Kafka会将消息发送给哪个分区 求助,关于kafka发送数据丢失的问题 如何为Kafka集群选择合适的Partitions数量 cad2012突然意外关闭怎么恢复图形? 怎样恢复CAD意外关闭的图纸 CAD总是意外关闭,应该怎么解决? cad软件突然关闭,如何从备份文件中恢复图形? 黑米,小米,糯米粥里能放天麻吗? 黑米有什么功效?可以和什么一起煮? 微信客户端在哪里可下载 黑米可以和黑糯米一起煮粥吗 微信客户端怎么下载安装 如何将猎豹清理大师里的病毒查杀里的内容删除掉? 突然vivo 手机照相不能对焦,近距离很模糊 kafka 查询机器上有多少leader 怎么用猎豹清理大师清理数据? 如何使用命令查看kaf 查看storm消费了多少kafka的数据 猎豹清理大师怎样可以取消所有设置 vivo手机相机前置功能变的不清楚要怎么恢复正常 kafka监控指标kafka.log:type=LogFlushStats,name=LogFlushRateAndTime... vivox7手机相机打开一直黑屏无法操作没有影像,别的软件模糊,恢复出厂... kafka-console-consumer为什么没有记录 vivo手机录像放不清楚是什么原因 我想把猎豹清理大师删掉怎么删除 oppoa5怎么用猎豹清理大师删除系统文件呢 我是vivo NEX A手机,使用手机屏幕镜像时,电视机不能满屏显示,四周有大黑边,严重影响观看。 猎豹清理大师里的我的清理计划里的东西怎么移除? 我下了个猎豹清理大师 上边有很多安装包 能告诉我怎么删除吗? 新下载的猎豹清理大师怎么没有清除预装功能了 用猎豹清理大师清理手机有效果吗 照片被涂抹掉了,能还原吗?