千家信息网

如何使用Kafka的High Level Consumer

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇文章给大家分享的是有关如何使用Kafka的High Level Consumer,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。为什么
千家信息网最后更新 2025年01月24日如何使用Kafka的High Level Consumer

本篇文章给大家分享的是有关如何使用Kafka的High Level Consumer,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

为什么使用High Level Consumer

  • 在某些应用场景,我们希望通过多线程读取消息,而我们并不关心从Kafka消费消息的顺序,我们仅仅关心数据能被消费就行。High Level 就是用于抽象这类消费动作的。

  • 消息消费已Consumer Group为单位,每个Consumer Group中可以有多个consumer,每个consumer是一个线程,topic的每个partition同时只能被某一个consumer读取,Consumer Group对应的每个partition都有一个最新的offset的值,存储在zookeeper上的。所以不会出现重复消费的情况。

设计High Level Consumer High Level Consumer 可以并且应该被使用在多线程的环境,线程模型中线程的数量(也代表group中consumer的数量)和topic的partition数量有关,下面列举一些规则:

  1. 当提供的线程数量多于partition的数量,则部分线程将不会接收到消息;

  2. 当提供的线程数量少于partition的数量,则部分线程将从多个partition接收消息;

  3. 当某个线程从多个partition接收消息时,不保证接收消息的顺序;可能出现从partition3接收5条消息,从partition4接收6条消息,接着又从partition3接收10条消息;

  4. 当添加更多线程时,会引起kafka做re-balance, 可能改变partition和线程的对应关系。

代码示例 ConsumerGroupExample

package com.test.groups;import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ConsumerGroupExample {        private final ConsumerConnector consumer;        private final String topic;        private  ExecutorService executor;        public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {                consumer = kafka.consumer.Consumer.createJavaConsumerConnector(                                createConsumerConfig(a_zookeeper, a_groupId));                this.topic = a_topic;        }        public void shutdown() {                if (consumer != null) consumer.shutdown();                if (executor != null) executor.shutdown();        }        public void run(int a_numThreads) {                Map topicCountMap = new HashMap();                topicCountMap.put(topic, new Integer(a_numThreads));                Map>> consumerMap = consumer.createMessageStreams(topicCountMap);                List> streams = consumerMap.get(topic);                // now launch all the threads                //                executor = Executors.newFixedThreadPool(a_numThreads);                // now create an object to consume the messages                //                int threadNumber = 0;                for (final KafkaStream stream : streams) {                        executor.submit(new ConsumerTest(stream, threadNumber));                        threadNumber++;                }        }        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {                Properties props = new Properties();                props.put("zookeeper.connect", a_zookeeper);                props.put("group.id", a_groupId);                props.put("zookeeper.session.timeout.ms", "400");                props.put("zookeeper.sync.time.ms", "200");                props.put("auto.commit.interval.ms", "1000");                return new ConsumerConfig(props);        }        public static void main(String[] args) {                String zooKeeper = args[0];                String groupId = args[1];                String topic = args[2];                int threads = Integer.parseInt(args[3]);                ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);                example.run(threads);                try {                        Thread.sleep(10000);                } catch (InterruptedException ie) {                }                example.shutdown();        }}

ConsumerTest

package com.test.groups;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;public class ConsumerTest implements Runnable {        private KafkaStream m_stream;        private int m_threadNumber;        public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {                m_threadNumber = a_threadNumber;                m_stream = a_stream;        }        public void run() {                ConsumerIterator it = m_stream.iterator();                while (it.hasNext())                        System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));                System.out.println("Shutting down Thread: " + m_threadNumber);        }}

以上就是如何使用Kafka的High Level Consumer,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0