Linux下怎么部署分布式消息系统RocketMQ
本篇内容主要讲解"Linux下怎么部署分布式消息系统RocketMQ",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Linux下怎么部署分布式消息系统RocketMQ"吧!
一、本篇所需文件下载
链接:https://pan.baidu.com/s/17iUB1lBOjv4CBAEQFvn65A 提取码:v0sn
一、Linux环境搭建
1、安装 jdk环境
RocketMQ java编写,需要jdk环境
下载jdk 1.7.0_80 上传到linux ,必须64位,32位RocketMQ不支持
tar -zxvf jdk-7u80-linux-x64.tar.gz //解压
修改环境变量 vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.7.0_80export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport PATH=$JAVA_HOME/bin:$PATH
刷新配置
source /etc/profile
或jdk1.8下载安装教程:https://blog.csdn.net/qq_41463655/article/details/99173682
2、安装RocketMQ
2.1、上传alibaba-rocketmq-3.2.6.tar.gz 上传到linux解压安装
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local //解压到 /usr/localmv /usr/local/alibaba-rocketmq /usr/local/alibaba-rocketmq-3.2.6 //重命名ln -s /usr/local/alibaba-rocketmq-3.2.6 rocketmq //安装
安装好了
2.2、创建存储路径
cd /usr/local/rocketmqmkdir storemkdir store/commitlogmkdir store/consumequeuemkdir store/index
2.3、日志配置
cd /usr/local/rocketmqmkdir logs cd conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
2.4、配置 broker-a.properties / broker-b.properties /usr/local/rocketmq/conf/2m-noslave/ 目录下
2.4.1、broker-a.properties
#所属集群名字brokerClusterName=rocketmq-cluster#broker名字,注意此处不同的配置文件填写的不一样brokerName=broker-a|broker-b#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=120#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/usr/local/rocketmq/store#commitLog 存储路径storePathCommitLog=/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存储路径storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存储路径storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存储路径abortFile=/usr/local/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
2.4.2、broker-b.properties
#所属集群名字brokerClusterName=rocketmq-cluster#broker名字,注意此处不同的配置文件填写的不一样brokerName=broker-a|broker-b#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=120#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/usr/local/rocketmq/store#commitLog 存储路径storePathCommitLog=/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存储路径storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存储路径storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存储路径abortFile=/usr/local/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
两个配置文件需修改处
brokerName=broker-a|broker-b 集群a服务器配置修改为 brokerName=broker-abrokerName=broker-a|broker-b 集群b服务器配置修改为 brokerName=broker-b
2.5、修改启动参数 /rocketm/bin下 (jvm)
runbroker.sh 的JAVA_OPT runserver.sh 的JAVA_OPT
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"修改为JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"
2.6、启动 NameServer 安装目录 /usr/local/ /rocketmq/bin 目录下
nohup sh mqnamesrv &
2.7、启动 BrokerServer /rocketmq/bin 目录下
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &netstat -ntlp
查看启动状态
jps
结果如下启动成功
3.修改linux 服务器host
本机ip,配置域名
192.168.177.128 rocketmq-nameserver1192.168.177.128 rocketmq-master1192.168.111.129 rocketmq-nameserver2192.168.111.129 rocketmq-master2
图片
4.安装后台管理平台
解压安装 tomcat 7.0到 /usr/local/
tar -zxvf apache-tomcat-7.0.65.tar.gz -C /usr/local
把rocketmq-web-console.war 复制到apache-tomcat-7.0.65 的webapps 目录下 启动tomcat 自动解压,然后修改config /rocketmq-web-console/WEB-INF/classes 的 config.properties 配置 修改ip
单服务器rocketmq.namesrv.addr=192.168.177.128:9876多服务器 rocketmq.namesrv.addr=192.168.177.128:9876;192.168.177.129:9876
关闭tomcat / 重启tomcat
关闭防火墙
systemctl disable firewalld 或 chkconfig iptables off
访问 —-》 ip:8080/rocketmq-web-console 出现下方界面就ok了
java 操作
1、生产者
import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("rmq-group"); producer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876"); producer.setInstanceName("producer"); producer.start(); try { for (int i = 0; i "test-topic", "TagA", ("test-topic-"+i).getBytes() ); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); }}
2、消费者
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group"); consumer.setNamesrvAddr("192.168.177.128:9876;192.268.177.129:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("test-topic", "TagA"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(msg.getMsgId()+"---"+new String(msg.getBody())); } //返回成功消费状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); }}
会出现幂等问题,使用全局id,或者时间戳,业务的唯一id 进行判断,使用redis等日志记录判断是否存在,存在表示已经成功消费
到此,相信大家对"Linux下怎么部署分布式消息系统RocketMQ"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!