千家信息网

如何理解RocketMQ消费位置

发表于:2024-10-14 作者:千家信息网编辑
千家信息网最后更新 2024年10月14日,这篇文章给大家介绍如何理解RocketMQ消费位置,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。RocketMQ创建消费者的时指定了Topic主题及Tag,我们发现新创建的消费者
千家信息网最后更新 2024年10月14日如何理解RocketMQ消费位置

这篇文章给大家介绍如何理解RocketMQ消费位置,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

RocketMQ创建消费者的时指定了Topic主题及Tag,我们发现新创建的消费者消费不了历史的数据,只能消费掉创建以后消费者发送的数据。这是什么原因,我们能把所有的消息都消费吗?,我们可以指定需要消费的消息的时间吗?答案是肯定的,下面我们具体分析一下。

前提:我们讨论是集群模式下的,广播模式也是一样的,只是示例代码我们用集群模式来讨论。

消息消费的位置目前提供了三种方式CONSUME_FROM_LAST_OFFSET(队列尾部消费)、CONSUME_FROM_FIRST_OFFSET(队列头部消费)、CONSUME_FROM_TIMESTAMP(指定消费时间点)。

public enum ConsumeFromWhere {    CONSUME_FROM_LAST_OFFSET,    @Deprecated    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,    @Deprecated    CONSUME_FROM_MIN_OFFSET,    @Deprecated    CONSUME_FROM_MAX_OFFSET,    CONSUME_FROM_FIRST_OFFSET,    CONSUME_FROM_TIMESTAMP,}

分析源码我们看到有6种方式,其他三种已经废弃掉了,不做讨论。

1、从队列尾部消费(默认)

我们从新创建一个消费组来消费某个主题下的消息时,历史消息没有被消费,当生产者重新发送消息时则会接收到最新的,我们分析下其在哪设置的。

当创建消费者的时候内置了一些参数,从队列尾部消费。

从队列尾部消费导致历史消息消费不了,会丢失一部分数据,如果仅仅是状态数据则可以这样设置,如果是业务数据导致数据丢失。

对于设置这个参数仅对于消费组第一次创建时生效,后面再次设置不生效,因为该消费组在服务端已经记录了消费的进度,已有进度位置。

查看消费进度文件的位置,我们根据上几节的内容查看TopicTest主题下的这个consumer_test_clustering消费组的消息消费的进度。查看Broker-a服务器节点上的信息。

查看消费的消费进度先根据可视化界面查看

查看服务器文件上的消费进度信息:/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json

2、从队列头部消费

编写Consumer

public class Consumer1 {          public static void main(String[] args){                try {                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();                        consumer.setConsumerGroup("consumer_first_offset");                        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");                        consumer.subscribe("TopicTest", "*");                        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);                        consumer.registerMessageListener(new MessageListenerConcurrently(){                                @Override                                public ConsumeConcurrentlyStatus consumeMessage(List paramList,                                                ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {                                        try {                                            for(MessageExt msg : paramList){                                                    String msgbody = new String(msg.getBody(), "utf-8");                                                    SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");                                                    Date date = new Date(msg.getStoreTimestamp());                                                    System.out.println("Consumer1===  存入时间 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//输出消息内容                                            }                                        } catch (Exception e) {                                            e.printStackTrace();                                            return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试                                        }                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功                                }                        });                        consumer.start();                        System.out.println("Consumer1===启动成功!");                } catch (Exception e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }        }}

设置了消费组:consumer.setConsumerGroup("consumer_first_offset");

设置了消费位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

查看其结果

从头开始消费是指目前还储存在broker的上的消息全部消费一遍,因为RocketMQ会将消息持久化到磁盘文件中,时间长就会导致磁盘文件会很多,RocketMQ有一种机制,只是保留一段时间的消息,之前的消息会删除,可以指定时间点删除(无论消息是否被消费,到时间点文件都会被删除)

3、从指定时间点消费

消费者代码

public class Consumer1 {                public static void main(String[] args){                try {                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();                        consumer.setConsumerGroup("consumer_time_offset");                        consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");                        consumer.subscribe("TopicTest", "*");                        //可以设置从什么时间开始消费,配合setConsumeTimestamp一起使用默认半小时之前的,格式yyyyMMddhhmmss                        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);                        consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L));                         consumer.registerMessageListener(new MessageListenerConcurrently(){                                @Override                                public ConsumeConcurrentlyStatus consumeMessage(List paramList,                                                ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {                                        try {                                            for(MessageExt msg : paramList){                                                    String msgbody = new String(msg.getBody(), "utf-8");                                                    SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");                                                    Date date = new Date(msg.getStoreTimestamp());                                                    System.out.println("Consumer1===  存入时间 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//输出消息内容                                            }                                        } catch (Exception e) {                                            e.printStackTrace();                                            return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试                                        }                                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功                                }                        });                        consumer.start();                        System.out.println("Consumer1===启动成功!");                } catch (Exception e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }        }}

设置消费位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); 设置消费的时间点:consumer.setConsumeTimestamp("20181222171201");

如果从消息进度服务OffsetStore读取到MessageQueue中的偏移量大于等于0,则使用读取到的偏移量,只有读取到的偏移量小于0时上述策略才会生效。

关于如何理解RocketMQ消费位置就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0