Springboot如何整合RocketMQ收发消息
发表于:2025-01-16 作者:千家信息网编辑
千家信息网最后更新 2025年01月16日,这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Springboot 整合 Rocke
千家信息网最后更新 2025年01月16日Springboot如何整合RocketMQ收发消息
这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
Springboot 整合 RocketMQ 收发消息
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter
依赖。
org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
yml 配置
application.yml
rocketmq: name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq: producer: group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq: producer: group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer { @Autowired private RocketMQTemplate t ; public void send(){ //发送同步消息 t.convertAndSend("Topic1:TagA", "Hello world! "); //发送spring的Message Messagemessage = MessageBuilder.withPayload("Hello Spring message! ").build(); t.send("Topic1:TagA",message); //发送异步消息 t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); //发送顺序消息 t.syncSendOrderly("Topic1", "98456237,创建", "98456237"); t.syncSendOrderly("Topic1", "98456237,支付", "98456237"); t.syncSendOrderly("Topic1", "98456237,完成", "98456237"); }}
消费者
package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m1;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); }}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo1")public class Test1 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer { @Autowired private RocketMQTemplate t; public void send(){ Messagemessage = MessageBuilder.withPayload("Hello world").build(); //一旦发送消息,则执行监听器 t.sendMessageInTransaction("Topic2",message,null); } @RocketMQTransactionListener class Lis implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("执行本地事务"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("执行事务回查"); return RocketMQLocalTransactionState.COMMIT; } }}
消费者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m2;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); }}
测试类
package cn.tedu.demo2.m2;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo2")public class Test2 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间 try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } }}
关于"Springboot如何整合RocketMQ收发消息"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
消息
生产者
消费
生产
整合
事务
消费者
篇文章
测试
更多
顺序
不错
实用
普通
成功
内容
对象
数据
文件
文件夹
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
四川工程软件开发价格
胶州ios软件开发企业
湖北省网络安全协会会长是谁
先进网络安全使用方法
小学安全教案网络安全要牢记
汇丰软件开发 面试
网络安全合规培训资料
华硕x79支持服务器
要投多少钱软件开发公司
软件开发人员是什么职位
武神4最火服务器
sql数据库国内研究现状
数据库检索压缩文件中pdf内容
pr中媒体缓存数据库
家具数据库
主机云服务器
1t服务器硬盘有多重
网络及网络安全培训什么内容
一年级网络安全手抄报大全
宁远租房软件开发
其他服务器访问本机数据库
长城宽带搭建服务器
云南什么是软件开发
方舟怎么成为服务器管理员
衡山县网络安全和信息化
成都极视互联网科技有限公司
维护网络安全 广告
未来之役服务器是什么
软件开发转行没机会
服务器安装家用系统开机还是慢吗