Springboot如何整合RocketMQ收发消息
发表于:2024-10-17 作者:千家信息网编辑
千家信息网最后更新 2024年10月17日,这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Springboot 整合 Rocke
千家信息网最后更新 2024年10月17日Springboot如何整合RocketMQ收发消息
这篇文章将为大家详细讲解有关Springboot如何整合RocketMQ收发消息,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
Springboot 整合 RocketMQ 收发消息
创建springboot项目
pom.xml添加rocketmq-spring-boot-starter
依赖。
org.apache.rocketmq rocketmq-spring-boot-starter 2.1.0
yml 配置
application.yml
rocketmq: name-server: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq: producer: group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq: producer: group: producer-demo2
测试
demo 1
发送普通消息
发送 Spring 的通用 Message 对象
发送异步消息
发送顺序消息
生产者
package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer { @Autowired private RocketMQTemplate t ; public void send(){ //发送同步消息 t.convertAndSend("Topic1:TagA", "Hello world! "); //发送spring的Message Messagemessage = MessageBuilder.withPayload("Hello Spring message! ").build(); t.send("Topic1:TagA",message); //发送异步消息 t.asyncSend("Topic1:TagA", "hello world asyn", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); //发送顺序消息 t.syncSendOrderly("Topic1", "98456237,创建", "98456237"); t.syncSendOrderly("Topic1", "98456237,支付", "98456237"); t.syncSendOrderly("Topic1", "98456237,完成", "98456237"); }}
消费者
package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic1",consumerGroup = "consumer-demo1")public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m1;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); }}
测试类
需要放在 test 文件夹
激活 demo1 profile @ActiveProfiles("demo1")
package cn.tedu.demo2.m1;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo1")public class Test1 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }}
demo 2
发送事务消息
生产者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class Producer { @Autowired private RocketMQTemplate t; public void send(){ Messagemessage = MessageBuilder.withPayload("Hello world").build(); //一旦发送消息,则执行监听器 t.sendMessageInTransaction("Topic2",message,null); } @RocketMQTransactionListener class Lis implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("执行本地事务"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("执行事务回查"); return RocketMQLocalTransactionState.COMMIT; } }}
消费者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(topic = "Topic2",consumerGroup = "consumer-demo2")public class Consumer implements RocketMQListener{ @Override public void onMessage(String s) { System.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m2;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); }}
测试类
package cn.tedu.demo2.m2;import org.junit.jupiter.api.Test;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.ActiveProfiles;@SpringBootTest@ActiveProfiles("demo2")public class Test2 { @Autowired private Producer producer; @Test public void test1(){ producer.send(); //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间 try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } }}
关于"Springboot如何整合RocketMQ收发消息"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
消息
生产者
消费
生产
整合
事务
消费者
篇文章
测试
更多
顺序
不错
实用
普通
成功
内容
对象
数据
文件
文件夹
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
各部门加强网络安全工作
上海java软件开发价格
中指数据库 如何安装
源代码上传到服务器
服务器集群部署
海珠app软件开发解决方案
服务器断开了连
山西水性软件开发发展现状
软件开发工作满意度调查问卷
sql数据库股票
主服务器认证错误
经济开发区容程网络技术工作室
广西日志审计服务器
老头环有中间服务器吗
阜阳app软件开发费用
杭州智赢网络技术有限公司
jxgl数据库
风筝守护为什么总是服务器异常
单台服务器并发极限
网络安全重点布局项目标志性建筑
刺客信条3重制版数据库鼠标没用
网络安全等保三级在哪办理
互联网科技发展高歌猛进
有关网络安全的感悟
信阳网络技术是什么
网络安全法 新媒体
mqtt服务器开源有哪些
领会全国网络安全工作会议
俞平安 网络安全
数据库8.0怎么转换语言