千家信息网

RocketMQ

发表于:2024-10-19 作者:千家信息网编辑
千家信息网最后更新 2024年10月19日,RocketMQ本文档主要是rocketmq实际代码使用,常见词语介绍等查看其他文档一 下载http://rocketmq.apache.org/release_notes/release-notes
千家信息网最后更新 2024年10月19日RocketMQ

RocketMQ

本文档主要是rocketmq实际代码使用,常见词语介绍等查看其他文档

一 下载

http://rocketmq.apache.org/release_notes/release-notes-4.3.2/ 二进制文件下载地址,下载后可以直接解压运行

https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.2/rocketmq-all-4.3.2-source-release.zip 源码方式下载地址, 下载后需要自己打包

二 启动

2.1 启动nameserver

进入rocketmq的bin目录

nohup sh mqnamesrv &

2.2 启动broker server

进入bin目录

nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true &
集群方式参考集群配置文件RocketMQ集群

2.3 启动失败

默认情况下,我们的服务器都是单独的独立服务器,不会出现这种情况,但是我们在测试过程中使用的是虚拟机, 配置不够,会导致无法启动

修改runbroker.sh 和 runserver.sh

分别找到下图中的指示位置

修改内存大小即可,大小请自己按照自己虚拟机的配置适当调整,比如我修改为了以下值







三 图形化界面

此处非必须,实际开发中使用较少

下载rocketmq-console源码:https://github.com/apache/rocketmq-externals

进入子目录rocketmq-console

执行mvn命令打包

mvn clean package -DskipTests

进入target目录

rocketmq-console-ng-1.0.0.jar即为springBoot项目

在该目录下CMD执行命令:

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.89.0.65:9876
其中
--server.port为运行的这个web应用的端口,如果不设置的话默认为8080--rocketmq.config.namesrvAddrRocketMQ命名服务地址,如果不设置的话默认为""
OK了,访问下http://localhost:12581试试吧。

或者打包成 war 包扔到 tomcat 中运行

四 入门案例

此案例中使用的是一个消费者,所以消费者代码只有一个

4.1 pom.xml





org.apache.rocketmq
rocketmq-client
4.3.2


4.2 同步消息模式

原理:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。




4.2.1 生产者

/**
* Created by jackiechan on 18-8-19/下午8:37.
* 原理:同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
*
* 应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等
*/
public class SyncProducer01 {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("group1");//groupname 同一个group代表是集群
//Launch the instance.
producer.setNamesrvAddr("192.168.3.8:9876");//设置nameserver地址
//设置实例名字
producer.setInstanceName("producer");//默认不需要设置,会以ip@pid作为名字, ip是机器ip,pid是jvmpid
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
//topic和tags在消费者那边获取到消息后都可以获取, 可以通过tag区分消息
Message msg = new Message("TopicTest" /* Topic 消息所属的topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

4.3 异步消息模式

原理:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理。

应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。


4.3.1 生产者


/**
* Created by jackiechan on 18-8-19/下午10:05
*
* @author jackiechan
* 原理:异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理。
*
* 应用场景:异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
*/
public class AsyncProducer02 {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
//Launch the instance.
producer.setNamesrvAddr("192.168.3.8:9876");//设置nameserver地址
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
//消息的keys可以作为标记或者传递其他消息内容,可以在消费者获取到消息后获取keys进行区分
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送异步消息, 通过设置回调来接受服务器给我们返回的消息
producer.send(msg, new SendCallback() {
//当发送成功的时候执行的方法
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
//当发送失败的时候执行
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
//当发送异步消息的时候,producer 不要shutdown,因为回调是异步的,可能在收到回调的时候producer关闭了会出错
// producer.shutdown();
}
}

4.4 单向模式

原理:单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。


4.4.1 生产者


/**
* Created by jackiechan on 18-8-19/下午10:25
*
* @author jackiechan
* 原理:单向(Oneway)发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
*
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
*/
public class OnewayProducer03 {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
//Launch the instance.
producer.setNamesrvAddr("192.168.3.8:9876");//设置nameserver地址
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);

}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

4.5消费者

此消费者可以接收上面三种不同的消息


/**
* Created by jackiechan on 18-8-19/下午9:50
*
* @authoe jackiechan
*/
public class MqConsumer {

public static void main(String[] args) {
//同一个group代表是集群
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer_yll");
consumer.setNamesrvAddr("192.168.3.8:9876");
try {
consumer.subscribe("TopicTest", "TagA||TagB");//可订阅多个tag,但是一个消息只能有一个tag
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Message msg = list.get(0);
//输出消息内容
System.out.println("收到消息了:"+new String(msg.getBody()));
//此处可以根据消息的tag或者keys来区分消息
if (msg.getTags() != null&&msg.getTags().equals("TagA")) {
//执行TagA的逻辑
System.out.println("收到的是taga的消息");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
} catch (MQClientException e) {
System.out.println("出错了");
}
}
}


五 顺序消费

消息顺序

消息顺序是只可以按照消息发送的顺序进行消费。一个订单产生3条消息,订单创建、付款、订单完成。消费时只有按照顺序消费才有意义,不可能先消费付款消息再消费订单创建消息,这样就乱了。另外,多笔订单又可以并行消费。如何保证呢?

一个订单产生的消息只能发送给同一个MQ服务器中的同一个分区,并且按顺序发送,这样才能在理论上保证消费者消费时是按照顺序消费的,因为一个分区就是一个逻辑队列。生产者虽然按顺序发送,但是第一条消息到达MQ的耗时比第二条多,那么第二条则会被先消费,这样就又导致消费时不是顺序的。那么如何解决呢?可以采取只有第一条被消费者消费成功后再发送第二条。看下图:


但是如果第一条被发送到消费者后,消费者没有响应(消费者发送响应但是因为网络问题丢失或者消费者就没有收到消息),那么在这种情况下你是继续发送第二条还是重发第一条呢?如果是严格消息顺序,那肯定是重发第一条,但是如果是消费者消费后的响应丢失了,那么重发第一条就会造成重复消费。

从另外一方面看,如果不考虑网络异常,那么要实现严格消息,就必须采取一种一对一关系,生产者A的消息对应到MQ服务器1的X队列,消费者A消费X队列。这样串行结构就会造成系统吞吐量太低;更多异常需要处理比如消费端出现问题,那么整个消息队列就会出现阻塞。RocketMQ通过轮询所有队列来确定消息发送到哪一个队列(负载均衡),比如相同订单号的消息会被先后发送到统一队列中。所以RocketMQ

消息重复

造成消费重复的根本原因是网络不可达,只要有网络,这种网络的不稳定因素就存在你无法规避。所以解决这个问题的最好办法就是绕过它。这就变成了,消费端收到两个一样的消息后如何处理,而不是从发送端解决不发送2个一样的消息。对于消费端的要求就是:

  • 消费端处理业务消息要保持幂等性,也就是同一个东西执行多次会得到相同结果

  • 保证每条消息都有唯一编号切保证消息处理成功与去重表的日志同时出现

第一条好理解,第二条就是利用一张日志表来记录已经处理成功的消息ID,如果新到的消息ID已经存在表中那么就不再处理这个消息。第一条是在消费端实现的,不属于消息系统的功能;第二条可以是消息系统实现也可以是业务端实现,处于对消息系统的吞吐量和高可用考虑最好还是由消费端去处理。所以这也就是RocketMQ不解决消息重复的原因

5.1 生产者


/**
* Created by jackiechan on 18-8-20/上午12:08
*
* @author jackiechan
*/
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
MQProducer producer = new DefaultMQProducer("example_group_name");
((DefaultMQProducer) producer).setNamesrvAddr(ServerUtil.SERVERADD);//设置服务器地址,请替换为自己的服务器地址
//Launch the instance.
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
int a=i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ==> " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {

// arg的值其实就是orderId
Integer id = (Integer) arg;

// mqs是队列集合,也就是topic所对应的所有队列
int index = id % mqs.size();

// 这里根据前面的id对队列集合大小求余来返回所对应的队列
System.out.println(index+"====>"+a);
return mqs.get(index);

}
}, orderId);

// System.out.printf("%s%n", sendResult);
}
//server shutdown
producer.shutdown();
}
}

5.2 消费者

消费者有多个,代码一致


/**
* Created by jackiechan on 18-8-20/上午12:08
*
* @author jackiechan
* 顺序消费的场景,一个业务需要从头到尾按照固定顺序执行, 比如订单的顺序是 创建订单-支付-发货,必须按照这个顺序执行, 就可以通过顺序消费来解决这个问题
*/
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setNamesrvAddr(ServerUtil.SERVERADD);//设置服务器地址,实际开发替换为自己的地址
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
* 这里设置的是一个consumer的消费策略
* CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
* CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
* CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
*
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD");
//设置一个Listener,主要进行消息的逻辑处理
//注意这里使用的是MessageListenerOrderly这个接口
consumer.registerMessageListener(new MessageListenerOrderly() {

AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs,
ConsumeOrderlyContext context) {
//返回消费状态
//SUCCESS 消费成功
//SUSPEND_CURRENT_QUEUE_A_MOMENT 消费失败,暂停当前队列的消费

context.setAutoCommit(false);//手动提交
System.out.printf(Thread.currentThread().getName()+"消费者1===>" + msgs.get(0).getQueueId() + "%n"+new String(msgs.get(0).getBody())+ "%n");
this.consumeTimes.incrementAndGet();
//以下内容模拟收消息失败,或者回滚等操作
// if ((this.consumeTimes.get() % 2) == 0) {
// return ConsumeOrderlyStatus.SUCCESS;
// } else if ((this.consumeTimes.get() % 3) == 0) {
// return ConsumeOrderlyStatus.ROLLBACK;
// } else if ((this.consumeTimes.get() % 4) == 0) {
// return ConsumeOrderlyStatus.COMMIT;
// } else if ((this.consumeTimes.get() % 5) == 0) {
// context.setSuspendCurrentQueueTimeMillis(3000);
// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
// }
return ConsumeOrderlyStatus.SUCCESS;

}
});

consumer.start();

System.out.printf("Consumer Started.%n");
}
}

经过测试发现,不同队列的消息收取是无序的,但是同一队列中消息的收取顺序是按照发送顺序收取的

六 广播模式

6.1 生产者

/**
* Created by jackiechan on 2018/8/20/上午10:22
*/
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr(ServerUtil.SERVERADD);//设置服务器地址
producer.start();
for (int i = 0; i < 100; i++){
//发送消息
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
("Hello world==>"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}

6.2 消费者

消费者有多个,代码一致


/**
* Created by jackiechan on 2018/8/20/上午10:23
* 广播模式的应用场景, 一个业务执行完成后需要多个不同的后续业务都执行,那么他们都需要知道前置业务完成,所以大家监听相同消息,同时获取消息
* 比如 电商中商品更新完成后, 可能会需要同时更新 redis 缓存与 solr 搜索引擎
*/
public class BroadcastConsumer1 {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取十条
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr(ServerUtil.SERVERADD);
//set to broadcast mode,设置消费模式为广播
consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " 消费者1收到消息 : " + new String(msgs.get(0).getBody()) + "%n");

0