千家信息网

Redisson中怎么实现一个延时消息组件

发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,Redisson中怎么实现一个延时消息组件,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。定义主题队列注解@Target({Eleme
千家信息网最后更新 2025年01月29日:Redisson中怎么实现一个延时消息组件

Redisson中怎么实现一个延时消息组件,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

定义主题队列注解

/**
 * Marks a class as bound to one Redisson delayed-message queue under a topic.
 *
 * <p>The annotation is itself meta-annotated with {@code @Component}, and is
 * retained at runtime so it can be read reflectively.
 */
@Documented
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RMessage {

    /**
     * Name of the message queue.
     *
     * @return the queue name
     */
    String queue();

    /**
     * Name of the topic the queue belongs to.
     *
     * @return the topic name; {@code "system"} when not specified
     */
    String topic() default "system";
}

Spring Boot 启动监听:初始化任务队列与消息主题,并让消费者订阅主题

@Slf4j@Componentpublic class RMessageListener implements ApplicationListener {    /**     * consumer monitoringMethod monitorMessage     */    private final static String METHOD_MONITOR_MESSAGE = "monitorMessage";    /**     * redisson topic name     */    private final static String ATTRIBUTE_NAME_TOPIC = "topic";    /**     * redisson messageQueue name     */    private final static String ATTRIBUTE_NAME_QUEUE = "queue";    /**     * redisson queue map     */    public static Map> messageQueue = new ConcurrentHashMap<>();    /**     * redisson offQueue map     */    public static Map> offQueue = new ConcurrentHashMap<>();    /**     * redisson topic map     */    public static Map topicMap = new ConcurrentHashMap<>();    @Override    public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);        provider.addIncludeFilter(new AnnotationTypeFilter(RMessage.class));        String basePackage = applicationStartedEvent.getSpringApplication().getMainApplicationClass().getPackage().getName();        Set beanDefinitions = provider.findCandidateComponents(basePackage);        ConfigurableListableBeanFactory beanFactory = applicationStartedEvent.getApplicationContext().getBeanFactory();        mqInit(beanDefinitions, beanFactory);        provider.clearCache();        provider.resetFilters(false);        provider.addIncludeFilter(new AssignableTypeFilter(RMessageConsumer.class));        Set consumers = provider.findCandidateComponents(basePackage);        consumerSubscribe(beanFactory, consumers);    }    /**     * consumer subscription news     *     * @param beanFactory     * @param consumers     */    private void consumerSubscribe(ConfigurableListableBeanFactory beanFactory, Set consumers) {        consumers.forEach(beanDefinition -> {            log.info("rMessage init consumer {}",beanDefinition.getBeanClassName());            
try {                Object bean = beanFactory.getBean(Class.forName(beanDefinition.getBeanClassName()));                Method method = bean.getClass().getMethod(METHOD_MONITOR_MESSAGE);                ReflectionUtils.invokeMethod(method,bean);            } catch (ClassNotFoundException | NoSuchMethodException e) {                e.printStackTrace();            }        });    }    /**     * Parameter initialization     *     * @param beanDefinitions     * @param beanFactory     */    private void mqInit(Set beanDefinitions,final ConfigurableListableBeanFactory beanFactory) {        RedissonClient redissonClient = beanFactory.getBean(RedissonClient.class);        beanDefinitions.stream().filter(beanDefinition -> beanDefinition instanceof AnnotatedBeanDefinition).forEach(beanDefinition->{            AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)beanDefinition;            AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata();            MergedAnnotation mergedAnnotation = annotationMetadata.getAnnotations().get(RMessage.class);            String queryName = mergedAnnotation.getString(ATTRIBUTE_NAME_QUEUE);            String topicName = mergedAnnotation.getString(ATTRIBUTE_NAME_TOPIC);            String shortName = topicName+"."+queryName;            RBlockingDeque blockingDeque = redissonClient.getBlockingDeque(shortName);            messageQueue.put(shortName,blockingDeque);            RDelayedQueue delayedQueue = redissonClient.getDelayedQueue(blockingDeque);            offQueue.put(shortName,delayedQueue);            RTopic topic = redissonClient.getTopic(topicName);            topicMap.put(shortName,topic);        });    }}

抽象队列主题列表

public abstract class AbstractQueue {    Map> offQueue = RMessageListener.offQueue;    Map> messageQueue = RMessageListener.messageQueue;    Map topicMap = RMessageListener.topicMap;    protected RDelayedQueue getRDelayedQueue() {        return offQueue.get(shortName());    }    protected RBlockingDeque getMessageQueue() {        return messageQueue.get(shortName());    }    private String shortName() {        Annotation[] annotations = this.getClass().getAnnotations();        RMessage rMessage = Arrays.stream(annotations).filter(annotation -> annotation instanceof RMessage)            .map(annotation -> (RMessage)annotation).findAny().get();        String queryName = rMessage.queue();        String topicName = rMessage.topic();        return topicName+"."+queryName;    }    protected RTopic getTopic() {        return topicMap.get(shortName());    }}

抽象生产者模板

/**
 * Producer template: concrete subclasses annotated with {@link RMessage} inherit
 * the ability to push messages onto their delayed queue.
 *
 * @param <T> the message payload type
 */
@Slf4j
public abstract class RMessageProducer<T> extends AbstractQueue {

    /**
     * Sends a delayed message.
     *
     * <p>The message is offered to the delayed queue with the given delay, and
     * this producer's hash code is then published on the topic.
     *
     * @param message  the payload to enqueue
     * @param delay    how long the message is delayed
     * @param timeUnit unit of {@code delay}
     */
    public void sendMessage(T message, long delay, TimeUnit timeUnit) {
        // Pass the payload itself rather than message.toString(): the logger
        // formats it lazily and a null payload no longer fails inside the log call.
        log.info("rMessage sendMessage: {}, delayTime {}", message, delay + timeUnit.name());
        getRDelayedQueue().offer(message, delay, timeUnit);
        getTopic().publish(this.hashCode());
    }

    /**
     * Sends a message with no delay (a zero-millisecond delay).
     *
     * @param message the payload to enqueue
     */
    public void sendMessage(T message) {
        this.sendMessage(message, 0, TimeUnit.MILLISECONDS);
    }
}

抽象消费者模板

/**
 * Consumer template: concrete subclasses annotated with {@link RMessage} implement
 * {@link #useMessage(Object)} and receive the messages taken from their queue.
 *
 * @param <T> the message payload type
 */
@Slf4j
public abstract class RMessageConsumer<T> extends AbstractQueue {

    /**
     * Starts consumption: drains already-queued messages asynchronously, then
     * registers a topic listener that takes one message per notification.
     */
    public void monitorMessage() {
        // NOTE(review): runAsync without an executor uses the common pool, and
        // pastConsumption blocks on take() - consider a dedicated executor.
        CompletableFuture.runAsync(this::pastConsumption);
        getTopic().addListener(Object.class, (channel, msg) -> consumeNext());
    }

    /**
     * Handles one received message.
     *
     * @param message the payload taken from the queue
     */
    protected abstract void useMessage(T message);

    /**
     * Consumes messages for as long as the delayed queue or the message queue is
     * non-empty. Stops early if the calling thread is interrupted while waiting.
     */
    public void pastConsumption() {
        while (getRDelayedQueue().size() > 0 || getMessageQueue().size() > 0) {
            if (!consumeNext()) {
                // Interrupted: leave instead of re-entering a blocking take().
                return;
            }
        }
    }

    /**
     * Blocks until a message is available, logs it and hands it to {@link #useMessage(Object)}.
     *
     * @return {@code true} if a message was consumed, {@code false} if the wait was interrupted
     */
    private boolean consumeNext() {
        try {
            Object take = getMessageQueue().take();
            log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now());
            // The queue is untyped; the payload type is whatever the matching producer sent.
            @SuppressWarnings("unchecked")
            T message = (T) take;
            this.useMessage(message);
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("rMessage consumer interrupted while waiting for a message", e);
            return false;
        }
    }
}

具体使用

生产者

/**
 * Example producer bound to queue {@code redisQuery} under topic {@code order}.
 */
@RMessage(queue = "redisQuery", topic = "order")
public class RedissonProducer extends RMessageProducer<HashMap<String, String>> {
}

/**
 * Demo REST endpoint that sends one delayed message per POST to {@code /producer}.
 */
@RestController
@RequestMapping("producer")
@AllArgsConstructor
public class ProducerController {

    /** Injected through the Lombok-generated all-args constructor. */
    private final RedissonProducer redissonProducer;

    /**
     * Builds a sample payload and enqueues it with a five-minute delay.
     *
     * @return a fixed confirmation string
     */
    @PostMapping
    public String send() {
        HashMap<String, String> map = new HashMap<>();
        map.put("name","张三");
        map.put("time", "测试顺序第二条"+LocalDateTime.now());
        redissonProducer.sendMessage(map, 5, TimeUnit.MINUTES);
        return "send msg";
    }
}

消费者

/**
 * Example consumer bound to queue {@code redisQuery} under topic {@code order};
 * it prints every received message to standard output.
 */
@RMessage(queue = "redisQuery", topic = "order")
public class RedissonConsumer extends RMessageConsumer<HashMap<String, String>> {

    /**
     * Prints the received payload.
     *
     * @param message the payload taken from the queue
     */
    @Override
    protected void useMessage(HashMap<String, String> message) {
        System.out.println("接收到消息:"+message);
    }
}

关于 Redisson中怎么实现一个延时消息组件问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。

0