千家信息网

59. Netty源代码分析-ServerBootstrap bind 过程-2

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,一. 接上一篇https://blog.51cto.com/483181/2121265我们继续分析doBind0(regFuture, channel, localAddress, promise)
千家信息网最后更新 2025年02月03日59. Netty源代码分析-ServerBootstrap bind 过程-2

一. 接上一篇

https://blog.51cto.com/483181/2121265

我们继续分析doBind0(regFuture, channel, localAddress, promise)

 private ChannelFuture doBind(final SocketAddress localAddress) {        final ChannelFuture regFuture = initAndRegister();        final Channel channel = regFuture.channel();        if (regFuture.cause() != null) {            return regFuture;        }        if (regFuture.isDone()) {            // At this point we know that the registration was complete and successful.            ChannelPromise promise = channel.newPromise();            doBind0(regFuture, channel, localAddress, promise); //3. 我们这篇要分析的内容            return promise;        } else {            ...            return promise;        }    }

二. doBind0

2.1 4个参数

如上面代码,doBind0有4个参数regFuture, channel, localAddress, promise,它们的类型如下:
regFuture: DefaultChannelPromise
channel:NioServerSocketChannel
localAddress:SocketAddress
promise:DefaultChannelPromise

那继续往下面看代码,

private static void doBind0(            final ChannelFuture regFuture, final Channel channel,            final SocketAddress localAddress, final ChannelPromise promise) {        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up        // the pipeline in its channelRegistered() implementation.        channel.eventLoop().execute(new Runnable() {            @Override            public void run() {                if (regFuture.isSuccess()) {                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);                } else {                    promise.setFailure(regFuture.cause());                }            }        });    }

doBind0()代码很简单,调用channel.eventloop()执行了一个Runnable。channel.eventloop()调用的是AbstractChannle.eventloop()

@Override    public EventLoop eventLoop() {        EventLoop eventLoop = this.eventLoop;        if (eventLoop == null) {            throw new IllegalStateException("channel not registered to an event loop");        }        return eventLoop;    }

而this.eventLoop初始化是在register的时候,具体可以参考上一篇bind初始化的分析

https://blog.51cto.com/483181/2121265

@Override        public final void register(EventLoop eventLoop, final ChannelPromise promise) {            ....            AbstractChannel.this.eventLoop = eventLoop;                        ...           }                

它的类型是一个NioEventLoop,所以它就是往自己的线程池里面丢了一个Runnable任务
NioEventLoop的继承关系图如下:

我们可以看一下NioEventLoop.execute(Runnable )方法.

2.2 execute(Runnable)方法

这个方法的实现是在SingleThreadEventExecutor.java里面。

private final Queue taskQueue;@Override    public void execute(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }        boolean inEventLoop = inEventLoop();        addTask(task);        if (!inEventLoop) {            startThread();            if (isShutdown() && removeTask(task)) {                reject();            }        }        if (!addTaskWakesUp && wakesUpForTask(task)) {            wakeup(inEventLoop);        }    }protected void addTask(Runnable task) {        if (task == null) {            throw new NullPointerException("task");        }        if (!offerTask(task)) {            reject(task);        }    }       final boolean offerTask(Runnable task) {        if (isShutdown()) {            reject();        }        return taskQueue.offer(task);    }       

它是把传入的Runnable对象放到一个taskQueue队列里面。

那我们继续看Runnable里面的实现,channel.bind(xxxx)

channel.eventLoop().execute(new Runnable() {            @Override            public void run() {                             ...               channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            }        });

2.3 channel.bind

Channel的类型是NioServerSocketChannel,而bind方法的实现类是在AbstractChannel.java里面.

@Override    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return pipeline.bind(localAddress, promise);    }

调用的是pipeline.bind(xxx),pipeline我们知道,它链接了ChannelHandler的Context,有head和tail。head是outbound,tail是inbound.

2.4 pipe.bind(xxx)

DefaultChannelPipeline.java

@Override    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {        return tail.bind(localAddress, promise);    }

直接调用的是tail.bind,继续往下面看
AbstractChannelHandlerContext.java

@Override    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {        final AbstractChannelHandlerContext next = findContextOutbound();        EventExecutor executor = next.executor();        if (executor.inEventLoop()) {            next.invokeBind(localAddress, promise);        } else {            safeExecute(executor, new Runnable() {                @Override                public void run() {                    next.invokeBind(localAddress, promise);                }            }, promise, null);        }        return promise;    }private AbstractChannelHandlerContext findContextOutbound() {        AbstractChannelHandlerContext ctx = this;        do {            ctx = ctx.prev;        } while (!ctx.outbound);        return ctx;    }       

它调用findContextOutbound(),然后调用它的bind方法,从以前的分析可以知道,outbound讲的是head。所以,我们来到head.bind方法

2.5 head.bind

DefaultChannelPipeline.java

@Override        public void bind(                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)                throws Exception {            unsafe.bind(localAddress, promise);        }

调用的是unsafe.bind

2.6 unsafe.bind

AbstractChannel.java

@Override        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {            assertEventLoop();            ...            boolean wasActive = isActive();            try {                doBind(localAddress);            } catch (Throwable t) {                ...            }            if (!wasActive && isActive()) {                invokeLater(new Runnable() {                    @Override                    public void run() {                        pipeline.fireChannelActive();                    }                });            }            safeSetSuccess(promise);        }

继续看doBind()

2.7 doBind()

doBind是在NioServerSocketChannel里面

@Override    protected void doBind(SocketAddress localAddress) throws Exception {        if (PlatformDependent.javaVersion() >= 7) {            javaChannel().bind(localAddress, config.getBacklog());        } else {            javaChannel().socket().bind(localAddress, config.getBacklog());        }    }

这样就调用了Java的接口绑定了我们传入的端口。

这样bind逻辑整个就分析完了,我们来分析一下整个流程以及类之间的关系。

三. 总结

3.1 各个类之间的关系

  1. EventLoopGroup里面包含若干个EventLoop,具体数目我们可以手动指定,如果不指定,一般默认就是cpu x 2个。
  2. EventLoop继承自SingleThreadEventLoop,表示一个线程,里面有个队列queue,用来存丢过来的Runnable任务.
  3. 我们一般实例化两个EventLoopGroup,一个是bossGroup,一个是workGroup,bossGroup用来接收客户端连接,连接可以用channel来描述。然后就把channel丢给workGroup,由workGroup具体负责这个channel后面的操作。

3.2 对应关系

  1. 一个EventLoopGroup包含多个EventLoop
  2. 一个EventLoop对应多个channel,一个channel只对应一个EventLoop。EventLoop一般比channel少,好比饭店里面服务员一般比客人少。所以,也可以想象如果某一个channel上EventLoop做的业务逻辑比较复杂的话,有可能造成来不及相应其他channel的请求。
  3. 一个NioServerSocketChannel里面包含一个pipeline,一个unsafe对象
0