如何进行kafka批量消费多消费者问题分析
发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。package com.llw.m
千家信息网最后更新 2025年01月22日如何进行kafka批量消费多消费者问题分析
今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record =" + record); System.out.println("----------------- message =" + message); } }@KafkaListener(id = "2", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) })public void listen2(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 1=" + record); System.out.println("------------------ message 1=" + message); } }//id = "4", //id="4" @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") )*//* },*/ containerFactory = "kafkaBatchListener6")public void listen3(List> records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 4=" + record);// System.out.println("------------------ message 4=" + message); } } } finally {// ack.acknowledge(); } }//id="5" @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) },*/ containerFactory = "kafkaBatchListener6")public void listen2(List > records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 6=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }//https://www.cnblogs.com/linjiqin/p/13171789.html @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) }, */containerFactory = "kafkaBatchListener6")public void listen4(List > records) {try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 3=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }}
一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,
spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。
看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
消费
消费者
问题
内容
模式
费多
分析
注解
参数
多个
数据
更多
最好
消息
知识
篇文章
行业
资讯
资讯频道
队列
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
博士网络安全数据挖掘
魔王him服务器被炸
数据库动态系统
德州软件开发大学生
天长直销软件开发技术哪家好
软件开发与计算机科学与技术
数据库d版和e版
乐高无限怎么在服务器里装饰
10大虚拟服务器品牌
数据库如何在jsp页面显示
乡镇网络安全预算方案
软件开发总体设计原则
网络安全微动漫
河南青少年网络安全周
u8维护数据库
学习网络安全教育简报
e 服务器ip
有实力软件开发优惠
丽雅网络技术
列数据库的数据压缩
确保 软件开发安全
可以加字的p图软件开发
摄像头管理服务器离线
重庆交易软件开发公司
网络安全员需要具备素质
iphone文件扫描服务器
恒生科技和恒生互联网ETF
显示连接服务器出错咋办
小管家数据库怎样卸载
数学方面软件开发题目