Redisson中怎么实现一个延时消息组件
发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,Redisson中怎么实现一个延时消息组件,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。定义主题队列注解@Target({Eleme
千家信息网最后更新 2025年01月29日Redisson中怎么实现一个延时消息组件
Redisson中怎么实现一个延时消息组件,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
定义主题队列注解
@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Componentpublic @interface RMessage { /** * 消息队列 * @return */ String queue(); /** * 主题 * @return */ String topic() default "system";}
springboot启动监听初始化任务队列与消息主题,消费者订阅主题
@Slf4j@Componentpublic class RMessageListener implements ApplicationListener{ /** * consumer monitoringMethod monitorMessage */ private final static String METHOD_MONITOR_MESSAGE = "monitorMessage"; /** * redisson topic name */ private final static String ATTRIBUTE_NAME_TOPIC = "topic"; /** * redisson messageQueue name */ private final static String ATTRIBUTE_NAME_QUEUE = "queue"; /** * redisson queue map */ public static Map > messageQueue = new ConcurrentHashMap<>(); /** * redisson offQueue map */ public static Map > offQueue = new ConcurrentHashMap<>(); /** * redisson topic map */ public static Map topicMap = new ConcurrentHashMap<>(); @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false); provider.addIncludeFilter(new AnnotationTypeFilter(RMessage.class)); String basePackage = applicationStartedEvent.getSpringApplication().getMainApplicationClass().getPackage().getName(); Set beanDefinitions = provider.findCandidateComponents(basePackage); ConfigurableListableBeanFactory beanFactory = applicationStartedEvent.getApplicationContext().getBeanFactory(); mqInit(beanDefinitions, beanFactory); provider.clearCache(); provider.resetFilters(false); provider.addIncludeFilter(new AssignableTypeFilter(RMessageConsumer.class)); Set consumers = provider.findCandidateComponents(basePackage); consumerSubscribe(beanFactory, consumers); } /** * consumer subscription news * * @param beanFactory * @param consumers */ private void consumerSubscribe(ConfigurableListableBeanFactory beanFactory, Set consumers) { consumers.forEach(beanDefinition -> { log.info("rMessage init consumer {}",beanDefinition.getBeanClassName()); try { Object bean = beanFactory.getBean(Class.forName(beanDefinition.getBeanClassName())); Method method = bean.getClass().getMethod(METHOD_MONITOR_MESSAGE); ReflectionUtils.invokeMethod(method,bean); } catch (ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); } }); } /** * Parameter initialization * * @param beanDefinitions * @param beanFactory */ private void mqInit(Set beanDefinitions,final ConfigurableListableBeanFactory beanFactory) { RedissonClient redissonClient = beanFactory.getBean(RedissonClient.class); beanDefinitions.stream().filter(beanDefinition -> beanDefinition instanceof AnnotatedBeanDefinition).forEach(beanDefinition->{ AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); MergedAnnotation mergedAnnotation = annotationMetadata.getAnnotations().get(RMessage.class); String queryName = mergedAnnotation.getString(ATTRIBUTE_NAME_QUEUE); String topicName = mergedAnnotation.getString(ATTRIBUTE_NAME_TOPIC); String shortName = topicName+"."+queryName; RBlockingDeque super Serializable> blockingDeque = redissonClient.getBlockingDeque(shortName); messageQueue.put(shortName,blockingDeque); RDelayedQueue super Serializable> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); offQueue.put(shortName,delayedQueue); RTopic topic = redissonClient.getTopic(topicName); topicMap.put(shortName,topic); }); }}
抽象队列主题列表
public abstract class AbstractQueue { Map> offQueue = RMessageListener.offQueue; Map > messageQueue = RMessageListener.messageQueue; Map topicMap = RMessageListener.topicMap; protected RDelayedQueue super Serializable> getRDelayedQueue() { return offQueue.get(shortName()); } protected RBlockingDeque super Serializable> getMessageQueue() { return messageQueue.get(shortName()); } private String shortName() { Annotation[] annotations = this.getClass().getAnnotations(); RMessage rMessage = Arrays.stream(annotations).filter(annotation -> annotation instanceof RMessage) .map(annotation -> (RMessage)annotation).findAny().get(); String queryName = rMessage.queue(); String topicName = rMessage.topic(); return topicName+"."+queryName; } protected RTopic getTopic() { return topicMap.get(shortName()); }}
抽象生产者模板
@Slf4jpublic abstract class RMessageProducerextends AbstractQueue { /** * 发送延时消息 * @param message * @param delay * @param timeUnit */ public void sendMessage(T message, long delay, TimeUnit timeUnit) { log.info("rMessage sendMessage: {}, delayTime {}",message.toString(),delay+timeUnit.name()); super.getRDelayedQueue().offer(message,delay,timeUnit); super.getTopic().publish(this.hashCode()); } /** * 发送异步消息 * @param message */ public void sendMessage(T message) { this.sendMessage(message,0,TimeUnit.MILLISECONDS); }}
抽象消费者模板
@Slf4jpublic abstract class RMessageConsumerextends AbstractQueue { public void monitorMessage() { CompletableFuture.runAsync(this::pastConsumption); super.getTopic().addListener(Object.class,(c,m)-> { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } }); } protected abstract void useMessage(T message); public void pastConsumption() { while (super.getRDelayedQueue().size() > 0 || super.getMessageQueue().size() > 0) { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } } }}
具体使用
生产者
@RMessage(queue = "redisQuery",topic = "order")public class RedissonProducer extends RMessageProducer{}@RestController@RequestMapping("producer")@AllArgsConstructorpublic class ProducerController { private RedissonProducer redissonProducer; @PostMapping public String send() { HashMap map = new HashMap<>(); map.put("name","张三"); map.put("time", "测试顺序第二条"+LocalDateTime.now()); redissonProducer.sendMessage(map,5, TimeUnit.MINUTES); return "send msg"; }}
消费者
@RMessage(queue = "redisQuery",topic = "order")public class RedissonConsumer extends RMessageConsumer{ @Override protected void useMessage(HashMap message) { System.out.println("接收到消息:"+message); }}
关于 Redisson中怎么实现一个延时消息组件问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
消息
主题
队列
消费者
问题
消费
组件
更多
模板
生产者
帮助
生产
解答
易行
简单易行
任务
内容
小伙
小伙伴
方法
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
南京学习网络技术哪家好
软件开发的经费预算
网络安全技能竞赛的通知
ppt服务器图标素材
国家网络安全应急预案时间
青少年互联网科技创新
魔兽世界tbc奎尔塞拉服务器
问道手游怎么连接服务器进不去
数据软件开发检测中心
数据库根据字段名查找唯一
服务器电源加装可调压
那年设置的首都网络安全日
江西软件开发费用
网络营销存在网络安全隐患
阿里ob数据库客户端软件
数据库加字段数据备份
网络安全是哪个部门的责任
数据库定义sex约束条件
网络技术A的实训课
魔幻模拟战哪个服务器
陕西钱道互联网科技有限公司
苏州阿里云服务器数据丢失
东莞蜀山软件开发工作室
aws的服务器公钥
cmd里面怎么关闭数据库
ios获取网页数据库
汕尾导航软件开发
鸿蒙软件开发看什么书
计算机网络安全涉及的保密性
w10sql数据库卸载