千家信息网

RocketMQTemplate的原理和作用是什么

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇内容介绍了"RocketMQTemplate的原理和作用是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能
千家信息网最后更新 2025年01月24日RocketMQTemplate的原理和作用是什么

本篇内容介绍了"RocketMQTemplate的原理和作用是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

本文主要研究一下RocketMQTemplate

RocketMQTemplate

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate implements InitializingBean, DisposableBean {    private static final  Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);    private DefaultMQProducer producer;    private ObjectMapper objectMapper;    private String charset = "UTF-8";    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();    private final Map cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!    //......    @Override    public void afterPropertiesSet() throws Exception {        if (producer != null) {            producer.start();        }    }    @Override    protected void doSend(String destination, Message message) {        SendResult sendResult = syncSend(destination, message);        log.debug("send message to `{}` finished. result:{}", destination, sendResult);    }    @Override    protected Message doConvert(Object payload, Map headers, MessagePostProcessor postProcessor) {        String content;        if (payload instanceof String) {            content = (String) payload;        } else {            // If payload not as string, use objectMapper change it.            try {                content = objectMapper.writeValueAsString(payload);            } catch (JsonProcessingException e) {                log.error("convert payload to String failed. payload:{}", payload);                throw new RuntimeException("convert to payload to String failed.", e);            }        }        MessageBuilder builder = MessageBuilder.withPayload(content);        if (headers != null) {            builder.copyHeaders(headers);        }        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);        Message message = builder.build();        if (postProcessor != null) {            message = postProcessor.postProcessMessage(message);        }        return message;    }    @Override    public void destroy() {        if (Objects.nonNull(producer)) {            producer.shutdown();        }        for (Map.Entry kv : cache.entrySet()) {            if (Objects.nonNull(kv.getValue())) {                kv.getValue().shutdown();            }        }        cache.clear();    }    //......}
  • RocketMQTemplate继承了spring-messaging的AbstractMessageSendingTemplate,实现了InitializingBean, DisposableBean接口;提供了syncSend、syncSendOrderly、asyncSend、asyncSendOrderly、sendOneWay、sendOneWayOrderly、sendMessageInTransaction等方法

  • afterPropertiesSet方法执行producer.start();destroy方法执行producer.shutdown()以及TransactionMQProducer的shutdown并清空cache集合

  • doSend方法内部调用的是syncSend方法,返回的sendResult仅仅debug输出;doConvert方法针对String类型的payload不做处理,其他类型使用objectMapper.writeValueAsString转为String作为content,然后构造message,执行postProcessor.postProcessMessage,然后返回

syncSend

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.     *     * @param destination formats: `topicName:tags`     * @param message     {@link org.springframework.messaging.Message}     * @param timeout     send timeout with millis     * @param delayLevel  level for the delay message     * @return {@link SendResult}     */    public SendResult syncSend(String destination, Message message, long timeout, int delayLevel) {        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {            log.error("syncSend failed. destination:{}, message is null ", destination);            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");        }        try {            long now = System.currentTimeMillis();            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,                charset, destination, message);            if (delayLevel > 0) {                rocketMsg.setDelayTimeLevel(delayLevel);            }            SendResult sendResult = producer.send(rocketMsg, timeout);            long costTime = System.currentTimeMillis() - now;            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());            return sendResult;        } catch (Exception e) {            log.error("syncSend failed. destination:{}, message:{} ", destination, message);            throw new MessagingException(e.getMessage(), e);        }    }    /**     * syncSend batch messages in a given timeout.     *     * @param destination formats: `topicName:tags`     * @param messages    Collection of {@link org.springframework.messaging.Message}     * @param timeout     send timeout with millis     * @return {@link SendResult}     */    public SendResult syncSend(String destination, Collection> messages, long timeout) {        if (Objects.isNull(messages) || messages.size() == 0) {            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);            throw new IllegalArgumentException("`messages` can not be empty");        }        try {            long now = System.currentTimeMillis();            Collection rmqMsgs = new ArrayList<>();            org.apache.rocketmq.common.message.Message rocketMsg;            for (Message msg:messages) {                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {                    log.warn("Found a message empty in the batch, skip it");                    continue;                }                rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, msg);                rmqMsgs.add(rocketMsg);            }            SendResult sendResult = producer.send(rmqMsgs, timeout);            long costTime = System.currentTimeMillis() - now;            log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());            return sendResult;        } catch (Exception e) {            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());            throw new MessagingException(e.getMessage(), e);        }    }
  • syncSend方法支持单个及多个org.springframework.messaging.Message,其中单个Message的接口支持delayLevel

syncSendOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.     *     * @param destination formats: `topicName:tags`     * @param message     {@link org.springframework.messaging.Message}     * @param hashKey     use this key to select queue. for example: orderId, productId ...     * @param timeout     send timeout with millis     * @return {@link SendResult}     */    public SendResult syncSendOrderly(String destination, Message message, String hashKey, long timeout) {        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");        }        try {            long now = System.currentTimeMillis();            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,                charset, destination, message);            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);            long costTime = System.currentTimeMillis() - now;            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());            return sendResult;        } catch (Exception e) {            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);            throw new MessagingException(e.getMessage(), e);        }    }
  • syncSendOrderly方法内部调用的是producer.send(rocketMsg, messageQueueSelector, hashKey, timeout)方法,同步返回SendResult

asyncSend

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.     *     * @param destination  formats: `topicName:tags`     * @param message      {@link org.springframework.messaging.Message}     * @param sendCallback {@link SendCallback}     * @param timeout      send timeout with millis     * @param delayLevel   level for the delay message     */    public void asyncSend(String destination, Message message, SendCallback sendCallback, long timeout, int delayLevel) {        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {            log.error("asyncSend failed. destination:{}, message is null ", destination);            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");        }        try {            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,                charset, destination, message);            if (delayLevel > 0) {                rocketMsg.setDelayTimeLevel(delayLevel);            }            producer.send(rocketMsg, sendCallback, timeout);        } catch (Exception e) {            log.info("asyncSend failed. destination:{}, message:{} ", destination, message);            throw new MessagingException(e.getMessage(), e);        }    }
  • asyncSend方法需要传入SendCallback,内部执行的是producer.send(rocketMsg, sendCallback, timeout)

asyncSendOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in     * addition.     *     * @param destination  formats: `topicName:tags`     * @param message      {@link org.springframework.messaging.Message}     * @param hashKey      use this key to select queue. for example: orderId, productId ...     * @param sendCallback {@link SendCallback}     * @param timeout      send timeout with millis     */    public void asyncSendOrderly(String destination, Message message, String hashKey, SendCallback sendCallback,                                 long timeout) {        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");        }        try {            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,                charset, destination, message);            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);        } catch (Exception e) {            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);            throw new MessagingException(e.getMessage(), e);        }    }
  • asyncSendOrderly方法内部执行的是producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout)

sendOneWay

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**     * Similar to UDP, this method won't wait for     * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.     * 

* One-way transmission is used for cases requiring moderate reliability, such as log collection. * * @param destination formats: `topicName:tags` * @param message {@link org.springframework.messaging.Message} */ public void sendOneWay(String destination, Message message) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("sendOneWay failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, message); producer.sendOneway(rocketMsg); } catch (Exception e) { log.error("sendOneWay failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } }

  • sendOneWay方法内部执行的是producer.sendOneway(rocketMsg)

sendOneWayOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**     * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.     *     * @param destination formats: `topicName:tags`     * @param message     {@link org.springframework.messaging.Message}     * @param hashKey     use this key to select queue. for example: orderId, productId ...     */    public void sendOneWayOrderly(String destination, Message message, String hashKey) {        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {            log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");        }        try {            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,                charset, destination, message);            producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);        } catch (Exception e) {            log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);            throw new MessagingException(e.getMessage(), e);        }    }
  • sendOneWayOrderly方法内部执行的是producer.sendOneway(rocketMsg, messageQueueSelector, hashKey)

sendMessageInTransaction

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

    /**     * Send Spring Message in Transaction     *     * @param txProducerGroup the validate txProducerGroup name, set null if using the default name     * @param destination     destination formats: `topicName:tags`     * @param message         message {@link org.springframework.messaging.Message}     * @param arg             ext arg     * @return TransactionSendResult     * @throws MessagingException     */    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination, final Message message, final Object arg) throws MessagingException {        try {            TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,                charset, destination, message);            return txProducer.sendMessageInTransaction(rocketMsg, arg);        } catch (MQClientException e) {            throw RocketMQUtil.convert(e);        }    }
  • sendMessageInTransaction方法内部执行的是txProducer.sendMessageInTransaction(rocketMsg, arg)

小结

  • RocketMQTemplate继承了spring-messaging的AbstractMessageSendingTemplate,实现了InitializingBean, DisposableBean接口;提供了syncSend、syncSendOrderly、asyncSend、asyncSendOrderly、sendOneWay、sendOneWayOrderly、sendMessageInTransaction等方法

  • afterPropertiesSet方法执行producer.start();destroy方法执行producer.shutdown()以及TransactionMQProducer的shutdown并清空cache集合

  • doSend方法内部调用的是syncSend方法,返回的sendResult仅仅debug输出;doConvert方法针对String类型的payload不做处理,其他类型使用objectMapper.writeValueAsString转为String作为content,然后构造message,执行postProcessor.postProcessMessage,然后返回

"RocketMQTemplate的原理和作用是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0