千家信息网

redis流数据推送多用户的方法是什么

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇内容主要讲解"redis流数据推送多用户的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"redis流数据推送多用户的方法是什么"吧!1 当用
千家信息网最后更新 2025年02月06日redis流数据推送多用户的方法是什么

本篇内容主要讲解"redis流数据推送多用户的方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"redis流数据推送多用户的方法是什么"吧!

1 当用户几百 几千个时 如何推送?取缔线程池 采用单线程异步同步推送。

2 现在的逻辑:

每次项目重新启动: 初始化channel 、服务端断开连接-重新连接。当有服务连接不上的时候定时器连接。

这样会产生一个问题:异步连接的任务特别多 导致服务奔溃

/** * 关闭连接时 */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {  InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();  int port = ipSocket.getPort();  String host = ipSocket.getHostString();  String serverUrl = host + ":" + port;  String prev = redisTemplate.opsForValue().get(SOCKET_CONNECT_PREFIX + serverUrl);  Integer cur = Math.toIntExact(System.currentTimeMillis() / 1000);  if (null == prev || cur - Integer.parseInt(prev) > 20 * 60) {    redisTemplate.opsForValue().set(SOCKET_CONNECT_PREFIX + serverUrl, cur + "");    log.info("服务端断开连接=====" + host + port + "20分钟之后 重新连接");    final EventLoop eventLoop = ctx.channel().eventLoop();    Bootstrap bootstrap = defaultProcessHandler.getBootstrap(eventLoop);    eventLoop.schedule(        () -> defaultProcessHandler.doConnect(bootstrap, host + ":" + port, new AtomicInteger(0)),        20,        TimeUnit.MINUTES);  } else {    log.warn(serverUrl + "距离上次断开连接不足20分钟==" + (cur - Integer.parseInt(prev)) + "s");  }  super.channelInactive(ctx);}

在系统启动初始化 连接10次:

3 如何保证发送数据的完整性 TCP 粘包问题

服务端添加按换行符分隔的解码器;

serverBootstrap.childHandler(new ChannelInitializer() {  @Override  protected void initChannel(Channel channel) {    //此方法每次客户端连接都会调用,是为通道初始化的方法
//获得通道channel中的管道链(执行链、handler链) ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(Short.MAX_VALUE * 10)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringHandler());
log.info("success to initHandler!"); }});


4 关于推送服务断开之后 重新连接上来 接着上次发送的位置继续推送。
ChannelFuture future = channels.get(callbackUrl.getUrl());if (null != future) {  try {    boolean result = sendMsg(future, pushDataStr);    this.redisTemplate.opsForValue().set(callbackUrl.getUrl(), offset.toString());    if (!result) {      log.error("this channel push failed {}", callbackUrl.getUrl());      returnVal = false;    }  } catch (Exception e) {    log.error("push exception", e);    returnVal = false;  }}return returnVal;

当推送成功 会记录次用户 此次的数据游标数据到redis.

当推送服务挂断之后,会进行任务的初始化 此时会从redis中读取每个客户上次读取的位置offest 提交到任务线程池

这个问题同样解决了服务重启之后,依然可以从上次读取结束的位置接着读取。读取任务的开始游标位置 :是上次服务成功处理后的游标。

5 当接收数据的服务端 重新断开之后,如何保证接着上次读取的位置?

BaseReceiver 接口 handler 方法 每次接收到数据 返回当前游标值。当进行业务处理成功之后 返回true .会自动进行后续数据读取。当客户那边的接收数据服务端挂了之后 首先会进行自动重连操作,此时读取datahub数据的线程依然在返回数据 但是不能推送成功,所以游标值 不会后移。

到此,相信大家对"redis流数据推送多用户的方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0