千家信息网

如何理解kafka java代码的使用Producer和Consumer

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,这篇文章给大家介绍如何理解kafka java代码的使用Producer和Consumer,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。用java代码对kafka消息进行消费与发
千家信息网最后更新 2025年01月24日如何理解kafka java代码的使用Producer和Consumer

这篇文章给大家介绍如何理解kafka java代码的使用Producer和Consumer,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

用java代码对kafka消息进行消费与发送,首先我们得引入相关jar包

maven:

    org.apache.kafka    kafka_2.10    0.8.2.1

gradle:

compile("org.apache.kafka:kafka_2.10:0.8.2.1")

在新版本的kafka中(具体版本记不清楚了),添加了java代码实现的producer,consumer目前还是Scala的,之前的producer和consumer均是Scala编写的,在这里则介绍java版本的producer。

另一点需要特别注意:

当发送消息时我们不指定key时,producer将消息分发到各partition的机制是:

Scala版本的producer:在你的producer启动的时候,随机获得一个partition,然后后面的消息都会发送到这个partition,也就是说,只要程序启动了,这个producer都会往同一个partition里发送消息

java版本的producer:会轮询每个partition,所以发送的会比较平均

所以当使用Scala版本的producer时,尽量传入key,保证消息在partition的平均性

下面是具体的代码:

import java.io.Serializable;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import org.apache.commons.lang.SerializationUtils;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import cn.qlt.study.domain.User;public class KafkaUtil {                private static KafkaProducer producer=null;        private static ConsumerConnector consumer=null;                static{                //生产者配置文件,具体配置可参考ProducerConfig类源码,或者参考官网介绍                Map config=new HashMap();                //kafka服务器地址                config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.100.90:9092,192.168.100.91:9092");                //kafka消息序列化类 即将传入对象序列化为字节数组                config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");                //kafka消息key序列化类 若传入key的值,则根据该key的值进行hash散列计算出在哪个partition上                config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");                config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5);                //往kafka服务器提交消息间隔时间,0则立即提交不等待                config.put(ProducerConfig.LINGER_MS_CONFIG,0);                //消费者配置文件                Properties props = new Properties();                //zookeeper地址            props.put("zookeeper.connect", "192.168.100.90:2181");            //组id            props.put("group.id", "123");            //自动提交消费情况间隔时间            props.put("auto.commit.interval.ms", "1000");                        ConsumerConfig consumerConfig=new ConsumerConfig(props);                producer=new KafkaProducer(config);                consumer=Consumer.createJavaConsumerConnector(consumerConfig);        }        /**         *启动一个消费程序         * @param topic 要消费的topic名称        * @param handler 自己的处理逻辑的实现        * @param threadCount 消费线程数,该值应小于等于partition个数,多了也没用         */        public static void startConsumer(String topic,final MqMessageHandler handler,int threadCount) throws Exception{                if(threadCount<1)                        throw new Exception("处理消息线程数最少为1");           //设置处理消息线程数,线程数应小于等于partition数量,若线程数大于partition数量,则多余的线程则闲置,不会进行工作           //key:topic名称 value:线程数           Map topicCountMap = new HashMap();           topicCountMap.put(topic, new Integer(threadCount));           Map>> consumerMap = consumer.createMessageStreams(topicCountMap);           //声明一个线程池,用于消费各个partition           ExecutorService executor=Executors.newFixedThreadPool(threadCount);           //获取对应topic的消息队列           List> streams = consumerMap.get(topic);           //为每一个partition分配一个线程去消费           for (final KafkaStream stream : streams) {                   executor.execute(new Runnable() {                        @Override                        public void run() {                             ConsumerIterator it = stream.iterator();                             //有信息则消费,无信息将会阻塞                             while (it.hasNext()){                                T message=null;                                        try {                                                //将字节码反序列化成相应的对象                                                byte[] bytes=it.next().message();                                                message = (T) SerializationUtils.deserialize(bytes);                                        } catch (Exception e) {                                                e.printStackTrace();                                                return;                                        }                                    //调用自己的业务逻辑                                    try {                                                handler.handle(message);                                        } catch (Exception e) {                                                e.printStackTrace();                                        }                             }                        }                });       }        }        /**         *发送消息,发送的对象必须是可序列化的          */        public static Future send(String topic,Serializable value) throws Exception{                try {                        //将对象序列化称字节码                        byte[] bytes=SerializationUtils.serialize(value);                        Future future=producer.send(new ProducerRecord(topic,bytes));                        return future;                }catch(Exception e){                        throw e;                }        }                //内部抽象类 用于实现自己的处理逻辑        public static abstract class MqMessageHandler{                public abstract void handle(T message);        }                        public static void main(String[] args) throws Exception {                //发送一个信息                send("test",new User("id","userName", "password"));                //为test启动一个消费者,启动后每次有消息则打印对象信息                KafkaUtil.startConsumer("test", new MqMessageHandler() {                        @Override                        public void handle(User user) {                                //实现自己的处理逻辑,这里只打印出消息                                System.out.println(user.toString());                        }                },2);        }}

相关配置解释:

producer:

1、producer的配置不需要zookeeper地址,会直接获取kafka的元数据,直接和broker进行通信

2、ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG即value.serializer,kafka生产者与broker之间数据是以byte进行传递的,所以这个参数的意思是把我们传入对象转换成byte[]的类,一般使用org.apache.kafka.common.serialization.ByteArraySerializer即可,我们自己把对象序列化为byte[]

3、ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG即key.serializer,首先说明下key值是干什么的,若我们指定了key的值,生产者则会根据该key进行hash散列计算出具体的partition。若不指定,则随机选择partition。一般情况下我们没必要指定该值。这个类与上面功能一样,即将key转换成byte[]

4、ProducerConfig.LINGER_MS_CONFIG即linger.ms,为了减少请求次数提高吞吐率,这个参数为每次提交间隔的次数,若设置了该值,如1000,则意味着我们的消息可能不会马上提交到kafka服务器,需要等上1秒中,才会进行批量提交。我们可以适当的配置该值。0为不等待立刻提交。

consumer:

1、zookeeper.connect:zookeeper的地址,多个之间用,分割

2、group.id:这个值可以随便写,但建议写点有意义的值,别随便写个123。kafka保证同一个组内的消息只会被消费一次,若需要重复消费消息,则可以配置不同的groupid。

3、auto.commit.interval.ms:consumer自己会记录消费的偏移量,并定时往zookeeper上提交,该值即为提交时间间隔,若该值设置太大可能会出现重复消费的情况,如我们停止了某个consumer,但该consumer还未往zookeeper提交某段时间的消费记录,这导致我们下次启动该消费者的时候,它会从上次提交的偏移量进行消费,这就导致了某些数据的重复消费。

注意:在杀死consumer进程后,应等一会儿再去重启,因为杀死consumer进程时,会删除zookeeper的一些临时节点,若我们马上重启的话,可能会在启动的时候那些节点还没删除掉,出现写不必要的错误。

关于如何理解kafka java代码的使用Producer和Consumer就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0