千家信息网

Springboot 2.x集成kafka 2.2.0的方法

发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,本文小编为大家详细介绍"Springboot 2.x集成kafka 2.2.0的方法",内容详细,步骤清晰,细节处理妥当,希望这篇"Springboot 2.x集成kafka 2.2.0的方法"文章能
千家信息网最后更新 2025年01月18日Springboot 2.x集成kafka 2.2.0的方法

本文小编为大家详细介绍"Springboot 2.x集成kafka 2.2.0的方法",内容详细,步骤清晰,细节处理妥当,希望这篇"Springboot 2.x集成kafka 2.2.0的方法"文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

    引言

    kafka近几年更新非常快,也可以看出kafka在企业中是用的频率越来越高,在springboot中集成kafka还是比较简单的,但是应该注意使用的版本和kafka中基本配置,这个地方需要信心,防止进入坑中。

    基本环境

    springboot版本2.1.4

    kafka版本2.2.0

    jdk 1.8

    代码编写

    1、基本引用pom

            4.0.0                        org.springframework.boot                spring-boot-starter-parent                2.1.4.RELEASE                                 com.example        demo        0.0.1-SNAPSHOT        kafkademo        Demo project for Spring Boot                         1.8                                                         org.springframework.boot                        spring-boot-starter-web                                                         mysql                        mysql-connector-java                        runtime                                                        org.springframework.boot                        spring-boot-starter-test                        test                                                         org.springframework.kafka                        spring-kafka                        2.2.0.RELEASE                                                         com.google.code.gson                        gson                        2.7                                                                                                          org.springframework.boot                                spring-boot-maven-plugin                                                 

    2、基本配置

    spring.kafka.bootstrap-servers=2.1.1.1:9092spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #logging.level.root=debug

    3、实体类

    package com.example.demo.model; import java.util.Date; public class Messages {     private Long id;     private String msg;     private Date sendTime;     public Long getId() {        return id;    }     public void setId(Long id) {        this.id = id;    }     public String getMsg() {        return msg;    }     public void setMsg(String msg) {        this.msg = msg;    }     public Date getSendTime() {        return sendTime;    }     public void setSendTime(Date sendTime) {        this.sendTime = sendTime;    }}

    4、生产者端

    package com.example.demo.service; import com.example.demo.model.Messages;import com.google.gson.Gson;import com.google.gson.GsonBuilder;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture; import java.util.Date;import java.util.UUID; @Servicepublic class KafkaSender {     @Autowired    private KafkaTemplate kafkaTemplate;     private Gson gson = new GsonBuilder().create();     public void send() {        Messages message = new Messages();        message.setId(System.currentTimeMillis());        message.setMsg("123");        message.setSendTime(new Date());        ListenableFuture> test0 = kafkaTemplate.send("newtopic", gson.toJson(message));    }}

    5、消费者

    package com.example.demo.service; import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service; import java.util.Optional; @Servicepublic class KafkaReceiver {     @KafkaListener(topics = {"newtopic"})    public void listen(ConsumerRecord record) {        Optional kafkaMessage = Optional.ofNullable(record.value());        if (kafkaMessage.isPresent()) {            Object message = kafkaMessage.get();            System.out.println("record =" + record);            System.out.println("message =" + message);         }    }    }

    6、测试

    在启动方法中模拟消息生产者,向kafka中发送消息

    package com.example.demo; import com.example.demo.service.KafkaSender;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplicationpublic class KafkademoApplication {         public static void main(String[] args) {                ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);                 KafkaSender sender = context.getBean(KafkaSender.class);                for (int i = 0; i <1000; i++) {                        sender.send();                         try {                                Thread.sleep(300);                        } catch (InterruptedException e) {                                e.printStackTrace();                        }                }          } }

    效果展示

    命令行直接消费消息

    遇到的问题

    生产端连接kafka超时

    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)

    解决方案:

    修改kafka中的server.properties中的下面配置,将原来的默认配置替换成下面ip+端口的形式,重启kafka

    读到这里,这篇"Springboot 2.x集成kafka 2.2.0的方法"文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注行业资讯频道。

    0