千家信息网

rocketmq中DefaultRocketMQListenerContainer的原理及用法

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要介绍"rocketmq中DefaultRocketMQListenerContainer的原理及用法",在日常操作中,相信很多人在rocketmq中DefaultRocketMQListe
千家信息网最后更新 2025年01月23日rocketmq中DefaultRocketMQListenerContainer的原理及用法

这篇文章主要介绍"rocketmq中DefaultRocketMQListenerContainer的原理及用法",在日常操作中,相信很多人在rocketmq中DefaultRocketMQListenerContainer的原理及用法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"rocketmq中DefaultRocketMQListenerContainer的原理及用法"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

本文主要研究一下rocketmq的DefaultRocketMQListenerContainer

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {    private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);    private ApplicationContext applicationContext;    /**     * The name of the DefaultRocketMQListenerContainer instance     */    private String name;    private long suspendCurrentQueueTimeMillis = 1000;    /**     * Message consume retry strategy
-1,no retry,put into DLQ directly
0,broker control retry frequency
* >0,client control retry frequency. */ private int delayLevelWhenNextConsume = 0; private String nameServer; private AccessChannel accessChannel = AccessChannel.LOCAL; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private ObjectMapper objectMapper; private RocketMQListener rocketMQListener; private RocketMQMessageListener rocketMQMessageListener; private DefaultMQPushConsumer consumer; private Class messageType; private boolean running; // The following properties came from @RocketMQMessageListener. private ConsumeMode consumeMode; private SelectorType selectorType; private String selectorExpression; private MessageModel messageModel; private long consumeTimeout; //...... public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.rocketMQMessageListener = anno; this.consumeMode = anno.consumeMode(); this.consumeThreadMax = anno.consumeThreadMax(); this.messageModel = anno.messageModel(); this.selectorExpression = anno.selector_Expression(); this.selectorType = anno.selectorType(); this.consumeTimeout = anno.consumeTimeout(); } @Override public void setupMessageListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } @Override public void destroy() { this.setRunning(false); if (Objects.nonNull(consumer)) { consumer.shutdown(); } log.info("container destroyed, {}", this.toString()); } @Override public boolean isAutoStartup() { return true; } @Override public void stop(Runnable callback) { stop(); callback.run(); } @Override public void start() { if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); } @Override public void stop() { if (this.isRunning()) { if (Objects.nonNull(consumer)) { consumer.shutdown(); } setRunning(false); } } @Override public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } @Override public int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE; } @Override public void afterPropertiesSet() throws Exception { initRocketMQPushConsumer(); this.messageType = getMessageType(); log.debug("RocketMQ messageType: {}", messageType.getName()); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public String toString() { return "DefaultRocketMQListenerContainer{" + "consumerGroup='" + consumerGroup + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" + messageModel + '}'; } private void initRocketMQPushConsumer() throws MQClientException { Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); consumer.setInstanceName(this.name); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } } private Class getMessageType() { Class targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); Type[] interfaces = targetClass.getGenericInterfaces(); Class superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class) actualTypeArguments[0]; } else { return Object.class; } } } } return Object.class; } else { return Object.class; } } //......}
  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout

  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法

  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

DefaultMessageListenerConcurrently

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {        @SuppressWarnings("unchecked")        @Override        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {            for (MessageExt messageExt : msgs) {                log.debug("received msg: {}", messageExt);                try {                    long now = System.currentTimeMillis();                    rocketMQListener.onMessage(doConvertMessage(messageExt));                    long costTime = System.currentTimeMillis() - now;                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                } catch (Exception e) {                    log.warn("consume message failed. messageExt:{}", messageExt, e);                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;                }            }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        }    }
  • DefaultMessageListenerConcurrently方法实现了MessageListenerConcurrently接口;它的consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦异常则返回ConsumeConcurrentlyStatus.RECONSUME_LATER

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {        @SuppressWarnings("unchecked")        @Override        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {            for (MessageExt messageExt : msgs) {                log.debug("received msg: {}", messageExt);                try {                    long now = System.currentTimeMillis();                    rocketMQListener.onMessage(doConvertMessage(messageExt));                    long costTime = System.currentTimeMillis() - now;                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                } catch (Exception e) {                    log.warn("consume message failed. messageExt:{}", messageExt, e);                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                }            }            return ConsumeOrderlyStatus.SUCCESS;        }    }
  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeOrderlyStatus.SUCCESS,一旦异常则返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

小结

  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout

  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法

  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

到此,关于"rocketmq中DefaultRocketMQListenerContainer的原理及用法"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0