千家信息网

redis消息队列的实现方法

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这篇文章主要讲解了"redis消息队列的实现方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"redis消息队列的实现方法"吧!方式一:通过list的
千家信息网最后更新 2025年01月31日redis消息队列的实现方法

这篇文章主要讲解了"redis消息队列的实现方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"redis消息队列的实现方法"吧!

方式一:通过list的阻塞读取命令,blpop或者brpop

消费者

public class Consumer extends DemoApplicationTests{    @Test    public void consume(){        int timeout = 0;//永不超时        String key = "test_que";        //list集合 第一个元素为key值,第二个元素为弹出的元素值;当超时返回[null]        while(true){            List obj = redisTemplate.executePipelined(new RedisCallback() {                @Override                public Object doInRedis(RedisConnection connection) throws DataAccessException {                    //队列没有元素会阻塞操作,直到队列获取新的元素或超时                    return connection.bLPop(timeout,key.getBytes());                }            },new StringRedisSerializer());            for(Object o:obj){                System.out.println("---------------"+o);            }        }    }}

生产者

public class Productor extends DemoApplicationTests {    @Test    public void generateMsg() {        String key = "test_que";        redisTemplate.opsForList().leftPush(key,"hht2");    }}

方式二:Pub/Sub(发布/订阅)使用的 spring boot

依赖包

            org.springframework.boot        spring-boot-starter-web                org.springframework.boot        spring-boot-starter-data-redis                org.springframework.boot        spring-boot-starter-test        test                org.apache.commons        commons-pool2    

配置类

@Configuration@AutoConfigureAfter(RedisAutoConfiguration.class)@EnableCachingpublic class RedisConfig extends CachingConfigurerSupport {    /**     * 配置自定义redisTemplate     * @return     */    @Bean    RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {        RedisTemplate template = new RedisTemplate<>();        template.setConnectionFactory(redisConnectionFactory);        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值        Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);        ObjectMapper mapper = new ObjectMapper();        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);        serializer.setObjectMapper(mapper);        template.setValueSerializer(serializer);        //使用StringRedisSerializer来序列化和反序列化redis的key值        template.setKeySerializer(new StringRedisSerializer());        template.setHashKeySerializer(new StringRedisSerializer());        template.setHashValueSerializer(serializer);        template.afterPropertiesSet();        return template;    }    /**     * 序列化定制     *     * @return     */    @Bean    public Jackson2JsonRedisSerializer jackson2JsonSerializer() {        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(                Object.class);        // 初始化objectmapper        ObjectMapper mapper = new ObjectMapper();        mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);        jackson2JsonRedisSerializer.setObjectMapper(mapper);        return jackson2JsonRedisSerializer;    }    /**     * 消息监听器,使用MessageAdapter可实现自动化解码及方法代理     *     * @return     */    @Bean    public MessageListenerAdapter listener(Jackson2JsonRedisSerializer jackson2JsonRedisSerializer,                                           MessageSubscriber subscriber) {        MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");        adapter.setSerializer(jackson2JsonRedisSerializer);        adapter.afterPropertiesSet();        return adapter;    }    /**     * 将订阅器绑定到容器     *     * @param connectionFactory     * @param listenerAdapter     * @return     */    @Bean    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,                                                   MessageListenerAdapter listener) {        RedisMessageListenerContainer container = new RedisMessageListenerContainer();        container.setConnectionFactory(connectionFactory);        container.addMessageListener(listener, new PatternTopic("/redis/*"));        return container;    }}

模拟消息发布类

@Servicepublic class RedisPubSub {        private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class);        @Autowired        private RedisTemplate redisTemplate;        private ChannelTopic topic = new ChannelTopic("/redis/pubsub");        @Scheduled(initialDelay = 5000, fixedDelay = 10000)        private void schedule() {            logger.info("publish message");            publish("admin", "hey you must go now!");        }        /**         * 推送消息         *         * @param publisher         * @param message         */        public void publish(String publisher, String content) {            logger.info("message send {} by {}", content, publisher);            redisTemplate.convertAndSend(topic.getTopic(), content);        }}

模拟消息接收类

@Componentpublic class MessageSubscriber {    Logger logger = LoggerFactory.getLogger(MessageSubscriber.class);    public void onMessage(String message, String pattern) {        logger.info("topic {} received {} ", pattern, message);    }}

启动类

@SpringBootApplication@EnableSchedulingpublic class DemoApplication {    public static void main(String[] args) {        SpringApplication.run(DemoApplication.class, args);    }}

感谢各位的阅读,以上就是"redis消息队列的实现方法"的内容了,经过本文的学习后,相信大家对redis消息队列的实现方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0