千家信息网

dubbo中NettyServer的作用是什么

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,dubbo中NettyServer的作用是什么,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。AbstractServerdubbo-2.
千家信息网最后更新 2025年02月03日dubbo中NettyServer的作用是什么

dubbo中NettyServer的作用是什么,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

AbstractServer

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java

public abstract class AbstractServer extends AbstractEndpoint implements Server {    protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";    private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);    ExecutorService executor;    private InetSocketAddress localAddress;    private InetSocketAddress bindAddress;    private int accepts;    private int idleTimeout;    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {        super(url, handler);        localAddress = getUrl().toInetSocketAddress();        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {            bindIp = ANYHOST_VALUE;        }        bindAddress = new InetSocketAddress(bindIp, bindPort);        this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);        this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);        try {            doOpen();            if (logger.isInfoEnabled()) {                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());            }        } catch (Throwable t) {            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);        }        //fixme replace this with better method        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));    }    protected abstract void doOpen() throws Throwable;    protected abstract void doClose() throws Throwable;    @Override    public void reset(URL url) {        if (url == null) {            return;        }        try {            if (url.hasParameter(ACCEPTS_KEY)) {                int a = url.getParameter(ACCEPTS_KEY, 0);                if (a > 0) {                    this.accepts = a;                }            }        } catch (Throwable t) {            logger.error(t.getMessage(), t);        }        try {            if (url.hasParameter(IDLE_TIMEOUT_KEY)) {                int t = url.getParameter(IDLE_TIMEOUT_KEY, 0);                if (t > 0) {                    this.idleTimeout = t;                }            }        } catch (Throwable t) {            logger.error(t.getMessage(), t);        }        try {            if (url.hasParameter(THREADS_KEY)                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;                int threads = url.getParameter(THREADS_KEY, 0);                int max = threadPoolExecutor.getMaximumPoolSize();                int core = threadPoolExecutor.getCorePoolSize();                if (threads > 0 && (threads != max || threads != core)) {                    if (threads < core) {                        threadPoolExecutor.setCorePoolSize(threads);                        if (core == max) {                            threadPoolExecutor.setMaximumPoolSize(threads);                        }                    } else {                        threadPoolExecutor.setMaximumPoolSize(threads);                        if (core == max) {                            threadPoolExecutor.setCorePoolSize(threads);                        }                    }                }            }        } catch (Throwable t) {            logger.error(t.getMessage(), t);        }        super.setUrl(getUrl().addParameters(url.getParameters()));    }    @Override    public void send(Object message, boolean sent) throws RemotingException {        Collection channels = getChannels();        for (Channel channel : channels) {            if (channel.isConnected()) {                channel.send(message, sent);            }        }    }    @Override    public void close() {        if (logger.isInfoEnabled()) {            logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());        }        ExecutorUtil.shutdownNow(executor, 100);        try {            super.close();        } catch (Throwable e) {            logger.warn(e.getMessage(), e);        }        try {            doClose();        } catch (Throwable e) {            logger.warn(e.getMessage(), e);        }    }    @Override    public void close(int timeout) {        ExecutorUtil.gracefulShutdown(executor, timeout);        close();    }    @Override    public InetSocketAddress getLocalAddress() {        return localAddress;    }    public InetSocketAddress getBindAddress() {        return bindAddress;    }    public int getAccepts() {        return accepts;    }    public int getIdleTimeout() {        return idleTimeout;    }    @Override    public void connected(Channel ch) throws RemotingException {        // If the server has entered the shutdown process, reject any new connection        if (this.isClosing() || this.isClosed()) {            logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");            ch.close();            return;        }        Collection channels = getChannels();        if (accepts > 0 && channels.size() > accepts) {            logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);            ch.close();            return;        }        super.connected(ch);    }    @Override    public void disconnected(Channel ch) throws RemotingException {        Collection channels = getChannels();        if (channels.isEmpty()) {            logger.warn("All clients has disconnected from " + ch.getLocalAddress() + ". You can graceful shutdown now.");        }        super.disconnected(ch);    }}
  • AbstractServer的构造器会从url读取bindAddress、accepts、idleTimeout,然后执行doOpen方法;close方法会关闭executor,执行父类close方法,然后执行doClose方法;connected方法会先判断channels是否超出accepts值,超过则直接close;disconnected则执行父类disconnected方法

NettyServer

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java

public class NettyServer extends AbstractServer implements Server {    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);    /**     * the cache for alive worker channel.     *      */    private Map channels;    /**     * netty server bootstrap.     */    private ServerBootstrap bootstrap;    /**     * the boss channel that receive connections and dispatch these to worker channel.     */        private io.netty.channel.Channel channel;    private EventLoopGroup bossGroup;    private EventLoopGroup workerGroup;    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.        // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));    }    /**     * Init and start netty server     *     * @throws Throwable     */    @Override    protected void doOpen() throws Throwable {        bootstrap = new ServerBootstrap();        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),                new DefaultThreadFactory("NettyServerWorker", true));        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);        channels = nettyServerHandler.getChannels();        bootstrap.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)                .childHandler(new ChannelInitializer() {                    @Override                    protected void initChannel(NioSocketChannel ch) throws Exception {                        // FIXME: should we use getTimeout()?                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug                                .addLast("decoder", adapter.getDecoder())                                .addLast("encoder", adapter.getEncoder())                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))                                .addLast("handler", nettyServerHandler);                    }                });        // bind        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());        channelFuture.syncUninterruptibly();        channel = channelFuture.channel();    }    @Override    protected void doClose() throws Throwable {        try {            if (channel != null) {                // unbind.                channel.close();            }        } catch (Throwable e) {            logger.warn(e.getMessage(), e);        }        try {            Collection channels = getChannels();            if (channels != null && channels.size() > 0) {                for (org.apache.dubbo.remoting.Channel channel : channels) {                    try {                        channel.close();                    } catch (Throwable e) {                        logger.warn(e.getMessage(), e);                    }                }            }        } catch (Throwable e) {            logger.warn(e.getMessage(), e);        }        try {            if (bootstrap != null) {                bossGroup.shutdownGracefully();                workerGroup.shutdownGracefully();            }        } catch (Throwable e) {            logger.warn(e.getMessage(), e);        }        try {            if (channels != null) {                channels.clear();            }        } catch (Throwable e) {            logger.warn(e.getMessage(), e);        }    }    @Override    public Collection getChannels() {        Collection chs = new HashSet();        for (Channel channel : this.channels.values()) {            if (channel.isConnected()) {                chs.add(channel);            } else {                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));            }        }        return chs;    }    @Override    public Channel getChannel(InetSocketAddress remoteAddress) {        return channels.get(NetUtils.toAddressString(remoteAddress));    }    @Override    public boolean canHandleIdle() {        return true;    }    @Override    public boolean isBound() {        return channel.isActive();    }}
  • NettyServer继承了AbstractServer,其实现了doOpen、doClose方法;doOpen方法会创建netty的ServerBootstrap、bossGroup、workerGroup;doClose方法会关闭channel,关闭bossGroup、workerGroup

NettyTransporter

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyTransporter.java

public class NettyTransporter implements Transporter {    public static final String NAME = "netty";    @Override    public Server bind(URL url, ChannelHandler listener) throws RemotingException {        return new NettyServer(url, listener);    }    @Override    public Client connect(URL url, ChannelHandler listener) throws RemotingException {        return new NettyClient(url, listener);    }}
  • NettyTransporter实现了Transporter接口,其bind方法创建的是NettyServer

小结

NettyServer继承了AbstractServer,其实现了doOpen、doClose方法;doOpen方法会创建netty的ServerBootstrap、bossGroup、workerGroup;doClose方法会关闭channel,关闭bossGroup、workerGroup

看完上述内容,你们掌握dubbo中NettyServer的作用是什么的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0