千家信息网

netty中pipeline如何添加handler

发表于:2024-11-12 作者:千家信息网编辑
千家信息网最后更新 2024年11月12日,这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。pipeline无疑是 netty中非常
千家信息网最后更新 2024年11月12日netty中pipeline如何添加handler

这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

pipeline无疑是 netty中非常重要的一环,在io处理中netty通过已存放在pipeline中的handler有序处理socket中的信息,而pipeline中包含的都是一个个的handler,它是怎么添加进去的呢
创建代码如下:其中有handler和childhandler
 public void start() throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup group = new NioEventLoopGroup();        try {            ServerBootstrap sb = new ServerBootstrap();            sb.option(ChannelOption.SO_BACKLOG, 1024);            // 绑定线程池            sb.group(group, bossGroup)                    // 指定使用的channel                    .channel(NioServerSocketChannel.class)                    // 绑定监听端口                    .localAddress(this.port)                    .handler(new LoggingHandler(LogLevel.INFO))                    // 绑定客户端连接时候触发操作                    .childHandler(new ChannelInitializer() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline pipeline = ch.pipeline();                            pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));                            pipeline.addLast(new LengthFieldPrepender(4));                            //字符串解码                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));                            //字符串编码                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));                            pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS));                        }                    });            // 服务器异步创建绑定            ChannelFuture cf = sb.bind().sync();            log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());            // 关闭服务器通道            cf.channel().closeFuture().sync();        } finally {            // 释放线程池资源            group.shutdownGracefully().sync();            bossGroup.shutdownGracefully().sync();        }    }
顺着服务端bind方法点进去会看到如下:创建完channel实例后 会调用一个init方法
final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel();            init(channel);        } catch (Throwable t) {            if (channel != null) {                // channel can be null if newChannel crashed (eg SocketException("too many open files"))                channel.unsafe().closeForcibly();                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);            }            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);        }        ChannelFuture regFuture = config().group().register(channel);        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }        return regFuture;    }
init方法为抽象方法 服务端实现如下
void init(Channel channel) throws Exception {//省略若干        ChannelPipeline p = channel.pipeline();        final EventLoopGroup currentChildGroup = childGroup;        final ChannelHandler currentChildHandler = childHandler;        final Entry, Object>[] currentChildOptions;        final Entry, Object>[] currentChildAttrs;        synchronized (childOptions) {            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));        }        synchronized (childAttrs) {            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));        }                //看这里  添加了一个ChannelInitializer实例 实例中实现了initChannel   获取配置的handler添加到pipeline        p.addLast(new ChannelInitializer() {            @Override            public void initChannel(final Channel ch) throws Exception {                final ChannelPipeline pipeline = ch.pipeline();                ChannelHandler handler = config.handler();                if (handler != null) {                    pipeline.addLast(handler);                }                ch.eventLoop().execute(new Runnable() {                    @Override                    public void run() {                        pipeline.addLast(new ServerBootstrapAcceptor(                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                    }                });            }        });    }
而pipeline的addlast方法如下
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {        final AbstractChannelHandlerContext newCtx;        synchronized (this) {            checkMultiplicity(handler);            newCtx = newContext(group, filterName(name, handler), handler);                        //这方法很简单 添加到链表中  但是此时添加的是上文的ChannelInitializer 而不是我们定义的handler            addLast0(newCtx);            // If the registered is false it means that the channel was not registered on an eventLoop yet.            // In this case we add the context to the pipeline and add a task that will call            // ChannelHandler.handlerAdded(...) once the channel is registered.            if (!registered) {                newCtx.setAddPending();                callHandlerCallbackLater(newCtx, true);                return this;            }            EventExecutor executor = newCtx.executor();            if (!executor.inEventLoop()) {                callHandlerAddedInEventLoop(newCtx, executor);                return this;            }        }                //这个方法就是关键 将ChannelInitializer中的handler添加到pipeline中 以及删除ChannelInitializer                //但请注意 首次注册不会走到这里  在前面的判断就会返回  会在注册的逻辑中触发  最终调用的一致        callHandlerAdded0(newCtx);        return this;    }
查看ChannelInitializer中的关键代码如下 OK 通过如下代码我们就看到 调用了ChannelInitializer的initChannel方法 就是在最开始我们定义的那个,然后里面就用继续调用pipeline add我们真正的handler
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        if (ctx.channel().isRegistered()) {            // This should always be true with our current DefaultChannelPipeline implementation.            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers            // will be added in the expected order.            if (initChannel(ctx)) {                // We are done with init the Channel, removing the initializer now.                removeState(ctx);            }        }    }            private boolean initChannel(ChannelHandlerContext ctx) throws Exception {        if (initMap.add(ctx)) { // Guard against re-entrance.            try {                initChannel((C) ctx.channel());            } catch (Throwable cause) {                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).                // We do so to prevent multiple calls to initChannel(...).                exceptionCaught(ctx, cause);            } finally {                                //看这里删除pipeline中的 ChannelInitializer   ChannelInitializer 其实就是一个handler  只是它重写了handlerAdded方法                ChannelPipeline pipeline = ctx.pipeline();                if (pipeline.context(this) != null) {                    pipeline.remove(this);                }            }            return true;        }        return false;    }

关于"netty中pipeline如何添加handler"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

0