千家信息网

如何解析Kafka 1.0.0 多消费者示例

发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package kafka.demo;impo
千家信息网最后更新 2025年02月07日如何解析Kafka 1.0.0 多消费者示例

如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

package kafka.demo;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;/** *  *  

Description: kafka 1.0.0

* @author guangshihao * @date 2018年9月19日 * */public class KafkaProduderDemo { public static void main(String[] args) { Map props = new HashMap<>(); /* * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失 */ props.put("acks", "1"); //配置默认的分区方式 props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); //配置topic的序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //配置value的序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* * kafka broker对应的主机,格式为host1:port1,host2:port2 */ props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); //topic String topic = "test7"; KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props); for(int i = 1 ;i <= 100 ; i++) { String line = i+" this is a test "; ProducerRecord record = new ProducerRecord(topic,line ); producer.send(record); } producer.close(); }}
package kafka.demo;import java.util.List;import java.util.concurrent.atomic.AtomicBoolean;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.WakeupException;public class MutilConsumerThread implements Runnable{        private AtomicBoolean closed = new AtomicBoolean(false);        KafkaConsumer consumer = null;        String topic = null;        public MutilConsumerThread(KafkaConsumer consumer,List topic) {                this.consumer=consumer;                consumer.subscribe(topic);        }        public void run() {                try{                        while(!closed.get()) {                                ConsumerRecords records = consumer.poll(1000);                                for(ConsumerRecord record: records) {                                        //一组consumer的时候每个partition对应的线程是固定的                                        System.out.println("Thread-Name:"+Thread.currentThread().getName()+"  "+"partition:"+record.partition()+"  "+record.value());                                }                        }                                        }catch(WakeupException e ) {                        if(!closed.get()) throw e;                }finally {                        consumer.close();                }        }                public void shutdown() {                closed.set(true);                consumer.wakeup();        }}
package kafka.demo;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.apache.kafka.clients.consumer.KafkaConsumer;public class MutiConsumerTest {        public static void main(String[] args) throws InterruptedException {                Properties props = new Properties();                props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092");                props.put("group.id", "group_test7");                //配置topic的序列化类                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");                //配置value的序列化类                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");                //自动同步offset        props.put("enable.auto.commit","true");        //自动同步offset的时间间隔        props.put("auto.commit.intervals.ms", "2000");        //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest        props.put("auto.offset.reset", "earliest ");        String topic = "test7";        List consumers = new ArrayList<>();        ExecutorService es = Executors.newFixedThreadPool(3);        for(int i = 0 ;i<=2;i++) {                KafkaConsumer consumer = new KafkaConsumer(props);                MutilConsumerThread cThread = new MutilConsumerThread(consumer,Arrays.asList(topic));                consumers.add(cThread);                es.submit(cThread);        }                //Thread.sleep(1000L);        /* 这个方法的意思就是在JVM中增加一个关闭的钩子,当JVM关闭的时候,              会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,          JVM才会关闭。所以这些钩子可以在JVM关闭的时候进行内存清理、对象销毁等操作。*/        Runtime.getRuntime().addShutdownHook(new Thread() {                        @Override                        public void run() {                                for(MutilConsumerThread consumer :consumers ) {                                        consumer.shutdown();                                }                        }                        });                }}

看完上述内容,你们掌握如何解析Kafka 1.0.0 多消费者示例的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0