Springboot 2.x集成kafka 2.2.0的方法
发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,本文小编为大家详细介绍"Springboot 2.x集成kafka 2.2.0的方法",内容详细,步骤清晰,细节处理妥当,希望这篇"Springboot 2.x集成kafka 2.2.0的方法"文章能
千家信息网最后更新 2025年01月18日Springboot 2.x集成kafka 2.2.0的方法
本文小编为大家详细介绍"Springboot 2.x集成kafka 2.2.0的方法",内容详细,步骤清晰,细节处理妥当,希望这篇"Springboot 2.x集成kafka 2.2.0的方法"文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。
引言
kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。
基本环境
springboot版本2.1.4
kafka版本2.2.0
jdk 1.8
代码编写
1、基本引用pom
4.0.0 org.springframework.boot spring-boot-starter-parent 2.1.4.RELEASE com.example demo 0.0.1-SNAPSHOT kafkademo Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-web mysql mysql-connector-java runtime org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka 2.2.0.RELEASE com.google.code.gson gson 2.7 org.springframework.boot spring-boot-maven-plugin
2、基本配置
spring.kafka.bootstrap-servers=2.1.1.1:9092spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #logging.level.root=debug
3、实体类
package com.example.demo.model; import java.util.Date; public class Messages { private Long id; private String msg; private Date sendTime; public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public Date getSendTime() { return sendTime; } public void setSendTime(Date sendTime) { this.sendTime = sendTime; }}
4、生产者端
package com.example.demo.service; import com.example.demo.model.Messages;import com.google.gson.Gson;import com.google.gson.GsonBuilder;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture; import java.util.Date;import java.util.UUID; @Servicepublic class KafkaSender { @Autowired private KafkaTemplatekafkaTemplate; private Gson gson = new GsonBuilder().create(); public void send() { Messages message = new Messages(); message.setId(System.currentTimeMillis()); message.setMsg("123"); message.setSendTime(new Date()); ListenableFuture > test0 = kafkaTemplate.send("newtopic", gson.toJson(message)); }}
5、消费者
package com.example.demo.service; import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service; import java.util.Optional; @Servicepublic class KafkaReceiver { @KafkaListener(topics = {"newtopic"}) public void listen(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("record =" + record); System.out.println("message =" + message); } } }
6、测试
在启动方法中模拟消息生产者,向kafka中发送消息
package com.example.demo; import com.example.demo.service.KafkaSender;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplicationpublic class KafkademoApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args); KafkaSender sender = context.getBean(KafkaSender.class); for (int i = 0; i <1000; i++) { sender.send(); try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } } }
效果展示
命令行直接消费消息
遇到的问题
生产端连接kafka超时
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
解决方案:
修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka
读到这里,这篇"Springboot 2.x集成kafka 2.2.0的方法"文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注行业资讯频道。
方法
配置
文章
消息
版本
生产
内容
生产者
消费
妥当
代码
企业
信心
命令
地方
基本配置
实体
引言
形式
思路
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
java和数据库联系在一起
48v服务器直流电源
网络安全第三方检测机构
数据库创建事物的一般过程
冒险岛服务器选择
万得数据库怎么查询价格变动
西安app软件开发哪个好
养老保险认证显示服务器异常
软件开发 不努力
工商上传文件压缩数据库
计算机网络技术利弊
软件开发行业适合什么星座
计算机四级数据库
数据库算法与结构图
网络安全认证检测
华为5g网络技术缺陷
网络技术岗位工资
前瞻网数据库vip账号
服务器管理权限帐号
数据管理平台软件开发需求
小程序 调用数据库
德州银行目前用什么数据库
word服务器备案
我的世界中国年服务器地址是什么
自学计算机网络技术应用
网络安全示范小区重庆
网络安全暖场视频心得体会
网络安全责任制实施办法大学生
思科计算机网络安全课程答案
窗体是数据库应用系统的什么