千家信息网

RocketMQ延迟消息的实现方法

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章主要介绍"RocketMQ延迟消息的实现方法",在日常操作中,相信很多人在RocketMQ延迟消息的实现方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Roc
千家信息网最后更新 2025年02月03日RocketMQ延迟消息的实现方法

这篇文章主要介绍"RocketMQ延迟消息的实现方法",在日常操作中,相信很多人在RocketMQ延迟消息的实现方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"RocketMQ延迟消息的实现方法"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者。比如我们现在发送一条延时30秒的消息,消息发送后立即发送给服务器,但是服务器在30秒后才将该消息交给消费者。

RocketMQ通过配置的延迟级别延迟消息投递到消费者,其中不同的延迟级别对应不同的延迟时间,可配置,默认的延迟级别有18种,分别是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,支持时间单位 s 秒 m分钟 h小时 d天。

源码 MessageStoreConfig.java 是定义如下:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

可以在brocker配置 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,自定义其时间级别。

1、代码验证

前提:先启动消费者等待消息的发送,先发送消息,消费者启动需要时间,影响测试结果。

1.1、生产者Producer

public class DelayProducer {                 public static void main(String[] args) throws MQClientException, InterruptedException {                                 DefaultMQProducer producer = new DefaultMQProducer("producer_test");                producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");                producer.start();                SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");                for (int i = 0; i < 10; i++) {                    try {                            //构建消息                            Message msg = new Message("TopicTest" /* Topic */,                                "TagA" /* Tag */,                                ("延迟消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)                            );                            //延时的级别为3 对应的时间为10s 就是发送后延时10S在把消息投递出去                            msg.setDelayTimeLevel(3);                            SendResult sendResult = producer.send(msg);                                                        System.out.printf("%s%n", sd.format(new Date())+" == "+sendResult);                    } catch (Exception e) {                        e.printStackTrace();                        Thread.sleep(1000);                    }                }                producer.shutdown();        }}

查看结果:

1.2、消费者Consumer

public class DelayConsumer {        public static void main(String[] args) {                try {                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();                        consumer.setConsumerGroup("consumer_delay");                        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");                        consumer.subscribe("TopicTest", "*");                        consumer.registerMessageListener(new MessageListenerConcurrently(){                                @Override                                public ConsumeConcurrentlyStatus consumeMessage(List paramList,                                                ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {                                        try {                                            for(MessageExt msg : paramList){                                                    String msgbody = new String(msg.getBody(), "utf-8");                                                    SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");                                                    System.out.println("接收时间 :  "+ sd.format(new Date()) +" == MessageBody: "+ msgbody);//输出消息内容                                            }                                        } catch (Exception e) {                                            e.printStackTrace();                                            return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试                                        }                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功                                }                        });                        consumer.start();                        System.out.println("DelayConsumer===启动成功!");                } catch (Exception e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }        }}

查看结果:

2、内部机制分析

查看其消息投递的核心方法org.apache.rocketmq.store.CommitLog.putMessage

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {        //设置消息存储到文件中的时间        msg.setStoreTimestamp(System.currentTimeMillis());        // Set the message body BODY CRC (consider the most appropriate setting        // on the client)        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));        // Back to Results        AppendMessageResult result = null;        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();        String topic = msg.getTopic();        int queueId = msg.getQueueId();        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {            // Delay Delivery消息的延迟级别是否大于0            if (msg.getDelayTimeLevel() > 0) {                    //如果消息的延迟级别大于最大的延迟级别则置为最大延迟级别                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());                }                //将消息主题设置为SCHEDULE_TOPIC_XXXX                topic = ScheduleMessageService.SCHEDULE_TOPIC;                //将消息队列设置为延迟的消息队列的ID                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());                // Backup real topic, queueId                //消息的原有的主题和消息队列存入属性中                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));                msg.setTopic(topic);                msg.setQueueId(queueId);            }        }        long eclipseTimeInLock = 0;        MappedFile unlockMappedFile = null;        //获取最后一个消息的映射文件,mappedFileQueue可看作是CommitLog文件夹下的一个个文件的映射        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();        //写入消息之前先申请putMessageLock,也就是保证消息写入CommitLog文件中串行的        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config        try {            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();            this.beginTimeInLock = beginLockTimestamp;            // Here settings are stored timestamp, in order to ensure an orderly            // global            //设置消息的存储时间            msg.setStoreTimestamp(beginLockTimestamp);            //mappedFile==null标识CommitLog文件还未创建,第一次存消息则创建CommitLog文件            //mappedFile.isFull()表示mappedFile文件已满,需要重新创建CommitLog文件            if (null == mappedFile || mappedFile.isFull()) {                    //里面的参数0代表偏移量                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise            }            //mappedFile==null说明创建CommitLog文件失败抛出异常,创建失败可能是磁盘空间不足或者权限不够            if (null == mappedFile) {                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());                beginTimeInLock = 0;                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);            }            //mappedFile文件后面追加消息            result = mappedFile.appendMessage(msg, this.appendMessageCallback);            switch (result.getStatus()) {                case PUT_OK:                    break;                case END_OF_FILE:                    unlockMappedFile = mappedFile;                    // Create a new file, re-write the message                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);                    if (null == mappedFile) {                        // XXX: warn and notify me                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());                        beginTimeInLock = 0;                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);                    }                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);                    break;                case MESSAGE_SIZE_EXCEEDED:                case PROPERTIES_SIZE_EXCEEDED:                    beginTimeInLock = 0;                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);                case UNKNOWN_ERROR:                    beginTimeInLock = 0;                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);                default:                    beginTimeInLock = 0;                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);            }            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;            beginTimeInLock = 0;        } finally {                //释放锁            putMessageLock.unlock();        }        if (eclipseTimeInLock > 500) {            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);        }        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);        }        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);        // Statistics        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());        //消息刷盘        handleDiskFlush(result, putMessageResult, msg);        //主从数据同步复制        handleHA(result, putMessageResult, msg);        return putMessageResult;    }

我们发现在通过putMessage 延迟消息就被放存放到了主题为 SCHEDULE_TOPIC_XXXX的commitlog中,消息的原有的主题和消息队列存入属性中,后面再通过定时的方式对这这些消息进行重新发送。

ScheduleMessageService.start()启动会为每一个延迟队列创建一个调度任务每一个调度任务对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列。

    public void start() {        for (Map.Entry entry : this.delayLevelTable.entrySet()) {            Integer level = entry.getKey();            Long timeDelay = entry.getValue();            Long offset = this.offsetTable.get(level);            if (null == offset) {                offset = 0L;            }            if (timeDelay != null) {                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);            }        }        this.timer.scheduleAtFixedRate(new TimerTask() {            @Override            public void run() {                try {                    ScheduleMessageService.this.persist();                } catch (Throwable e) {                    log.error("scheduleAtFixedRate flush exception", e);                }            }        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());    }

定时任务的实现类DeliverDelayedMessageTimerTask,核心方法是executeOnTimeup

public void executeOnTimeup() {                //根据延迟级别获取该延迟队列信息            ConsumeQueue cq =                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,                    delayLevel2QueueId(delayLevel));            long failScheduleOffset = offset;            //未找到说明目前没有该延迟级别的消息,忽略本次任务            if (cq != null) {                    //根据offset获取队列中获取当前队列中有效的消息,                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);                if (bufferCQ != null) {                    try {                        long nextOffset = offset;                        int i = 0;                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();                                                //遍历ConsumeQueue,每一个ConsumeQueue条目是20个字节解析消息                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {                                //物理偏移量                            long offsetPy = bufferCQ.getByteBuffer().getLong();                            //消息长度                            int sizePy = bufferCQ.getByteBuffer().getInt();                            //消息的tag的Hash值                            long tagsCode = bufferCQ.getByteBuffer().getLong();                            //                            if (cq.isExtAddr(tagsCode)) {                                if (cq.getExt(tagsCode, cqExtUnit)) {                                    tagsCode = cqExtUnit.getTagsCode();                                } else {                                    //can't find ext content.So re compute tags code.                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",                                        tagsCode, offsetPy, sizePy);                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);                                }                            }                            long now = System.currentTimeMillis();                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);                            long countdown = deliverTimestamp - now;                            if (countdown <= 0) {                                    //根据物理偏移量和消息的大小从Commitlog文件中查找消息                                MessageExt msgExt =                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(                                        offsetPy, sizePy);                                if (msgExt != null) {                                    try {                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);                                        //消息存储到Commitlog文件中,转发到主题对应的消息队列上,供消费者再次消费。                                        PutMessageResult putMessageResult =                                            ScheduleMessageService.this.defaultMessageStore                                                .putMessage(msgInner);                                        if (putMessageResult != null                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {                                            continue;                                        } else {                                            // XXX: warn and notify me                                            log.error(                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",                                                msgExt.getTopic(), msgExt.getMsgId());                                            ScheduleMessageService.this.timer.schedule(                                                new DeliverDelayedMessageTimerTask(this.delayLevel,                                                    nextOffset), DELAY_FOR_A_PERIOD);                                            ScheduleMessageService.this.updateOffset(this.delayLevel,                                                nextOffset);                                            return;                                        }                                    } catch (Exception e) {                                        log.error(                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="                                                + offsetPy + ",sizePy=" + sizePy, e);                                    }                                }                            } else {                                ScheduleMessageService.this.timer.schedule(                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),                                    countdown);                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);                                return;                            }                        } // end of for                                                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);                        return;                    } finally {                        bufferCQ.release();                    }                }else {                        //未找到有效的消息,更新延迟队列定时拉取进度,并创建定时任务带下一次继续尝试                    long cqMinOffset = cq.getMinOffsetInQueue();                    if (offset < cqMinOffset) {                        failScheduleOffset = cqMinOffset;                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="                            + cqMinOffset + ", queueId=" + cq.getQueueId());                    }                }            }            //创建延迟任务            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,                failScheduleOffset), DELAY_FOR_A_WHILE);        }

图解:

1、消息生产者发送消息,如果发送的消息DelayTimeLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息的队列为DelayTimeLevel-1

2、消息经由Commitlog转发到消息队列SCHEDULE_TOPIC_XXXX的消费队列1。

3、定时任务Timer每隔1秒根据上次拉取消息的偏移量从消费队列中取出所有消息。

4、根据消息的物理偏移量和消息大小从Commitlog中拉取消息。(PS:消息存储章节中会重点讲解)

5、根据消息的属性重新创建消息,并恢复原主题TopicTest、原消息队列ID,清除DelayTimeLevel属性存入Commitlog中。

6、记录原主题TopicTest的消息队列的消息偏移量,供消费者索引检索消息进行消费。

到此,关于"RocketMQ延迟消息的实现方法"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0