千家信息网

微服务架构设计RocketMQ基础及环境整合的方法是什么

发表于:2025-02-21 作者:千家信息网编辑
千家信息网最后更新 2025年02月21日,本篇内容主要讲解"微服务架构设计RocketMQ基础及环境整合的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"微服务架构设计RocketMQ基础
千家信息网最后更新 2025年02月21日微服务架构设计RocketMQ基础及环境整合的方法是什么

本篇内容主要讲解"微服务架构设计RocketMQ基础及环境整合的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"微服务架构设计RocketMQ基础及环境整合的方法是什么"吧!

概述&选型

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要用于三种典型场景:应用解耦、流量消峰、消息分发。

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:

支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)

支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)

支持18个级别的延迟消息(rabbitmq和kafka不支持)

支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)

支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)

支持重复消费(rabbitmq不支持,kafka支持)

单机安装配置

工欲善其事必先利其器,要想深入了解RocketMQ得先把环境安装好,咱们先开始单机版RocketMQ的安装!

解压安装
unzip rocketmq-all-4.7.0-bin-release.zip

启动 Name Server
> nohup sh bin/mqnamesrv &

查看 Name Server启动日志
> tail -f ~/logs/rocketmqlogs/namesrv.log

启动 Broker Server
> nohup sh bin/mqbroker -n localhost:9876 &

查看 Broker Server 启动日志
> tail -f ~/logs/rocketmqlogs/broker.log

单机情况下安装使用RocketMQ很简单,只需要分别启动NameServer和Broker Server即可!

关闭RockerMQ需要使用下面的命令:

# 先关闭Broker Server
> sh bin/mqshutdown broker
# 再关闭NameServer
> sh bin/mqshutdown namesrv

双机主从高可用搭建

为了消除单机故障,增加可靠性或增大吞吐量,可以在多台服务器上部署多个NameServer和Broker,并为每个Broker部署一个或多个Slave。本节将说明使用两台机器,搭建双主、双从、无单点故障的高可用RocketMQ集群。假设现在有两台服务器,IP地址分别为:192.168.100.43和192.168.100.44,部署架构如下:

启动多个NameServer 和 Broker

首先需要在两台服务器上分别启动NameServer(nohup sh bin/mqnamesrv &),这样我们就得到了一个无单点的NameServer服务,服务地址为192.168.100.43:9876和192.168.100.44:9876。

然后在两台服务器中RocketMQ的conf目录分别建立两个文件 broker-master.properties,broker-slave.properties,下面是不同服务器的配置说明:

192.168.100.43 机器上的broker-master.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = SYNC_MASTERflushDiskType = ASYNC_FLUSHlistenPort = 10911storePathRootDir = /app/rocketmq/store-a

192.168.100.43 机器上的broker-slave.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876brokerClusterName = DefaultClusterbrokerName = broker-bbrokerId = 1deleteWhen = 04fileReservedTime = 48brokerRole = SLAVEflushDiskType = ASYNC_FLUSHlistenPort = 11011storePathRootDir = /app/rocketmq/store-b

192.168.100.44 机器上的broker-master.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876brokerClusterName = DefaultClusterbrokerName = broker-bbrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = SYNC_MASTERflushDiskType = ASYNC_FLUSHlistenPort = 10911storePathRootDir = /app/rocketmq/store-b

192.168.100.44 机器上的broker-slave.properties文件:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 1deleteWhen = 04fileReservedTime = 48brokerRole = SLAVEflushDiskType = ASYNC_FLUSHlistenPort = 11011storePathRootDir = /app/rocketmq/store-a

然后分别使用如下命令启动两台服务器的主节点和从节点

nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &

这样一个高可用的RockerMQ集群就搭建好了,我们登陆可视化运维管理界面查看集群状态,集群正常启动。

重要参数说明

本节主要是对Broker的配置文件中用到的参数进行说明

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
指定NameServer的地址,可以是多个。

brokerClusterName = DefaultCluster
Cluster地址,如果集群数量比较多,可以分成多个Cluster,每个Cluster供一个业务群使用。

brokerName = broker-a
Broker的名称,Master 和Slave 通过使用相同的 Broker 名称来表明相互关系,以说明某个Slave 是哪个Master 的 Slave。

brokerId = 1
一个Master可以有多个Slave,0表示Master,大于0的表示不同Slave的ID。

fileReservedTime = 48
在磁盘上保存消息的时长,单位是小时,自动删除超时的消息。

deleteWhen = 04
与 fileReservedTime 参数对应,表明在几点做消息删除动作,默认是凌晨4点。

brokerRole = SYNC_MASTER

brokerRole的可选参数有SYNC_MASTER,ASYNC_MASTER,SLAVE三种。SYNC 和ASYNC 表示MASTER 和SLAVE 之间同步消息的机制,SYNC的意思是当Slave 和 Master 的消息同步完成后再返回发送成功的状态。

flushDiskType = ASYNC_FLUSH

flushDiskType 表示刷盘策略,可选值有ASYNC_FLUSH 和 SYNC_FLUSH两种,分别代表同步刷盘和异步刷盘。同步情况下,消息只有真正写入磁盘才返回成功状态;异步情况下,消息写入page_cache后就返回成功状态。

listenPort = 11011
Broker监听的端口,一台服务器启动多个Broker,需要设置不同的监听端口避免端口冲突。

storePathRootDir = /app/rocketmq/store-a
存储消息以及配置信息的根目录。

可视化管理平台

RocketMQ可以使用rocketmq-externals作为运维管理平台,Github地址https://github.com/apache/rocketmq-externals,我们需要将源码下载下来后再进行手动编译,过程如下:

下载
从github(https://github.com/apache/rocketmq-externals) 下载RocketMQ可视化管理工具 rocketmq-externals 的源码;

打包
下载完成后切换进rocketmq-console目录,使用maven命令对其打包 mvn clean package -Dmaven.test.skip=true
打包完成后生成可执行文件rocketmq-console-ng-1.0.1.jar

运行
使用 java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=xxxx.xxx.xxx.xxx:9876 命令启动

这里注意需要设置两个参数:
--server.port 为运行的这个web应用的端口,如果不设置的话默认为8080;
--rocketmq.config.namesrvAddr 为RocketMQ命名服务地址,若NameServer为集群则使用英文 ; 分割

访问
浏览器访问 xxx.xxx.xxx.xxx:8080 进入控制台界面,效果如下

SpringBoot整合RocketMQ

在SpringBoot中整合RocketMQ主要用到 rocketmq-spring-boot-starter 组件,下面是详细整合过程。

引入组件rocketmq-spring-boot-starter 依赖

        org.apache.rocketmq        rocketmq-spring-boot-starter        2.1.0

修改application.yml,添加RocketMQ相关配置

rocketmq:  name-server: 192.168.100.43:9876;192.168.100.44:9876  producer:    group: test-group    send-message-timeout: 3000

如果是集群,多个name-server使用英文 ; 分割。

编写消息生产者 MessageProduce

/** * Description: * rocketMQ消息发送方法 * @author javadaily */@Componentpublic class MessageProduce {    @Autowired    private RocketMQTemplate rocketMQTemplate;    /**     * 发送消息     * @param topic 主题     * @param message 消息体     */    public void sendMessage(String topic,String message){        this.rocketMQTemplate.convertAndSend(topic,message);    }}

使用RocketMQTemplate发送消息

编写消息消费者 MessageConsumer

@Slf4j@Component@RocketMQMessageListener(        topic = "test-topic",        consumerGroup = "test-group",        selectorExpression = "*")public class MessageConsumer implements RocketMQListener {    @Override    public void onMessage(String message) {        log.info("received message is {}", message);    }}

消费者只需要继承RocketMQListener类即可,主要关注实现类上的 @RocketMQMessageListener 注解,配置的 topicconsumerGroup 需要跟消息生产者的配置保持一致。

编写单元测试发送消息

@RunWith(SpringRunner.class)@SpringBootTestpublic class MessageProduceTest {    @Autowired    private MessageProduce messageProduce;    @Test    public void testSendMessage() {        messageProduce.sendMessage("test-topic","Hello,JAVA日知录");    }}

测试

先启动springboot应用,再执行测试用例。

到此,相信大家对"微服务架构设计RocketMQ基础及环境整合的方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

消息 服务 支持 多个 文件 服务器 集群 配置 整合 地址 方法 机器 架构 环境 单机 参数 命令 状态 端口 系统 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 路由器上网显示服务器无反应 有些软件点开为啥老是服务器异常 河南大学网络安全中心 广电网络安全施工队培训记录 轮巡负载均衡落到固定几台服务器 县网络安全信息化领导小组 网络安全公司的会计处理 网络安全技术研究进度计划 鄞州一站式软件开发流程 长沙纯科网络技术有限公司 网络安全包括五个方面 少年派网络技术有限公司主销产品 法律保护网络安全的手抄报 txt如何模拟数据库 学院网络安全工作要点2021 上海停车系统软件开发机构 数据库物理结构设计数据表设计 拿什么描述网络安全 泰兴直销网络技术大概费用 易神州网络技术有限公司 泰安景区计算机网络技术人员 delphi连接sql数据库 新炬网络最大数据库 江苏服务器供应商虚拟主机 宁波软件开发驻场服务费 民宿软件开发的模板 湖北武汉遇法网络技术 广东专升本数据库真题 网络安全问题情侣手链 北京软件开发月薪
0