千家信息网

rocketmq中MessageQueueSelector的作用是什么

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,今天就跟大家聊聊有关rocketmq中MessageQueueSelector的作用是什么,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Mes
千家信息网最后更新 2025年01月23日rocketmq中MessageQueueSelector的作用是什么

今天就跟大家聊聊有关rocketmq中MessageQueueSelector的作用是什么,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

MessageQueueSelector

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/MessageQueueSelector.java

public interface MessageQueueSelector {    MessageQueue select(final List mqs, final Message msg, final Object arg);}
  • MessageQueueSelector接口定义了select方法,返回MessageQueue;它有几个实现类,分别是SelectMessageQueueByHash、SelectMessageQueueByRandom、SelectMessageQueueByMachineRoom

SelectMessageQueueByHash

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java

public class SelectMessageQueueByHash implements MessageQueueSelector {    @Override    public MessageQueue select(List mqs, Message msg, Object arg) {        int value = arg.hashCode();        if (value < 0) {            value = Math.abs(value);        }        value = value % mqs.size();        return mqs.get(value);    }}
  • SelectMessageQueueByHash实现了MessageQueueSelector接口,其select方法取arg参数的hashcode的绝对值,然后对mqs.size()取余,得到目标队列在mqs的下标

SelectMessageQueueByRandom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandom.java

public class SelectMessageQueueByRandom implements MessageQueueSelector {    private Random random = new Random(System.currentTimeMillis());    @Override    public MessageQueue select(List mqs, Message msg, Object arg) {        int value = random.nextInt(mqs.size());        return mqs.get(value);    }}
  • SelectMessageQueueByRandom实现了MessageQueueSelector接口,其select方法直接根据mqs.size()随机一个值作为目标队列在mqs的下标

SelectMessageQueueByMachineRoom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {    private Set consumeridcs;    @Override    public MessageQueue select(List mqs, Message msg, Object arg) {        return null;    }    public Set getConsumeridcs() {        return consumeridcs;    }    public void setConsumeridcs(Set consumeridcs) {        this.consumeridcs = consumeridcs;    }}
  • SelectMessageQueueByMachineRoom实现了MessageQueueSelector接口,其select方法目前返回null

RocketMQTemplate

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate implements InitializingBean, DisposableBean {    private static final  Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);    private DefaultMQProducer producer;    private ObjectMapper objectMapper;    private String charset = "UTF-8";    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();    private final Map cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!    public DefaultMQProducer getProducer() {        return producer;    }    public void setProducer(DefaultMQProducer producer) {        this.producer = producer;    }    public ObjectMapper getObjectMapper() {        return objectMapper;    }    public void setObjectMapper(ObjectMapper objectMapper) {        this.objectMapper = objectMapper;    }    public String getCharset() {        return charset;    }    public void setCharset(String charset) {        this.charset = charset;    }    public MessageQueueSelector getMessageQueueSelector() {        return messageQueueSelector;    }    public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {        this.messageQueueSelector = messageQueueSelector;    }    //......}
  • RocketMQTemplate默认创建的MessageQueueSelector是SelectMessageQueueByHash

看完上述内容,你们对rocketmq中MessageQueueSelector的作用是什么有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0