如何解析Kafka 1.0.0 多消费者示例
发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package kafka.demo;impo
千家信息网最后更新 2025年02月07日如何解析Kafka 1.0.0 多消费者示例
如何解析Kafka 1.0.0 多消费者示例,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
package kafka.demo;import java.util.HashMap;import java.util.Map;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;/** * *Description: kafka 1.0.0
* @author guangshihao * @date 2018年9月19日 * */public class KafkaProduderDemo { public static void main(String[] args) { Mapprops = new HashMap<>(); /* * acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1 * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。 * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。 * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。 * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。 * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。 * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。 * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失 */ props.put("acks", "1"); //配置默认的分区方式 props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); //配置topic的序列化类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //配置value的序列化类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* * kafka broker对应的主机,格式为host1:port1,host2:port2 */ props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); //topic String topic = "test7"; KafkaProducer< String, String> producer = new KafkaProducer< String, String>(props); for(int i = 1 ;i <= 100 ; i++) { String line = i+" this is a test "; ProducerRecord record = new ProducerRecord (topic,line ); producer.send(record); } producer.close(); }}
package kafka.demo;import java.util.List;import java.util.concurrent.atomic.AtomicBoolean;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.errors.WakeupException;public class MutilConsumerThread implements Runnable{ private AtomicBoolean closed = new AtomicBoolean(false); KafkaConsumerconsumer = null; String topic = null; public MutilConsumerThread(KafkaConsumer consumer,List topic) { this.consumer=consumer; consumer.subscribe(topic); } public void run() { try{ while(!closed.get()) { ConsumerRecords records = consumer.poll(1000); for(ConsumerRecord record: records) { //一组consumer的时候每个partition对应的线程是固定的 System.out.println("Thread-Name:"+Thread.currentThread().getName()+" "+"partition:"+record.partition()+" "+record.value()); } } }catch(WakeupException e ) { if(!closed.get()) throw e; }finally { consumer.close(); } } public void shutdown() { closed.set(true); consumer.wakeup(); }}
package kafka.demo;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.apache.kafka.clients.consumer.KafkaConsumer;public class MutiConsumerTest { public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "bigdata01:9092,bigdata02:9092,bigdata03:9092"); props.put("group.id", "group_test7"); //配置topic的序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //配置value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //自动同步offset props.put("enable.auto.commit","true"); //自动同步offset的时间间隔 props.put("auto.commit.intervals.ms", "2000"); //当在zookeeper中发现要消费的topic没有或者topic的offset不合法时自动设置为最小值,可以设的值为 latest, earliest, none,默认为largest props.put("auto.offset.reset", "earliest "); String topic = "test7"; Listconsumers = new ArrayList<>(); ExecutorService es = Executors.newFixedThreadPool(3); for(int i = 0 ;i<=2;i++) { KafkaConsumer consumer = new KafkaConsumer (props); MutilConsumerThread cThread = new MutilConsumerThread(consumer,Arrays.asList(topic)); consumers.add(cThread); es.submit(cThread); } //Thread.sleep(1000L); /* 这个方法的意思就是在JVM中增加一个关闭的钩子,当JVM关闭的时候, 会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后, JVM才会关闭。所以这些钩子可以在JVM关闭的时候进行内存清理、对象销毁等操作。*/ Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for(MutilConsumerThread consumer :consumers ) { consumer.shutdown(); } } }); }}
看完上述内容,你们掌握如何解析Kafka 1.0.0 多消费者示例的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
配置
序列
方法
时候
钩子
消费
意味
消费者
示例
内容
就是
持久性
更多
系统
问题
同步
最低
最小
成功
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发推广的简历怎么写
杭州市软件开发平台
游戏服务器后端开发面试题
晨曦软件开发有限公司
台州领策网络技术有限公司
电脑如何建立ftp服务器视频教程
庚顿数据库的安全配置
郑州市软件开发最低征收率
UI设计和软件开发哪个简单
最好的软件开发培训班
高二网络技术应用考试
专业信息和网络技术服务
在access数据库类型
服务器配置阵列
软件开发工程需要资质吗
oracle数据库谁发明的
网络安全时间典例
比较hive和传统数据库
做软件开发可以考造价师
中国网络安全领域排名
学好软件开发能去大公司上班吗
怎么查询数据库某一行
搜点网络技术有限公司怎么样
网络安全工程师证去哪儿考
石狮市软件开发
服务器安装安全狗好吗
云端服务器 客户端
华为信创服务器操作系统
数据库目录树设计
社招去银行做软件开发