// 调用方式如下 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(4); // io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.ThreadFactory) /** * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); } // io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...) protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { // 默认线程是 cpu * 2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...) /** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } // io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...) /** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 创建一个执行器,该执行器每提交一个任务,就创建一个线程来运行,即并没有队列的概念 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 使用一个数组来保存整个可用的线程池 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 为每个child创建一个线程运行, 该方法由子类实现 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 如果创建失败,则把已经创建好的线程池关闭掉 // 不过值得注意的是,当某个线程池创建失败后,并没有立即停止后续创建工作,即无 return 操作,这是为啥? // 实际上,发生异常时,Exeception 已经被抛出,此处无需关注 for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 创建选择器,猜测是做负载均衡时使用 // 此处的chooser默认是 DefaultEventExecutorChooserFactory chooser = chooserFactory.newChooser(children); final FutureListener
// 1. channel的设定 // io.netty.bootstrap.AbstractBootstrap#channel /** * The {@link Class} which is used to create {@link Channel} instances from. * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your * {@link Channel} implementation has no no-args constructor. */ public B channel(Class extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } // 默认使用构造器反射的方式创建 channel return channelFactory(new ReflectiveChannelFactory(channelClass)); } // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory extends C>) /** * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} * is not working for you because of some more complex needs. If your {@link Channel} implementation * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for * simplify your code. */ @SuppressWarnings({ "unchecked", "deprecation" }) public B channelFactory(io.netty.channel.ChannelFactory extends C> channelFactory) { return channelFactory((ChannelFactory) channelFactory); } // io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory extends C>) /** * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead. */ @Deprecated public B channelFactory(ChannelFactory extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); } @SuppressWarnings("unchecked") private B self() { return (B) this; } // 2. option 参数选项设置, 它会承包各种特殊配置的设置, 是一个通用配置项设置的入口 /** * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}. */ public B option(ChannelOption option, T value) { if (option == null) { throw new NullPointerException("option"); } // options 是一个 new LinkedHashMap, Object>(), 即非线程安全的容器, 所以设置值时要求使用 synchronized 保证线程安全 // value 为null时代表要将该选项设置删除, 如果key相同,后面的配置将会覆盖前面的配置 if (value == null) { synchronized (options) { options.remove(option); } } else { synchronized (options) { options.put(option, value); } } return self(); } // 3. childHandler 添加channelHandler, 这是一个最重要的一个方法, 它会影响到后面的业务处理统筹 // 调用该方法仅将 channelHandler的上下文加入进来, 实际还未进行真正的添加操作 .childHandler(new ChannelInitializer() { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) // 设置tcp协议的请求等待队列 .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast("encoder", new MessageEncoder()); p.addLast("decoder", new MessageDecoder()); p.addLast(new EchoServerHandler()); } }); /** * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s. */ public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } // 仅将 channelHandler 绑定到netty的上下文中 this.childHandler = childHandler; return this; } // 4. bossGroup, workGroup 如何被分配 ? /** * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and * {@link Channel}'s. */ public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { // parentGroup 是给acceptor使用的, 主要用于对socket连接的接入,所以一般一个线程也够了 super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } // childGroup 主要用于接入后的socket的事件的处理,一般要求数量较多,视业务属性决定 this.childGroup = childGroup; return this; }
// io.netty.bootstrap.AbstractBootstrap#bind(int) /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } // io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress) /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind(SocketAddress localAddress) { // 先验证各种参数是否设置完整, 如线程池是否设置, channelHandler 是否设置... validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } // 绑定tcp端口 return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { // 1. 创建一些channel使用, 与eventloop绑定, 统一管理嘛 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); // 2. 注册成功之后, 开始实际的 bind() 操作, 实际就是调用 channel.bind() // doBind0() 是一个异步的操作,所以使用的一个 promise 作为结果驱动 doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
// 以下我们先看下执行框架 // io.netty.bootstrap.AbstractBootstrap#initAndRegister final ChannelFuture initAndRegister() { Channel channel = null; try { // 即根据前面设置的channel 使用反射创建一个实例出来 // 即此处将会实例化出一个 ServerSocketChannel 出来 // 所以如果你想用jdk的nio实现,则设置channel时使用 NioServerSocketChannel.class即可, 而你想使用其他更优化的实现时比如EpollServerSocketChannel时,改变一下即可 // 而此处的 channelFactory 就是一个反射的实现 ReflectiveChannelFactory, 它会调用如上channel的无参构造方法实例化 // 重点工作就需要在这个无参构造器中完成,我们接下来看看 channel = channelFactory.newChannel(); // 初始化channel的一些公共参数, 相当于做一些属性的继承, 因为后续它将不再依赖 ServerBootstrap, 它需要有独立自主能力 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 注册创建好的 channel 到eventLoop中 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; } // 1. 先看看 NioServerSocketChannel 的构造过程 // io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel() /** * Create a new instance */ public NioServerSocketChannel() { // newSocket 简单说就是创建一个本地socket, api调用: SelectorProvider.provider().openServerSocketChannel() // 但此时本 socket 并未和任何端口绑定 this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } /** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { // 注册 OP_ACCEPT 事件 super(null, channel, SelectionKey.OP_ACCEPT); // 此处的 javaChannel() 实际就是 channel, 这样调用只是为统一吧 // 创建一个新的 socket 传入 NioServerSocketChannelConfig 中 // 主要用于一些 RecvByteBufAllocator 的设置,及channel的保存 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } // io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel /** * Create a new instance * * @param parent the parent {@link Channel} by which this instance was created. May be {@code null} * @param ch the underlying {@link SelectableChannel} on which it operates * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel} */ protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // 先让父类初始化必要的上下文 super(parent); // 保留 channel 信息,并设置非阻塞标识 this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } // io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel) /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */ protected AbstractChannel(Channel parent) { // 初始化上下文 this.parent = parent; // DefaultChannelId id = newId(); // NioMessageUnsafe unsafe = newUnsafe(); // new DefaultChannelPipeline(this); // 比较重要,将会初始化 head, tail 节点 pipeline = newChannelPipeline(); } // io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); // 初始化 head, tail tail = new TailContext(this); head = new HeadContext(this); // 构成双向链表 head.next = tail; tail.prev = head; } // 2. 初始化channel, 有个最重要的动作是将 Acceptor 接入到 pipeline 中 // io.netty.bootstrap.ServerBootstrap#init @Override void init(Channel channel) throws Exception { final Map, Object> options = options0(); // 根据前面的设置, 将各种属性copy过来, 放到 config 字段中 // 同样, 因为 options 和 attrs 都不是线程安全的, 所以都要上锁操作 synchronized (options) { setChannelOptions(channel, options, logger); } final Map, Object> attrs = attrs0(); synchronized (attrs) { for (Entry, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey key = (AttributeKey) e.getKey(); channel.attr(key).set(e.getValue()); } } // 此处的pipeline, 就是在 NioServerSocketChannel 中初始化好head,tail的pipeline ChannelPipeline p = channel.pipeline(); // childGroup 实际就是外部的 workGroup final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry, Object>[] currentChildOptions; final Entry, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } // 这个就比较重要了, 关联 ServerBootstrapAcceptor // 主动添加一个 initializer, 它将作为第一个被调用的 channelInitializer 存在 // 而 channelInitializer 只会被调用一次 p.addLast(new ChannelInitializer() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { // 添加 Acceptor 到 pipeline 中, 形成一个 head -> ServerBootstrapAcceptor -> tail 的pipeline pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); // 此操作过后,当前pipeline中,就只有此一handler }
// io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel) @Override public ChannelFuture register(Channel channel) { // next() 相当于是一个负载均衡器, 会选择出一个合适的 eventloop 出来, 默认是round-robin return next().register(channel); } // io.netty.channel.MultithreadEventLoopGroup#next @Override public EventLoop next() { return (EventLoop) super.next(); } // io.netty.util.concurrent.MultithreadEventExecutorGroup#next @Override public EventExecutor next() { // 使用前面创建的 PowerOfTwoEventExecutorChooser 进行调用 // 默认实现为轮询 return chooser.next(); } // io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#next @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } // io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel) @Override public ChannelFuture register(Channel channel) { // 使用 DefaultChannelPromise 封装channel, 再注册到 eventloop 中 return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); // NioMessageUnsafe promise.channel().unsafe().register(this, promise); return promise; } // io.netty.channel.AbstractChannel.AbstractUnsafe#register @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; // inEventLoop() 判断当前线程是否在 eventLoop 中 // 判断方式为直接比较 eventloop 线程也当前线程是否是同一个即可 Thread.currentThread() == this.thread; // 核心注册方法 register0() if (eventLoop.inEventLoop()) { register0(promise); } else { // 不在 eventLoop 中, 则异步提交任务给 eventloop 处理 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } // register0() 做真正的注册 // io.netty.channel.AbstractChannel.AbstractUnsafe#register0 private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 具体的注册逻辑由子类实现, NioServerSocketChannel doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // 几个扩展点: fireHandlerAdded() -> fireChannelRegistered() -> fireChannelActive() // part1: fireChannelAdded(), 它将会回调上面的 ServerBootstrapAcceptor 的添加 channelInitializer pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); // part2: fireChannelRegistered() pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } // io.netty.channel.nio.AbstractNioChannel#doRegister @Override protected void doRegister() throws Exception { boolean selected = false; // 进行注册即是 JDK 的 ServerSocketChannel.register() 过程 // 即 netty 与 socket 建立了关系连接, ops=0, 代表监听所有读事件 for (;;) { try { // 一直注册直到成功 // 此处 ops=0, 即不关注任何事件哦, 那么前面的 OP_ACCEPT 和这里又是什么关系呢? selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
// io.netty.channel.nio.NioEventLoop#run @Overrideprotected void run() {// 一个死循环检测任务, 这就 eventloop 的大杀器哦for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;// 有任务时执行任务, 否则阻塞等待网络事件, 或被唤醒case SelectStrategy.SELECT:// select.select(), 带超时限制select(wakenUp.getAndSet(false));// 'wakenUp.compareAndSet(false, true)' is always evaluated// before calling 'selector.wakeup()' to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when 'wakenUp' is set to// true too early.//// 'wakenUp' is set to true too early if:// 1) Selector is waken up between 'wakenUp.set(false)' and// 'selector.select(...)'. (BAD)// 2) Selector is waken up between 'selector.select(...)' and// 'if (wakenUp.get()) { ... }'. (OK)//// In the first case, 'wakenUp' is set to true and the// following 'selector.select(...)' will wake up immediately.// Until 'wakenUp' is set to false again in the next round,// 'wakenUp.compareAndSet(false, true)' will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following 'selector.select(...)' call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) { selector.wakeup(); }// fall throughdefault: } cancelledKeys = 0; needsToSelectAgain = false;// ioRatio 为io操作的占比, 和运行任务相比, 默认为 50:50final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {// step1. 运行io操作 processSelectedKeys(); } finally {// Ensure we always run tasks.// step2. 运行task任务 runAllTasks(); } } else {final long ioStartTime = System.nanoTime();try { processSelectedKeys(); } finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;// 运行任务的最长时间runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); }// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) { closeAll();if (confirmShutdown()) {return; } } } catch (Throwable t) { handleLoopException(t); } } }// select, 事件循环的依据private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();// 带超时限制, 默认最大超时1s, 但当有延时任务处理时, 以它为标准long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {// 超时则立即返回if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; }break; }// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1;break; }int selectedKeys = selector.select(timeoutMillis); selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak; }if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1;break; }long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The selector returned prematurely many times in a row.// Rebuild the selector to work around the problem. logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); selector = this.selector;// Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1;break; } currentTimeNanos = time; }if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) {if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); }// Harmless exception - log anyway } }