千家信息网

netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章将为大家详细讲解有关netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关
千家信息网最后更新 2025年02月02日netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能

这篇文章将为大家详细讲解有关netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否有效,以及处理登录认证的UserAuthHandler和消息处理MessageHandler

protected void initChannel(SocketChannel ch) throws Exception {        ch.pipeline().addLast(defLoopGroup,                //编码解码器                new HttpServerCodec(),                //将多个消息转换成单一的消息对象                new HttpObjectAggregator(65536),                //支持异步发送大的码流,一般用于发送文件流                new ChunkedWriteHandler(),                //检测链路是否读空闲,配合心跳handler检测channel是否正常                new IdleStateHandler(60, 0, 0),                //处理握手和认证                new UserAuthHandler(),                //处理消息的发送                new MessageHandler()        );}

对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel

public static void addChannel(Channel channel) {        String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);        System.out.println("addChannel:" + remoteAddr);        if (!channel.isActive()) {            logger.error("channel is not active, address: {}", remoteAddr);        }        UserInfo userInfo = new UserInfo();        userInfo.setAddr(remoteAddr);        userInfo.setChannel(channel);        userInfo.setTime(System.currentTimeMillis());        userInfos.put(channel, userInfo);    }

登录后,channel就变成有效的channel,无效的channel之后将会丢弃

public static boolean saveUser(Channel channel, String nick, String password) {        UserInfo userInfo = userInfos.get(channel);        if (userInfo == null) {            return false;        }        if (!channel.isActive()) {            logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);            return false;        }        // 验证用户名和密码        if (nick == null || password == null) {            return false;        }        LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();        lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);        Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);        if (account == null) {            return false;        }        // 增加一个认证用户        userCount.incrementAndGet();        userInfo.setNick(nick);        userInfo.setAuth(true);        userInfo.setId(account.getId());        userInfo.setUsername(account.getUsername());        userInfo.setGroupNumber(account.getGroupNumber());        userInfo.setTime(System.currentTimeMillis());        // 注册该用户推送消息的通道        offlineInfoTransmitStatic.registerPull(channel);        return true;    }

当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。

public static void removeChannel(Channel channel) {        try {            logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));            //加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误            rwLock.writeLock().lock();            channel.close();            UserInfo userInfo = userInfos.get(channel);            if (userInfo != null) {                if (userInfo.isAuth()) {                    offlineInfoTransmitStatic.unregisterPull(channel);                    // 减去一个认证用户                    userCount.decrementAndGet();                }                userInfos.remove(channel);            }        } finally {            rwLock.writeLock().unlock();        }    }

为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。

public interface OfflineInfoTransmit {    void pushP2P(Integer userId, String message);    void pushGroup(String groupNumber, String message);    void registerPull(Channel channel);    void unregisterPull(Channel channel);}

其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:

  1. 单聊的模型参考线程池的模型,如果用户在线,直接通过channel发送给用户。如果用户离线,则发往中间件存储,下次用户上线时直接从中间件拉取消息。这样做对比所有消息的发送都通过中间件来转的好处是提升了性能

  2. 群聊则是完全通过中间件来转发消息,消息发送中间件,客户端从中间件接取消息。如果仍像单聊那样操作,在线用户直接通过channel发送,操作过于繁琐,要判断这个群组的哪些用户是否在线

  3. 如果用户在线就注册消费者,从中间件接取消息。否则,就断开消费者,消息保留在中间件中,以便客户端下次上线时拉取。这样就实现了离线消息的接收。

  4. 不管使用哪种中间件或使用不使用中间件,它的处理流程都遵循上面的3个要求,就能无缝切换上方的4种方法来存储和转发消息。需要哪种方法开启相应注解即可。

项目地址:https://github.com/shuangyueliao/netty-chat

关于netty无缝切换rabbitmq和activem及qrocketmq实现聊天室单聊、群聊功能就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

消息 中间件 用户 处理 客户 客户端 无缝 切换 在线 存储 认证 消费者 检测 消费 功能 聊天室 有效 内容 就是 文章 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 牵牛软件开发 数据库知识与技术就业前景 小学网络安全宣传周手抄报 多台服务器怎样设置策略机 软件开发公司支付的认证费用 科研助力网络安全 我的世界蛋白服务器 传感器网络技术在农业上的应用 魔兽世界风暴之眼服务器角色 蚌埠咖啡点餐软件开发多少钱 matlab输入实验数据库 服务器内存满了 学计算机数据库管理 网络攻击是网络安全潜在的威胁 神奇屋网络技术服务 深圳横岗软件开发公司 计算机二级网络技术难度 智能机器人软件开发工资待遇 查出数据库的字段去除换行符 网络技术在职研究生 华为服务器平均无故障工作时间 数据库 人员信息 关键字 虹口区信息软件开发信息中心 网络安全复杂的小报画面新颖 数据库模型演进历史 wincc实时数据库 数据库 小抄 软件开发哪种语言 同花顺招聘应届大学生软件开发 数据库为什么要创建触发器
0