千家信息网

Springboot如何整合RocketMQ收发消息

发表于:2025-01-16 作者:千家信息网编辑
千家信息网最后更新 2025年01月16日,这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Springboot 整合 Rocke
千家信息网最后更新 2025年01月16日Springboot如何整合RocketMQ收发消息

这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

Springboot 整合 RocketMQ 收发消息

创建springboot项目

pom.xml添加rocketmq-spring-boot-starter依赖。

    org.apache.rocketmq    rocketmq-spring-boot-starter    2.1.0

yml 配置

application.yml

rocketmq:  name-server: 192.168.64.141:9876

application-demo1.yml

使用 demo1 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo1

application-demo2.yml

使用 demo2 profile 指定生产者组组名

rocketmq:  producer:    group: producer-demo2

测试

demo 1

  • 发送普通消息

  • 发送 Spring 的通用 Message 对象

  • 发送异步消息

  • 发送顺序消息

生产者

package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t ;    public  void send(){        //发送同步消息        t.convertAndSend("Topic1:TagA", "Hello world! ");        //发送spring的Message        Message message = MessageBuilder.withPayload("Hello Spring message! ").build();        t.send("Topic1:TagA",message);        //发送异步消息        t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() {            @Override            public void onSuccess(SendResult sendResult) {                System.out.println("发送成功");            }            @Override            public void onException(Throwable throwable) {                System.out.println("发送失败");            }        });        //发送顺序消息        t.syncSendOrderly("Topic1", "98456237,创建", "98456237");        t.syncSendOrderly("Topic1", "98456237,支付", "98456237");        t.syncSendOrderly("Topic1", "98456237,完成", "98456237");    }}

消费者

package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")public class Consumer  implements RocketMQListener {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m1;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

需要放在 test 文件夹

激活 demo1 profile @ActiveProfiles("demo1")

package cn.tedu.demo2.m1;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo1")public class Test1 {    @Autowired    private  Producer producer;    @Test    public void test1(){        producer.send();        try {            Thread.sleep(5000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

demo 2

发送事务消息

生产者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer {    @Autowired    private RocketMQTemplate t;    public void send(){        Message message = MessageBuilder.withPayload("Hello world").build();        //一旦发送消息,则执行监听器        t.sendMessageInTransaction("Topic2",message,null);    }    @RocketMQTransactionListener    class Lis implements RocketMQLocalTransactionListener {        @Override        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {            System.out.println("执行本地事务");            return RocketMQLocalTransactionState.UNKNOWN;        }        @Override        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {            System.out.println("执行事务回查");            return RocketMQLocalTransactionState.COMMIT;        }    }}

消费者

package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")public class Consumer implements RocketMQListener {    @Override    public void onMessage(String s) {        System.out.println("收到"+s);    }}

主类

package cn.tedu.demo2.m2;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main {    public static void main(String[] args) {        SpringApplication.run(Main.class, args);    }}

测试类

package cn.tedu.demo2.m2;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo2")public class Test2 {    @Autowired    private  Producer producer;    @Test    public void  test1(){        producer.send();        //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间        try {            Thread.sleep(30000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

关于"Springboot如何整合RocketMQ收发消息"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

0