请教kafka 启用多台consumer 问题
发布网友
发布时间:2022-04-24 01:10
我来回答
共1个回答
热心网友
时间:2023-10-17 12:49
public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("fans", 1);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<Message> it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}
热心网友
时间:2023-10-17 12:49
public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("fans", 1);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<Message> it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}