netty中pipeline如何添加handler
发表于:2024-11-12 作者:千家信息网编辑
千家信息网最后更新 2024年11月12日,这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。pipeline无疑是 netty中非常
千家信息网最后更新 2024年11月12日netty中pipeline如何添加handler
这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
pipeline无疑是 netty中非常重要的一环,在io处理中netty通过已存放在pipeline中的handler有序处理socket中的信息,而pipeline中包含的都是一个个的handler,它是怎么添加进去的呢
创建代码如下:其中有handler和childhandler
public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); // 绑定线程池 sb.group(group, bossGroup) // 指定使用的channel .channel(NioServerSocketChannel.class) // 绑定监听端口 .localAddress(this.port) .handler(new LoggingHandler(LogLevel.INFO)) // 绑定客户端连接时候触发操作 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //字符串解码 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); //字符串编码 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS)); } }); // 服务器异步创建绑定 ChannelFuture cf = sb.bind().sync(); log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress()); // 关闭服务器通道 cf.channel().closeFuture().sync(); } finally { // 释放线程池资源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } }
顺着服务端bind方法点进去会看到如下:创建完channel实例后 会调用一个init方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
init方法为抽象方法 服务端实现如下
void init(Channel channel) throws Exception {//省略若干 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry , Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } //看这里 添加了一个ChannelInitializer实例 实例中实现了initChannel 获取配置的handler添加到pipeline p.addLast(new ChannelInitializer () { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
而pipeline的addlast方法如下
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //这方法很简单 添加到链表中 但是此时添加的是上文的ChannelInitializer 而不是我们定义的handler addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } //这个方法就是关键 将ChannelInitializer中的handler添加到pipeline中 以及删除ChannelInitializer //但请注意 首次注册不会走到这里 在前面的判断就会返回 会在注册的逻辑中触发 最终调用的一致 callHandlerAdded0(newCtx); return this; }
查看ChannelInitializer中的关键代码如下 OK 通过如下代码我们就看到 调用了ChannelInitializer的initChannel方法 就是在最开始我们定义的那个,然后里面就用继续调用pipeline add我们真正的handler
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { //看这里删除pipeline中的 ChannelInitializer ChannelInitializer 其实就是一个handler 只是它重写了handlerAdded方法 ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }
关于"netty中pipeline如何添加handler"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
方法
服务
代码
实例
就是
篇文章
关键
字符
字符串
更多
服务器
线程
处理
监听
不错
实用
有序
重要
一致
上文
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
高档服务器插座
腾讯服务器能恢复微信聊天记录吗
网络安全事件处置措施
存货监管软件开发定制公司
服务器能上youtube
查询中奖记录使用什么数据库
CNKI数据库中医学专辑
科技互联网之窗
未心网络安全团队
光学 软件开发
网络安全攻防沙盘
上海地区软件开发
铜陵直播软件开发
广东教育软件开发收费
天津惠普服务器维修多少钱
服务器未认证继续进行安全连接
越界科技互联网
充电桩大数据库
永兴电脑软件开发学费多少
软件开发人员自学
软件开发学习哪个方向
网络技术复习知识点
网络安全意识文献
深圳卫星软件开发批发价
数据库学生表班级表如何对应
什么是科技互联网
画饭圈乱象网络安全
哪些学校有网络安全专业
充电桩大数据库
5种关系数据库