Springboot如何集成Kafka进行批量消费
发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,本篇内容主要讲解"Springboot如何集成Kafka进行批量消费",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Springboot如何集成Kafka进
千家信息网最后更新 2025年01月18日Springboot如何集成Kafka进行批量消费
本篇内容主要讲解"Springboot如何集成Kafka进行批量消费",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Springboot如何集成Kafka进行批量消费"吧!
引入依赖
org.springframework.kafka spring-kafka 1.3.11.RELEASE
因为我的项目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。读者可以根据下图来自行选择对应的版本。图片更新可能不及时,详情可查看spring-kafka 官方网站。
注:这里有个踩坑点,如果引入包版本不对,项目启动时会抛出org.springframework.core.log.LogAccessor 异常:
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
创建配置类
/** * kafka 配置类 */ @Configuration @EnableKafka public class KafkaConsumerConfig { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); @Value("${kafka.bootstrap.servers}") private String kafkaBootstrapServers; @Value("${kafka.group.id}") private String kafkaGroupId; @Value("${kafka.topic}") private String kafkaTopic; public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf"; public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks"; @Bean public KafkaListenerContainerFactory> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 设置并发量,小于或者等于 Topic 的分区数 factory.setConcurrency(5); // 设置为批量监听 factory.setBatchListener(Boolean.TRUE); factory.getContainerProperties().setPollTimeout(30000); return factory; } public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map consumerConfigs() { Map props = new HashMap<>(); //设置接入点,请通过控制台获取对应Topic的接入点。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); //设置SSL根证书的路径,请记得将XXX修改为自己的路径。 //与SASL路径类似,该文件也不能被打包到jar中。 System.setProperty("java.security.auth.login.config", CONFIG_PATH); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH); //根证书存储的密码,保持不变。 props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); //接入协议,目前支持使用SASL_SSL协议接入。 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); //SASL鉴权方式,保持不变。 props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // 自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); //两次Poll之间的最大允许间隔。 //消费者超过该值没有返回心跳,服务端判断消费者处于非存活状态,服务端将消费者从Consumer Group移除并触发Rebalance,默认30s。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); //设置单次拉取的量,走公网访问时,该参数会有较大影响。 props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); //每次Poll的最大数量。 //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); //消息的反序列化方式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //当前消费实例所属的消费组,请在控制台申请之后填写。 //属于同一个组的消费实例,会负载消费消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); //Hostname校验改成空。 props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); return props; } }
注:此处通过 factory.setConcurrency(5); 配置了并发量为 5 ,假设我们线上的 Topic 有 12 个分区。那么将会是 3 个线程分配到 2 个分区,2 个线程分配到 3 个分区,3 * 2 + 2 * 3 = 12。
Kafka 消费者
/** * kafka 消息消费类 */ @Component public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); @KafkaListener(topics = {"${kafka.topic}"}) public void listen(List> recordList) { for (ConsumerRecord record : recordList) { // 打印消息的分区以及偏移量 LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset()); String value = record.value(); System.out.println("value = " + value); // 处理业务逻辑 ... } } }
因为我在配置类中设置了批量监听,所以此处 listen 方法的入参是List:List
到此,相信大家对"Springboot如何集成Kafka进行批量消费"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
消费
消息
消费者
接入
配置
版本
路径
最大
内容
实例
接入点
控制台
方式
方法
线程
网站
证书
项目
分配
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全 合同
hp服务器加内存
万方数据库知识平台
广州市淘库网络技术有限公司盖章
数据库智能诊断
网络安全最高等级是什么
计算机网络技术新华杯
网络安全管理设备投入
佛山市南海网络安全大队吴森源
dell服务器u盘引导
我的世界服务器手机欢迎
无锡多功能软件开发常见问题
图像分析软件开发
数据库表实验原理
数据库的表自增字段
咸宁软件开发制作
启动管理服务器命令
500人游戏服务器一月多少钱
软件开发管理团队队名
天苍宝可梦手机端怎么添加服务器
阿里未来网络安全
天一晟汇北京网络技术
网络安全 合同
HDR贴图软件开发
杭州齐盛网络技术 掼蛋
盐池企业网站服务器
达梦数据库的最新版本是什么
长沙澳博软件开发有限公司官网
脱贫攻坚数据库中国发展网
长城杯网络安全竞赛晋级名单