千家信息网

基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的

发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,本篇文章给大家分享的是有关基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,
千家信息网最后更新 2024年12月12日基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的

本篇文章给大家分享的是有关基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

####事情起因,用netty实现了websocket,在链接创建成功后发送一个消息给客户端,我们选择在channelActive中发送消息。 可想而知肯定是不行的了 代码如下

EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup group = new NioEventLoopGroup();        try {            ServerBootstrap sb = new ServerBootstrap();            sb.option(ChannelOption.SO_BACKLOG, 1024);            // 绑定线程池            sb.group(group, bossGroup)                    // 指定使用的channel                    .channel(NioServerSocketChannel.class)                    // 绑定监听端口                    .localAddress(this.port)                    .option(ChannelOption.SO_KEEPALIVE, true)                    // 绑定客户端连接时候触发操作                    .childHandler(new ChannelInitializer() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            log.info("收到新连接");                            ch.pipeline().addLast(new LoggingHandler("DEBUG"));                            ch.pipeline().addLast(new IdleStateHandler(60, 0, 0));                            //websocket协议本身是基于http协议的,所以这边也要使用http解编码器                            ch.pipeline().addLast(new HttpServerCodec());                            //以块的方式来写的处理器                            ch.pipeline().addLast(new ChunkedWriteHandler());                            ch.pipeline().addLast(new HttpObjectAggregator(8192));                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/socket", null, true, 65536 * 10));                            ch.pipeline().addLast(new MyWebSocketHandler());                        }                    });            // 服务器异步创建绑定            ChannelFuture cf = sb.bind().sync();            log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());            // 关闭服务器通道            cf.channel().closeFuture().sync();

#####排查1:因为没有任何异常,客户端没有收到消息,故先采用wireshark抓包,发现网卡上没有对应的想发送的消息。 为什么网卡没有对应的包呢,经过debug发现如果发送的数据类型是WebSocketFrame在最终发送时候异常了具体代码在HeadContext的write方法中,headcontext是netty的channelpipeline的头部,最终写出时都会从pipeline的尾部链接到头部来执行(pipeline为双向链表) 为什么在channelread中能写信息而在channelActive无法写信息呢,经过分析发现,channelActive的触发是在socketchannel第一次注册的时候发生的具体代码如下:abstrcatchannel中

private void register0(ChannelPromise promise) {            try {                // check if the channel is still open as it could be closed in the mean time when the register                // call was outside of the eventLoop                if (!promise.setUncancellable() || !ensureOpen(promise)) {                    return;                }                boolean firstRegistration = neverRegistered;                doRegister();                neverRegistered = false;                registered = true;                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the                // user may already fire events through the pipeline in the ChannelFutureListener.                pipeline.invokeHandlerAddedIfNeeded();                safeSetSuccess(promise);                pipeline.fireChannelRegistered();                // Only fire a channelActive if the channel has never been registered. This prevents firing                // multiple channel actives if the channel is deregistered and re-registered.                if (isActive()) {                    if (firstRegistration) {                        pipeline.fireChannelActive();                    } else if (config().isAutoRead()) {                        // This channel was registered before and autoRead() is set. This means we need to begin read                        // again so that we process inbound data.                        //                        // See https://github.com/netty/netty/issues/4805                        beginRead();                    }                }            } catch (Throwable t) {                // Close the channel directly to avoid FD leak.                closeForcibly();                closeFuture.setClosed();                safeSetFailure(promise, t);            }        }

而抓包时发现触发此channelactive时,服务端还未返回websocket的协议握手包(websocket协议是在在http协议上衍生的,会先发一个http get请求然后服务端返回一个为websocket协议的包给客户端)至此问题就真相大白了,在我们添加的WebSocketServerProtocolHandler这个handller中有如下代码

    public void handlerAdded(ChannelHandlerContext ctx) {        ChannelPipeline cp = ctx.pipeline();        if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {            // Add the WebSocketHandshakeHandler before this one.            ctx.pipeline().addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),                    new WebSocketServerProtocolHandshakeHandler(websocketPath, subprotocols,                            allowExtensions, maxFramePayloadLength, allowMaskMismatch, checkStartsWith));        }        if (cp.get(Utf8FrameValidator.class) == null) {            // Add the UFT8 checking before this one.            ctx.pipeline().addBefore(ctx.name(), Utf8FrameValidator.class.getName(),                    new Utf8FrameValidator());        }    }

添加的WebSocketServerProtocolHandshakeHandler中有如下代码

 public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {        final FullHttpRequest req = (FullHttpRequest) msg;        if (isNotWebSocketPath(req)) {            ctx.fireChannelRead(msg);            return;        }        try {            if (!GET.equals(req.method())) {                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));                return;            }            final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(                    getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols,                            allowExtensions, maxFramePayloadSize, allowMaskMismatch);            final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);            if (handshaker == null) {                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());            } else {                final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);                handshakeFuture.addListener(new ChannelFutureListener() {                    @Override                    public void operationComplete(ChannelFuture future) throws Exception {                        if (!future.isSuccess()) {                            ctx.fireExceptionCaught(future.cause());                        } else {                            // Kept for compatibility                            ctx.fireUserEventTriggered(                                    WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);                            ctx.fireUserEventTriggered(                                    new WebSocketServerProtocolHandler.HandshakeComplete(                                            req.uri(), req.headers(), handshaker.selectedSubprotocol()));                        }                    }                });                WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);                ctx.pipeline().replace(this, "WS403Responder",                        WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());            }        } finally {            req.release();        }    }

接受握手信息时候会添加两个handler 为websocket协议信息的编码和解码handler

public final ChannelFuture handshake(Channel channel, FullHttpRequest req,                                            HttpHeaders responseHeaders, final ChannelPromise promise) {        if (logger.isDebugEnabled()) {            logger.debug("{} WebSocket version {} server handshake", channel, version());        }        FullHttpResponse response = newHandshakeResponse(req, responseHeaders);        ChannelPipeline p = channel.pipeline();        if (p.get(HttpObjectAggregator.class) != null) {            p.remove(HttpObjectAggregator.class);        }        if (p.get(HttpContentCompressor.class) != null) {            p.remove(HttpContentCompressor.class);        }        ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);        final String encoderName;        if (ctx == null) {            // this means the user use a HttpServerCodec            ctx = p.context(HttpServerCodec.class);            if (ctx == null) {                promise.setFailure(                        new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));                return promise;            }            p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());            p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());            encoderName = ctx.name();        } else {            p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());            encoderName = p.context(HttpResponseEncoder.class).name();            p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());        }        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (future.isSuccess()) {                    ChannelPipeline p = future.channel().pipeline();                    p.remove(encoderName);                    promise.setSuccess();                } else {                    promise.setFailure(future.cause());                }            }        });        return promise;    }

如果想要实现在websocket协议连接成功后发送一个消息给客户端,我们发现在发送握手成功后触发了fireUserEventTriggered,去实现userEventTriggered然后判断evt类型做处理吧

以上就是基于netty的websocket在channelActive触发时发送数据异常问题分析是怎样的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0