如何进行kafka批量消费多消费者问题分析
发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。package com.llw.m
千家信息网最后更新 2025年02月23日如何进行kafka批量消费多消费者问题分析
今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record =" + record); System.out.println("----------------- message =" + message); } }@KafkaListener(id = "2", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) })public void listen2(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 1=" + record); System.out.println("------------------ message 1=" + message); } }//id = "4", //id="4" @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") )*//* },*/ containerFactory = "kafkaBatchListener6")public void listen3(List> records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 4=" + record);// System.out.println("------------------ message 4=" + message); } } } finally {// ack.acknowledge(); } }//id="5" @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) },*/ containerFactory = "kafkaBatchListener6")public void listen2(List > records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 6=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }//https://www.cnblogs.com/linjiqin/p/13171789.html @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) }, */containerFactory = "kafkaBatchListener6")public void listen4(List > records) {try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 3=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }}
一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,
spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。
看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
消费
消费者
问题
内容
模式
费多
分析
注解
参数
多个
数据
更多
最好
消息
知识
篇文章
行业
资讯
资讯频道
队列
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
哪些方面会运用到数据库
嵌入式软件服务器开发
亚德客选型软件用什么软件开发
宝山区微型软件开发工艺
qq服务器可以送人吗
部队网络安全ppt
蓝矩科创网络技术有限公司
k3客户端连接服务器
乐陵软件开发供应
软件开发要先学什么技能
我的世界服务器id1.8
视频会议如何增加数据库
对象-关系型数据库
第三季度网络安全会议
香港服务器用国内数据库快吗
无线网络安全的ppt课件
数据库出现死锁场景
移动营业厅网络安全吗
软件开发文档是否要入库管理
草图大师怎么在服务器上运行
华为服务器叫什么名字呢
南京软件开发介绍
南京现代化软件开发资费
万方文献数据库官网
人民网络安全 重要性
宁波东方网络技术有限公司
IBM服务器 装系统
数据库图片解析
大型网站用什么软件开发
济南天桥区政府软件开发