netty中pipeline如何添加handler
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。pipeline无疑是 netty中非常
千家信息网最后更新 2025年02月04日netty中pipeline如何添加handler
这篇文章将为大家详细讲解有关netty中pipeline如何添加handler,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
pipeline无疑是 netty中非常重要的一环,在io处理中netty通过已存放在pipeline中的handler有序处理socket中的信息,而pipeline中包含的都是一个个的handler,它是怎么添加进去的呢
创建代码如下:其中有handler和childhandler
public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); // 绑定线程池 sb.group(group, bossGroup) // 指定使用的channel .channel(NioServerSocketChannel.class) // 绑定监听端口 .localAddress(this.port) .handler(new LoggingHandler(LogLevel.INFO)) // 绑定客户端连接时候触发操作 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); //字符串解码 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); //字符串编码 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS)); } }); // 服务器异步创建绑定 ChannelFuture cf = sb.bind().sync(); log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress()); // 关闭服务器通道 cf.channel().closeFuture().sync(); } finally { // 释放线程池资源 group.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } }
顺着服务端bind方法点进去会看到如下:创建完channel实例后 会调用一个init方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
init方法为抽象方法 服务端实现如下
void init(Channel channel) throws Exception {//省略若干 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry , Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } //看这里 添加了一个ChannelInitializer实例 实例中实现了initChannel 获取配置的handler添加到pipeline p.addLast(new ChannelInitializer () { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
而pipeline的addlast方法如下
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //这方法很简单 添加到链表中 但是此时添加的是上文的ChannelInitializer 而不是我们定义的handler addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } //这个方法就是关键 将ChannelInitializer中的handler添加到pipeline中 以及删除ChannelInitializer //但请注意 首次注册不会走到这里 在前面的判断就会返回 会在注册的逻辑中触发 最终调用的一致 callHandlerAdded0(newCtx); return this; }
查看ChannelInitializer中的关键代码如下 OK 通过如下代码我们就看到 调用了ChannelInitializer的initChannel方法 就是在最开始我们定义的那个,然后里面就用继续调用pipeline add我们真正的handler
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { //看这里删除pipeline中的 ChannelInitializer ChannelInitializer 其实就是一个handler 只是它重写了handlerAdded方法 ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }
关于"netty中pipeline如何添加handler"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
方法
服务
代码
实例
就是
篇文章
关键
字符
字符串
更多
服务器
线程
处理
监听
不错
实用
有序
重要
一致
上文
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
mt4设置服务器
服务器安全组888
美国的科技互联网公司
网络安全信息办是做什么
阿里应用服务器部署价格
万国网络软件开发
mysql数据库节点
河北服务器硬盘价格
做网站需要多少服务器
新城区系统软件开发
做网络安全的软件工程师
怎么获取网吧服务器管理权限
赛亚网络安全工程师
长丰正规网络技术市场报价
木叶传说数据库
c 软件开发应聘要求
卡通抠图软件开发
c 数据库监控源码
手机软件开发流程视频教程
网络安全设备分类有哪些
工控系统网络安全人才
服务器红灯亮是什么原因
江苏软件开发专业团队
公安刑侦网关于网络安全提醒
罗尼-普莱斯数据库
制作网络安全书签
查询数据库是否运行
爱玛数据库
ebsco全文数据库
票的宝安全接入服务器地址