千家信息网

58. Netty源代码分析-ServerBootstrap bind 过程-1

发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,一. 开始接上一篇 ServerBootstrap的初始化https://blog.51cto.com/483181/2119149二. bind过程2.1 代码先看下调用的源代码public voi
千家信息网最后更新 2024年12月12日58. Netty源代码分析-ServerBootstrap bind 过程-1

一. 开始

接上一篇 ServerBootstrap的初始化
https://blog.51cto.com/483181/2119149

二. bind过程

2.1 代码

先看下调用的源代码

public void bind(int port) throws Exception {        ...        try {            ...            ChannelFuture f = b.bind(port).sync(); //bind过程            ...        } catch (Exception e) {            e.printStackTrace();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }

2.2 bind

public ChannelFuture bind(int inetPort) {        return bind(new InetSocketAddress(inetPort));    }public ChannelFuture bind(SocketAddress localAddress) {        validate();        if (localAddress == null) {            throw new NullPointerException("localAddress");        }        return doBind(localAddress);}

从上面代码可以看出几点:

  1. bind方法逻辑很简单,经过一系列的判断后最后调用doBind()方法
  2. 发现Netty代码里面,从外面调用进去后,内部方法一般用doxxx,xxx0这种命名;以前自己看安卓源代码的时候,安卓一般喜欢用xxxInner的命名。风格而已,也许自己以后写代码可以参考(看源代码除了了解原理外,学习别人的代码架构方法也是一种收获)。

继续看doBind

2.3 doBind

private ChannelFuture doBind(final SocketAddress localAddress) {        final ChannelFuture regFuture = initAndRegister(); //1. init和register        final Channel channel = regFuture.channel();        if (regFuture.cause() != null) {            return regFuture;        }        if (regFuture.isDone()) {             // At this point we know that the registration was complete and successful.            ChannelPromise promise = channel.newPromise();            doBind0(regFuture, channel, localAddress, promise);            return promise;        } else {            // Registration future is almost always fulfilled already, but just in case it's not.            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);            regFuture.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture future) throws Exception {                    Throwable cause = future.cause();                    if (cause != null) {                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an                        // IllegalStateException once we try to access the EventLoop of the Channel.                        promise.setFailure(cause);                    } else {                        // Registration was successful, so set the correct executor to use.                        // See https://github.com/netty/netty/issues/2586                        promise.registered();                        doBind0(regFuture, channel, localAddress, promise);                    }                }            });            return promise;        }    }

上面这一段代码包含的东西就比较多了,先来看 initAndRegister

2.4 initAndRegister

顾名思义,这个方法包含初始化和注册两个步骤,代码如下:

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel();            init(channel);        } catch (Throwable t) {            ...        }        ChannelFuture regFuture = config().group().register(channel);        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }        return regFuture;    }

从上面代码,我们可以看到几点:

  1. channel = channelFactory.newChannel();
    channelFactory是什么?它的类型是ReflectiveChannelFactory,如果大家不记得了,可以看看上一篇channel设置那个地方。
    https://blog.51cto.com/483181/2119149
public B channel(Class channelClass) {        return channelFactory(new ReflectiveChannelFactory(channelClass));    }public class ReflectiveChannelFactory implements ChannelFactory {    @Override    public T newChannel() {        try {            return clazz.getConstructor().newInstance();        } catch (Throwable t) {        }    }   }       

它的newChannel方法也是非常的简单,直接实例化传入的channel对象,也就是NioServerSocketChannel (可以看上一篇初始化的分析)
代码如下:

ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)

我们先看看NioServerSocketChannel的实现

2.5 NioServerSocketChannel

先看下NioServerSocketChannel的继承关系

NioServerSocketChannel提供了一个无参构造函数,然后分别有SelectorProvider,ServerSocketChannel的构造函数,如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();    private static ServerSocketChannel newSocket(SelectorProvider provider) {        try {            return provider.openServerSocketChannel();        } catch (IOException e) {        }    }    private final ServerSocketChannelConfig config;    /**     * Create a new instance     */    public NioServerSocketChannel() {        this(newSocket(DEFAULT_SELECTOR_PROVIDER));    }    /**     * Create a new instance using the given {@link ServerSocketChannel}.     */    public NioServerSocketChannel(ServerSocketChannel channel) {        super(null, channel, SelectionKey.OP_ACCEPT);        config = new NioServerSocketChannelConfig(this, javaChannel().socket());    }

无参构造函数里面调用newSocket(xx),参数是SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
先看看SelectorProvider.provider()

private static SelectorProvider provider = null;public static SelectorProvider provider() {        synchronized (lock) {            if (provider != null)                return provider;                ...        }    }

可以看到provider是个单例,不知道大家是否记得上上一篇文章(NioEventLoopGroup实例化)分析的时候也有provider,类型是KQueueSelectorProvider
具体可以看: https://blog.51cto.com/483181/2118817

回到newSocket里面,调用的是provider.openServerSocketChannel()

代码是SelectorProviderImpl里面,返回的是 ServerSocketChannel

public ServerSocketChannel openServerSocketChannel() throws IOException {        return new ServerSocketChannelImpl(this);    }

得到ServerSocketChannel之后,继续调用构造函数

public NioServerSocketChannel(ServerSocketChannel channel) {        super(null, channel, SelectionKey.OP_ACCEPT);        config = new NioServerSocketChannelConfig(this, javaChannel().socket());    }

这个构造方法里面做了两件事

  1. 调用父类的构造方法
  2. 利用刚刚生成好的ServerSocketChannel实例化了一个NioServerSocketChannelConfig

看它的父类构造函数是怎么实现的

首先是AbstractNioChannel.java

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {        super(parent);        this.ch = ch;        this.readInterestOp = readInterestOp;        try {            ch.configureBlocking(false);        } catch (IOException e) {        }    }
  1. 继续调用父类的构造方法
  2. 首先吧传入的ServerSocketChannel保存起来,变量是ch
  3. 然后把readInterestOp存起来,变量是readInterestOp,值是SelectionKey.OP_ACCEPT
  4. 调用ch.configureBlocking(false);把channel设置成非阻塞。
    这里稍微介绍下SelectionKey.OP_ACCEPT
    SelectionKey有4种类型,是java提供的,分别是
public static final int OP_READ = 1 << 0;public static final int OP_WRITE = 1 << 2;public static final int OP_CONNECT = 1 << 3;public static final int OP_ACCEPT = 1 << 4;

然后继续看AbstractNioChannel的父类构造方法,也就是AbstractChannel

private final ChannelId id;protected abstract AbstractUnsafe newUnsafe();private final DefaultChannelPipeline pipeline;protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();}

可以看到这几点:

  1. Channel parent变量,null
  2. 初始化ChannelId id
  3. 初始化unsafe
  4. 初始化pipeline

先看unsafe的初始化

2.6 newUnsafe

在AbstractChannel里面,它是一个抽象类

protected abstract AbstractUnsafe newUnsafe();

实现类在子类AbstractNioMessageChannel里面,如下,类型是NioMessageUnsafe

@Override    protected AbstractNioUnsafe newUnsafe() {        return new NioMessageUnsafe();    }

NioMessageUnsafe代码后面再看。

继续看pipeline的初始化,初始化了一个 DefaultChannelPipeline

protected DefaultChannelPipeline newChannelPipeline() {        return new DefaultChannelPipeline(this);    }
protected DefaultChannelPipeline(Channel channel) {        this.channel = ObjectUtil.checkNotNull(channel, "channel");        succeededFuture = new SucceededChannelFuture(channel, null);        voidPromise =  new VoidChannelPromise(channel, true);        tail = new TailContext(this);        head = new HeadContext(this);        head.next = tail;        tail.prev = head;    }

在DefaultChannelPipeline里面初始化了一个head和tail,分别是HeadContext和TailConext类型,而且head和tail组成双向链表。
head和tail的区别之一就是inbound和outbound值是相反的,如下:

节点inboundoutbound
headfalsetrue
tailtruefalse
HeadContext(DefaultChannelPipeline pipeline) {            super(pipeline, null, HEAD_NAME, false, true);            unsafe = pipeline.channel().unsafe();            setAddComplete();        }TailContext(DefaultChannelPipeline pipeline) {            super(pipeline, null, TAIL_NAME, true, false);            setAddComplete();        }               

借一张图显示下ChannelInBound和ChannelOutBound,如下。head是发送出去的入口,tail是接收消息的入口。

另外我们来看一下添加一个ChannelHandler的流程,比如addLast

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {        final AbstractChannelHandlerContext newCtx;        synchronized (this) {            checkMultiplicity(handler);            newCtx = newContext(group, filterName(name, handler), handler);            addLast0(newCtx);            ...                    return this;    }private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);    }       private void addLast0(AbstractChannelHandlerContext newCtx) {        AbstractChannelHandlerContext prev = tail.prev;        newCtx.prev = prev;        newCtx.next = tail;        prev.next = newCtx;        tail.prev = newCtx;    }       
  1. 首先它初始化了一个DefaultChannelHandlerContext对象,里面封装了要add的channelHandler,这个很重要,在Netty的pipeLine里面,都是通过ChannelHandlerContext来描述的,不是直接添加channelHandler。

  2. addLast0()里面就是简单的双向链表添加的方法,把封装了channelHandler的ChannelHandlerContext对象添加到tail的前一个节点。

那,我们来总结下NioServerSocketChannel的初始化过程:

1. NioServerSocketChannel提供了一个无参构造函数,里面SelectorProvider DEFAULT_SELECTOR_PROVIDER,它是一个单例,类型是KQueueSelectorProvider。

2. 我们调用KQueueSelectorProvider.openServerSocketChannel()方法,得到一个ServerSocketChannel

3. 我们用生成的ServerSocketChannel对象创建了一个ServerSocketChannelConfig config,具体是NioServerSocketChannelConfig对象,存在NioServerSocketChannel里面

4. 我们用生成的ServerSocketChannel调用它的父类构造函数,先来到了AbstractNioChannel

5. 在AbstractNioChannel会把ServerSocketChannel存起来,变量是ch,然后把channel设置成非阻塞。

6. AbstractNioChannel还会把readInterestOp存起来,类型是SelectionKey.OP_ACCEPT

7. 继续调用父类构造函数,来到AbstractChannel

8. AbstractChannel里面的parent设置成null

9. AbstractChannel初始化channel id

10. AbstractChannel初始化unsafe,类型是NioMessageUnsafe.

11. AbstractChannel初始化pipeline,类型是DefaultChannelPipeline, 每个Channel都有一个自己的Pipeline

看完NioServerSocketChannel的实例化方法后,我们继续往下看init

2.7 init

abstract void init(Channel channel) throws Exception;

AbstractBootstrap里面的init(channel)方法是一个抽象方法,参数是Channel类型,其实就是上一步实例化好的NioServerSocketChannel对象。

具体实现方法在它的子类ServerBootstrap和Bootstrap(给客户端启动使用的),那我们是分析服务端的代码,所以看ServerBootstrap里面的实现。

void init(Channel channel) throws Exception {        final Map, Object> options = options0();        synchronized (options) { //1. 设置options            setChannelOptions(channel, options, logger);        }        final Map, Object> attrs = attrs0();        synchronized (attrs) {            for (Entry, Object> e: attrs.entrySet()) { //设置attr属性                @SuppressWarnings("unchecked")                AttributeKey key = (AttributeKey) e.getKey();                channel.attr(key).set(e.getValue());            }        }        ChannelPipeline p = channel.pipeline();        final EventLoopGroup currentChildGroup = childGroup;        final ChannelHandler currentChildHandler = childHandler;        final Entry, Object>[] currentChildOptions;        final Entry, Object>[] currentChildAttrs;        synchronized (childOptions) {            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));        }        synchronized (childAttrs) {            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));        }        p.addLast(new ChannelInitializer() {            @Override            public void initChannel(final Channel ch) throws Exception {                final ChannelPipeline pipeline = ch.pipeline();                ChannelHandler handler = config.handler();                if (handler != null) {                    pipeline.addLast(handler);                }                ch.eventLoop().execute(new Runnable() {                    @Override                    public void run() {                        pipeline.addLast(new ServerBootstrapAcceptor(                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                    }                });            }        });    }

先来看设置options

2.8 setOptions

final Map, Object> options = options0();        synchronized (options) { //1. 设置options            setChannelOptions(channel, options, logger);        }static void setChannelOptions(            Channel channel, Map, Object> options, InternalLogger logger) {        for (Map.Entry, Object> e: options.entrySet()) {            setChannelOption(channel, e.getKey(), e.getValue(), logger);        }    }private static void setChannelOption(            Channel channel, ChannelOption option, Object value, InternalLogger logger) {        try {            if (!channel.config().setOption((ChannelOption) option, value)) {            }        } catch (Throwable t) {}    }       

这段代码我们这样看

  1. options是哪来的?
    options是一个map,服务器代码是这样设置的
 b.xxxx.    .option(ChannelOption.SO_BACKLOG, 100)
  1. 它其实调用的是channel.config()对象去设置option,那config对象是什么呢?这个上面分析Channel初始化的时候说过,它是NioServerSocketChannelConfig对象,NioServerSocketChannelConfig的类继承关系如下:

`

  1. 所以setOption的实现在DefaultServerSocketChannelConfig里面
@Override    public  boolean setOption(ChannelOption option, T value) {        validate(option, value);        if (option == SO_RCVBUF) {            setReceiveBufferSize((Integer) value);        } else if (option == SO_REUSEADDR) {            setReuseAddress((Boolean) value);        } else if (option == SO_BACKLOG) {            setBacklog((Integer) value);        } else {            return super.setOption(option, value);        }        return true;    }父类 DefaultChannelConfig.javapublic  boolean setOption(ChannelOption option, T value) {        validate(option, value);        if (option == CONNECT_TIMEOUT_MILLIS) {            setConnectTimeoutMillis((Integer) value);        } else if (option == MAX_MESSAGES_PER_READ) {            setMaxMessagesPerRead((Integer) value);        } else if (option == WRITE_SPIN_COUNT) {            setWriteSpinCount((Integer) value);        } else if (option == ALLOCATOR) {            setAllocator((ByteBufAllocator) value);        } else if (option == RCVBUF_ALLOCATOR) {            setRecvByteBufAllocator((RecvByteBufAllocator) value);        } else if (option == AUTO_READ) {            setAutoRead((Boolean) value);        } else if (option == AUTO_CLOSE) {            setAutoClose((Boolean) value);        } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {            setWriteBufferHighWaterMark((Integer) value);        } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {            setWriteBufferLowWaterMark((Integer) value);        } else if (option == WRITE_BUFFER_WATER_MARK) {            setWriteBufferWaterMark((WriteBufferWaterMark) value);        } else if (option == MESSAGE_SIZE_ESTIMATOR) {            setMessageSizeEstimator((MessageSizeEstimator) value);        } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {            setPinEventExecutorPerGroup((Boolean) value);        } else {            return false;        }        return true;    }       

根据传入的属性不行,用不同的方法进行设置,这些属性的值大家可以去单独百度,可能不同的环境配置不同的值对服务器性能有好处。

那继续往下面看,设置attr

2.9 setAttr

setAttr是封装了一个Attribute的类,然后存储key,value,大家具体要看的话,可以看DefaultAttributeMap.java

继续往下看

2.10 addLast

p.addLast(new ChannelInitializer() {            @Override            public void initChannel(final Channel ch) throws Exception {                final ChannelPipeline pipeline = ch.pipeline();                ChannelHandler handler = config.handler();                if (handler != null) {                    pipeline.addLast(handler);                }                ch.eventLoop().execute(new Runnable() {                    @Override                    public void run() {                        pipeline.addLast(new ServerBootstrapAcceptor(                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                    }                });            }        });

上面这段代码,我们一步步看

  1. 首先,config.handler()是哪里来的?其实就是我们设置的handler,这一点可以从上一篇分析看到
    https://blog.51cto.com/483181/2119149
b..handler(new LoggingHandler(LogLevel.INFO));

所以 pipeline.addLast(handler); 就是把我们设置的handler添加到pipeline里面。

  1. 然后又实例化了一个ServerBootstrapAcceptor,把childHandler那些参数都传了进去,具体在ServerBootstrapAcceptor里面怎么使用这些childHandler的.
ch.eventLoop().execute(new Runnable() {                    @Override                    public void run() {                        pipeline.addLast(new ServerBootstrapAcceptor(                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                    }                });

ServerBootstrapAcceptor是把客户端连接的channel从bossGroup转移到workGroup,代码如下:
ServerBootstrap.java

@Override        @SuppressWarnings("unchecked")        public void channelRead(ChannelHandlerContext ctx, Object msg) {            final Channel child = (Channel) msg;            child.pipeline().addLast(childHandler);            setChannelOptions(child, childOptions, logger);            try {                childGroup.register(child).addListener(new ChannelFutureListener() {                    @Override                    public void operationComplete(ChannelFuture future) throws Exception {                        if (!future.isSuccess()) {                            forceClose(child, future.cause());                        }                    }                });            } catch (Throwable t) {                forceClose(child, t);            }        }

上面这段代码把客户端的channel读进来转换成一个channel类型,然后调用childGroup,然后把channel注册进去,这样workGroup就接手了channel后面的事情。

那init就看完了,总结一下init做的事情

  1. 设置options,参数有很多,不同的服务器业务可以用不用的参数。
  2. 设置attr
  3. 把handler添加到pipeLine的尾部
  4. 初始化了一个ServerBootstrapAcceptor,里面封装了childHandler的那些参数。

其实看到这里,我们会发现init还只是初始化参数,把handler添加到pipeLine里面,做好一切准备,并没有bind服务器端口。

那我们继续看

ChannelFuture regFuture = config().group().register(channel);

2.12 register

先继续贴一下initAndRegister的代码,因为上面讲的东西有点多,大家可能忘记initAndRegister里面的代码了。

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel(); //1. NioServerSocketChannel的初始化已经讲了            init(channel); //2. init过程已经讲了        } catch (Throwable t) {        }        ChannelFuture regFuture = config().group().register(channel); //3. 现在讲register        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }        return regFuture;    }

如同上面的注释,我们讲register过程

  1. config.group()是什么呢?参考我们上一篇的ServerBootstrap初始化,config.group()指的bossGroup,类型是NioEventLoopGroup
    ServerBootstrap初始化
EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)

由于NioEventLoopGroup继承自MultithreadEventLoopGroup,所以调用的是MultithreadEventLoopGroup的register(channel)方法,如下:

public ChannelFuture register(Channel channel) {        return next().register(channel);    }@Override    public EventExecutor next() {        return chooser.next();    }       

那next()又是什么呢?在上篇 NioEventLoopGroup实例化 里面我们分析了,NioEventLoopGroup里面初始化了跟传入线程数目相同的NioEventLoop对象,而next()方法有两种算法选出下一个NioEventLoop对象是什么。

这两种算法是PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser,所以我们就可以知道继续会调用NioEventLoop对象的register(channel)对象。

而NioEventLoop类并没有实现register(channel)方法,它继承自SingleThreadEventLoop,它里面有实现register(channel)方法,如下:

public ChannelFuture register(Channel channel) {        return register(new DefaultChannelPromise(channel, this));    }

这个方法里面实例化了一个DefaultChannelPromise对象,它其实就是保存channel和当前的NioEventLoop对象,做了一层封装而已,如下:

public DefaultChannelPromise(Channel channel, EventExecutor executor) {        super(executor);        this.channel = checkNotNull(channel, "channel");    }public DefaultPromise(EventExecutor executor) {        this.executor = checkNotNull(executor, "executor");    }       

所以我们可以暂时不管它,继续往下面走.

@Override    public ChannelFuture register(final ChannelPromise promise) {        ObjectUtil.checkNotNull(promise, "promise");        promise.channel().unsafe().register(this, promise);        return promise;    }

调用的是unsafe.register(this, promise)

那unsafe是什么对象呢?从上面2.6可以看到unsafe()初始化的是NioMessageUnsafe对象

protected AbstractNioUnsafe newUnsafe() {        return new NioMessageUnsafe();    }

由于NioMessageUnsafe并没有重写register(EventLoop eventLoop, ChannelPromise promise)方法,所以追踪它的父类,最后在AbstractUnsafe里面看到了register(EventLoop eventLoop, ChannelPromise promise),如下:
先附上NioMessageUnsafe的继承关系图:

AbstractUnsafe.java

public final void register(EventLoop eventLoop, final ChannelPromise promise) {            ...            AbstractChannel.this.eventLoop = eventLoop;            if (eventLoop.inEventLoop()) {                register0(promise);            } else {                try {                    eventLoop.execute(new Runnable() {                        @Override                        public void run() {                            register0(promise);                        }                    });                } catch (Throwable t) {                    ...                }            }        }

都会走到register0(promise)这个方法里面,继续看register0(promise)

private void register0(ChannelPromise promise) {            try {                ...                boolean firstRegistration = neverRegistered;                doRegister(); //1.                 neverRegistered = false;                registered = true;                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the                // user may already fire events through the pipeline in the ChannelFutureListener.                pipeline.invokeHandlerAddedIfNeeded();                safeSetSuccess(promise);                pipeline.fireChannelRegistered();                // Only fire a channelActive if the channel has never been registered. This prevents firing                // multiple channel actives if the channel is deregistered and re-registered.                if (isActive()) {                    if (firstRegistration) {                        pipeline.fireChannelActive();                    } else if (config().isAutoRead()) {                        // This channel was registered before and autoRead() is set. This means we need to begin read                        // again so that we process inbound data.                        //                        // See https://github.com/netty/netty/issues/4805                        beginRead();                    }                }            } catch (Throwable t) {                ...            }        }

先看doRegister

2.13 doRegister

这个方法在AbstractChannel里面,是个空实现

/**     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.     *     * Sub-classes may override this method     */    protected void doRegister() throws Exception {        // NOOP    }

在AbstractNioChannel里面有重写

@Override    protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                return;            } catch (CancelledKeyException e) {                ...            }        }    }protected SelectableChannel javaChannel() {        return ch;}       
  1. 首先,ch是ServerSocketChannelImpl类型,这个可以从上面 2.5 NioServerSocketChannel的初始化可以看出来来
public ServerSocketChannel openServerSocketChannel() throws IOException {        return new ServerSocketChannelImpl(this);}

ServerSocketChannelImpl是JDK提供的类,那javaChannel().register(xxx)就是调用JDK nio的方法实现register,那就不继续深入下去了。

  1. 但是这里有个疑惑,调用register的时候传入的ops是0,并没有使用上面4种监听类型的任何一种,这个先记下来。
public static final int OP_READ = 1 << 0;public static final int OP_WRITE = 1 << 2;public static final int OP_CONNECT = 1 << 3;public static final int OP_ACCEPT = 1 << 4;
  1. eventLoop().unwrappedSelector()是什么呢?
    从上一篇NioEventGroupLoop初始化 2.2.3分析可以知道,它是一个KQueueSelectorImpl,继承自Selector

那我们可以这样理解,上面这段代码是把一个Selector对象注册到Java的 Channel里面,这个Channel和我们上面讲的Netty Channel不是一个东西。

继续看register0()

2.14 pipeline.fireChannelRegistered()

private void register0(ChannelPromise promise) {            try {                ...                doRegister(); //1. 把selector注册到Java channel, ops = 0                ...                pipeline.fireChannelRegistered(); //2. 通知handler channel已经注册                                if (isActive()) {                    if (firstRegistration) {                        pipeline.fireChannelActive();                    } else if (config().isAutoRead()) {                        // This channel was registered before and autoRead() is set. This means we need to begin read                        // again so that we process inbound data.                        //                        // See https://github.com/netty/netty/issues/4805                        beginRead();                    }                }                ...            } catch (Throwable t) {                ...            }        }

pipeline里面维护channelHandler的列表,通过链表的方法,如DefaultChannelPipeline.java里面

final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;

然后通知channel registered,如果channelHandler有重写channelRegitstered(ChannelHandlerContext ctx)的话,就会被回调。如LoggingHandler就会打印

然后判断isActive(),isActive()是一个多态方法,对于服务器,它是判断监听是否启动;
NioServerSocketChannle.java

@Override    public boolean isActive() {        return javaChannel().socket().isBound();    }

对于客户端,它是判断TCP连接是否完成
NioSocketChannel.java

@Override    public boolean isActive() {        SocketChannel ch = javaChannel();        return ch.isOpen() && ch.isConnected();    }

我们这里直讲服务器,如果isActive(),那么就会调用 pipeline.fireChannelActive(); 通知channelHander已经active,这样就会回调他们的channelActive方法。

继续看pipeline.fireChannelActive();

DefaultChannelPipeline.java

@Override    public final ChannelPipeline fireChannelActive() {        AbstractChannelHandlerContext.invokeChannelActive(head);        return this;    }

AbstractChannelHandlerContext.invokeChannelActive方法就不看了,就是调用参数的channelActive。由于参数是head,那么我们去看channelActive方法。

DefaultChannelPipeline.java

@Override        public void channelActive(ChannelHandlerContext ctx) throws Exception {            ctx.fireChannelActive();            readIfIsAutoRead();        }private void readIfIsAutoRead() {            if (channel.config().isAutoRead()) {                channel.read();            }        }               

调用的是channel.read(),channel是NioServerSocketChannel,它的实现是在父类AbstractChannel.java里面

@Override    public Channel read() {        pipeline.read();        return this;    }

DefaultChannelPipeline.java

@Override    public final ChannelPipeline read() {        tail.read();        return this;    }

AbstractChannelHandlerContext.java

@Override    public ChannelHandlerContext read() {        final AbstractChannelHandlerContext next = findContextOutbound();        EventExecutor executor = next.executor();        if (executor.inEventLoop()) {            next.invokeRead();        } else {            ...        }        return this;    }private AbstractChannelHandlerContext findContextOutbound() {        AbstractChannelHandlerContext ctx = this;        do {            ctx = ctx.prev;        } while (!ctx.outbound);        return ctx;    }       

首先要寻找findContextOutbound,由于head的inbound=false,outbound=true,所以next=head,那么就是调用head的read方法,如下:
DefaultChannelPipeline.java

@Override        public void read(ChannelHandlerContext ctx) {            unsafe.beginRead();        }

AbstractChannel.java

@Override        public final void beginRead() {            assertEventLoop();            if (!isActive()) {                return;            }            try {                doBeginRead();            } catch (final Exception e) {                ...            }        }

直接看doBeginRead()

AbstractNioChannel.java

@Override    protected void doBeginRead() throws Exception {        // Channel.read() or ChannelHandlerContext.read() was called        final SelectionKey selectionKey = this.selectionKey;        if (!selectionKey.isValid()) {            return;        }        readPending = true;        final int interestOps = selectionKey.interestOps();        if ((interestOps & readInterestOp) == 0) {            selectionKey.interestOps(interestOps | readInterestOp);        }    }

还记得我们初始化NioServerSocketChannel的时候,我们传给父类的readInterestOp吗?没错,就是SelectionKey.OP_ACCEPT,如下:

public NioServerSocketChannel(ServerSocketChannel channel) {        super(null, channel, SelectionKey.OP_ACCEPT);        config = new NioServerSocketChannelConfig(this, javaChannel().socket());    }protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {        super(parent);        this.ch = ch;        this.readInterestOp = readInterestOp;                ...}               

上面doReadBegin就是把我们设置的readInterestOp重新设置到java selector上面,代表我们监听的类型是SelectionKey.OP_ACCEPT,不在是最开始的0了。

到这里,initAndRegister方法就基本讲完了,再贴一次它的代码,加深下印象。

final ChannelFuture initAndRegister() {        Channel channel = null;        try {            channel = channelFactory.newChannel(); //1. 实例化NioServerSocketChannel            init(channel); //2. 初始化        } catch (Throwable t) {        }        ChannelFuture regFuture = config().group().register(channel); //3. 注册selector到Java channel上面,注册类型是0        if (regFuture.cause() != null) {            if (channel.isRegistered()) {                channel.close();            } else {                channel.unsafe().closeForcibly();            }        }        return regFuture;    }

我们再来回忆一下initAndRegister方法

1. 实例化NioServerSocketChannel对象,channelFactory.newChannel()
a. 传入父类的ops是SelectionKey.OP_ACCEPT
b. 它的父类AbstractNioChannel把channel设置成非阻塞,然后把SelectionKey.OP_ACCEPT存起来
c. 父类AbstractChannel初始化了ChannelId
d. AbstractChannel初始化了unsafe,类型是NioMessageUnsafe。
e. AbstractChannel初始化了pipeline,类型是DefaultChannelPipeline,每个channel都有自己的pipleline,它维护了channelHandler列表,如果有事件发生,那么pipeline就负责把事件从头传到尾。

2. init方法
a. 它是在子类ServerBootstrap里面实现,子类Bootstrap实现的是客户端的。
b. setOptions()设置属性,类型有很多,不同的业务场景可以设置不同的属性。
c. addLast把我们设置的channelHandler添加到pipeline
d. 实例化了一个ServerBootstrapAcceptor,里面封装了childChannel,也添加到pipeline里面

3. register
a. register调用的是bossGroup NioEventLoopGroup的register方法,NioEventLoopGroup regitster方法调用的next().regitster,next()调用chooser.next.
b. chooser有两种PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser,它们负责选择NioEventLoopGroup里面下一个NioEventLoop(NioEventLoopGroup里面有nThreads个NioEventLoop,nThreads表示线程数,默认是cpu*2)
c. NioEventLoop.register调用的是它的父类SingleThreadEventLoop.register,所以它调用的是unsafe.register。从上面的初始化就可以知道,unsafe指的是NioMessageUnsafe,所以调用的是NioMessageUnsafe.register
d. NioMessageUnsafe并没有实现register,所以调用的是它的父类AbstractUnsafe.regitster,然后调用register0
e. 在doRegitster里面把selector注册到Java的channel,key=0
f. 调用pipeline.fireChannelRegistered(),通知pipeline维护的channelHander,channel已经注册了,回调了它们的channelRegitstered方法。

那initAndRegister就讲完了,bind过程还没有结束,因为篇幅有点多了,下一篇继续介绍doBind0:

private ChannelFuture doBind(final SocketAddress localAddress) {        final ChannelFuture regFuture = initAndRegister(); //1. 这一篇的内容        final Channel channel = regFuture.channel();        if (regFuture.cause() != null) {            return regFuture;        }        if (regFuture.isDone()) {            // At this point we know that the registration was complete and successful.            ChannelPromise promise = channel.newPromise();            doBind0(regFuture, channel, localAddress, promise); //2. 下一篇讲doBind0()            return promise;        } else {            ...            });            return promise;        }    }
0