rocketmq中DefaultRocketMQListenerContainer的原理及用法
这篇文章主要介绍"rocketmq中DefaultRocketMQListenerContainer的原理及用法",在日常操作中,相信很多人在rocketmq中DefaultRocketMQListenerContainer的原理及用法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"rocketmq中DefaultRocketMQListenerContainer的原理及用法"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
序
本文主要研究一下rocketmq的DefaultRocketMQListenerContainer
DefaultRocketMQListenerContainer
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class); private ApplicationContext applicationContext; /** * The name of the DefaultRocketMQListenerContainer instance */ private String name; private long suspendCurrentQueueTimeMillis = 1000; /** * Message consume retry strategy
-1,no retry,put into DLQ directly
0,broker control retry frequency
* >0,client control retry frequency. */ private int delayLevelWhenNextConsume = 0; private String nameServer; private AccessChannel accessChannel = AccessChannel.LOCAL; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private ObjectMapper objectMapper; private RocketMQListener rocketMQListener; private RocketMQMessageListener rocketMQMessageListener; private DefaultMQPushConsumer consumer; private Class messageType; private boolean running; // The following properties came from @RocketMQMessageListener. private ConsumeMode consumeMode; private SelectorType selectorType; private String selectorExpression; private MessageModel messageModel; private long consumeTimeout; //...... public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.rocketMQMessageListener = anno; this.consumeMode = anno.consumeMode(); this.consumeThreadMax = anno.consumeThreadMax(); this.messageModel = anno.messageModel(); this.selectorExpression = anno.selector_Expression(); this.selectorType = anno.selectorType(); this.consumeTimeout = anno.consumeTimeout(); } @Override public void setupMessageListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } @Override public void destroy() { this.setRunning(false); if (Objects.nonNull(consumer)) { consumer.shutdown(); } log.info("container destroyed, {}", this.toString()); } @Override public boolean isAutoStartup() { return true; } @Override public void stop(Runnable callback) { stop(); callback.run(); } @Override public void start() { if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); } @Override public void stop() { if (this.isRunning()) { if (Objects.nonNull(consumer)) { consumer.shutdown(); } setRunning(false); } } @Override public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } @Override public int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE; } @Override public void afterPropertiesSet() throws Exception { initRocketMQPushConsumer(); this.messageType = getMessageType(); log.debug("RocketMQ messageType: {}", messageType.getName()); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public String toString() { return "DefaultRocketMQListenerContainer{" + "consumerGroup='" + consumerGroup + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" + messageModel + '}'; } private void initRocketMQPushConsumer() throws MQClientException { Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); consumer.setInstanceName(this.name); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } } private Class getMessageType() { Class> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); Type[] interfaces = targetClass.getGenericInterfaces(); Class> superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) { interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class) actualTypeArguments[0]; } else { return Object.class; } } } } return Object.class; } else { return Object.class; } } //......}
DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法
setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()
DefaultMessageListenerConcurrently
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @SuppressWarnings("unchecked") @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); rocketMQListener.onMessage(doConvertMessage(messageExt)); long costTime = System.currentTimeMillis() - now; log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
DefaultMessageListenerConcurrently方法实现了MessageListenerConcurrently接口;它的consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦异常则返回ConsumeConcurrentlyStatus.RECONSUME_LATER
DefaultMessageListenerOrderly
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @SuppressWarnings("unchecked") @Override public ConsumeOrderlyStatus consumeMessage(Listmsgs, ConsumeOrderlyContext context) { for (MessageExt messageExt : msgs) { log.debug("received msg: {}", messageExt); try { long now = System.currentTimeMillis(); rocketMQListener.onMessage(doConvertMessage(messageExt)); long costTime = System.currentTimeMillis() - now; log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime); } catch (Exception e) { log.warn("consume message failed. messageExt:{}", messageExt, e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return ConsumeOrderlyStatus.SUCCESS; } }
DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeOrderlyStatus.SUCCESS,一旦异常则返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
小结
DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法
setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()
到此,关于"rocketmq中DefaultRocketMQListenerContainer的原理及用法"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!