netty无缝切换rabbitmq、activemq、roc
发表于:2024-11-13 作者:千家信息网编辑
千家信息网最后更新 2024年11月13日,netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否可以,以及处理登录认证的UserAuthHandler和消息处理MessageHan
千家信息网最后更新 2024年11月13日netty无缝切换rabbitmq、activemq、roc
netty的pipeline处理链上的handler:需要IdleStateHandler心跳检测channel是否可以,以及处理登录认证的UserAuthHandler和消息处理MessageHandler
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(defLoopGroup, //编码解码器 new HttpServerCodec(), //将多个消息转换成单一的消息对象 new HttpObjectAggregator(65536), //支持异步发送大的码流,一般用于发送文件流 new ChunkedWriteHandler(), //检测链路是否读空闲,配合心跳handler检测channel是否正常 new IdleStateHandler(60, 0, 0), //处理握手和认证 new UserAuthHandler(), //处理消息的发送 new MessageHandler() );}
对于所有连进来的channel,我们需要保存起来,往后的群发消息需要依靠这些channel
public static void addChannel(Channel channel) { String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel); System.out.println("addChannel:" + remoteAddr); if (!channel.isActive()) { logger.error("channel is not active, address: {}", remoteAddr); } UserInfo userInfo = new UserInfo(); userInfo.setAddr(remoteAddr); userInfo.setChannel(channel); userInfo.setTime(System.currentTimeMillis()); userInfos.put(channel, userInfo); }
登录后,channel就变成有效的channel,无效的channel之后将会丢弃
public static boolean saveUser(Channel channel, String nick, String password) { UserInfo userInfo = userInfos.get(channel); if (userInfo == null) { return false; } if (!channel.isActive()) { logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick); return false; } if (nick == null || password == null) { return false; } LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password); Account account = accountMapperStatic.selectOne(lambdaQueryWrapper); if (account == null) { return false; } // 增加一个认证用户 userCount.incrementAndGet(); userInfo.setNick(nick); userInfo.setAuth(true); userInfo.setId(account.getId()); userInfo.setUsername(account.getUsername()); userInfo.setGroupNumber(account.getGroupNumber()); userInfo.setTime(System.currentTimeMillis()); // 注册该用户推送消息的通道 offlineInfoTransmitStatic.registerPull(channel); return true; }
当channel关闭时,就不再接收消息。unregisterPull就是注销信息消费者,客户端不再接取聊天消息。此外,从下方有一个加写锁的操作,就是为了避免channel还在发送消息时,这边突然关闭channel,这样会导致报错。
public static void removeChannel(Channel channel) { try { logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel)); //加上读写锁保证移除channel时,避免channel关闭时,还有别的线程对其操作,造成错误 rwLock.writeLock().lock(); channel.close(); UserInfo userInfo = userInfos.get(channel); if (userInfo != null) { if (userInfo.isAuth()) { offlineInfoTransmitStatic.unregisterPull(channel); // 减去一个认证用户 userCount.decrementAndGet(); } userInfos.remove(channel); } } finally { rwLock.writeLock().unlock(); } }
为了无缝切换使用rabbitmq、rocketmq、activemq、不使用中间件存储和转发聊天消息这4种状态,定义如下4个接口。依次是发送单聊消息、群聊消息、客户端启动接收消息、客户端下线不接收消息。
public interface OfflineInfoTransmit { void pushP2P(Integer userId, String message); void pushGroup(String groupNumber, String message); void registerPull(Channel channel); void unregisterPull(Channel channel);}
其中,如何使用rabbitmq、rocketmq、activemq三种中间件中的一种来存储和转发聊天消息,它的处理流程如下:
- 单聊的模型参考线程池的模型,如果用户在线,直接通过channel发送给用户。如果用户离线,则发往中间件存储,下次用户上线时直接从中间件拉取消息。这样做对比所有消息的发送都通过中间件来转的好处是提升了性能
- 群聊则是完全通过中间件来转发消息,消息发送中间件,客户端从中间件接取消息。如果仍像单聊那样操作,在线用户直接通过channel发送,操作过于繁琐,要判断这个群组的哪些用户是否在线
- 如果用户在线就注册消费者,从中间件接取消息。否则,就断开消费者,消息保留在中间件中,以便客户端下次上线时拉取。这样就实现了离线消息的接收。
- 不管使用哪种中间件或使用不使用中间件,它的处理流程都遵循上面的3个要求,就能无缝切换上方的4种方法来存储和转发消息。需要哪种方法开启相应注解即可。
代码地址:
https://github.com/shuangyueliao/netty-chat
消息
中间件
用户
处理
客户
客户端
在线
存储
认证
消费者
检测
消费
无缝
切换
就是
方法
模型
流程
线程
登录
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
什么比喻网络安全的大门
有没有必要云数据库
战地1怎么举报服务器
嘉定区自主可控网络技术优势
数据库增删
虚表数据库
网络安全中代理服务在第几层
海淀区互联网企业主管科技部门
数据库取消主键和重建主键要点
网络安全靠大家研讨提纲
数据库增删改c
手机软件开发密码
外贸客户服务器
山西有没有英雄联盟服务器
网络安全专业设置
宁波交易软件开发
预订软件开发
说明数据库中的事务处理
软件开发专业高职学费
数据库分析工具
qq旋风源码数据库
数据库错误1461
关于网络安全的倡议书100字
云数据库的表怎么设置
网络安全新品发布仪式
网络服务器红灯一直闪
网络技术下的教学方式
加拿大软件开发招聘
怎样保护wifi网络安全
数据库闭包求主键