千家信息网

dubbo的WrappedChannelHandler有什么作用

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇内容介绍了"dubbo的WrappedChannelHandler有什么作用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大
千家信息网最后更新 2025年02月06日dubbo的WrappedChannelHandler有什么作用

本篇内容介绍了"dubbo的WrappedChannelHandler有什么作用"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

本文主要研究一下dubbo的WrappedChannelHandler

WrappedChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/WrappedChannelHandler.java

public class WrappedChannelHandler implements ChannelHandlerDelegate {    protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class);    protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));    protected final ExecutorService executor;    protected final ChannelHandler handler;    protected final URL url;    public WrappedChannelHandler(ChannelHandler handler, URL url) {        this.handler = handler;        this.url = url;        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {            componentKey = CONSUMER_SIDE;        }        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);    }    public void close() {        try {            if (executor != null) {                executor.shutdown();            }        } catch (Throwable t) {            logger.warn("fail to destroy thread pool of server: " + t.getMessage(), t);        }    }    @Override    public void connected(Channel channel) throws RemotingException {        handler.connected(channel);    }    @Override    public void disconnected(Channel channel) throws RemotingException {        handler.disconnected(channel);    }    @Override    public void sent(Channel channel, Object message) throws RemotingException {        handler.sent(channel, message);    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        handler.received(channel, message);    }    @Override    public void caught(Channel channel, Throwable exception) throws RemotingException {        handler.caught(channel, exception);    }    public ExecutorService getExecutor() {        return executor;    }    @Override    public ChannelHandler getHandler() {        if (handler instanceof ChannelHandlerDelegate) {            return ((ChannelHandlerDelegate) handler).getHandler();        } else {            return handler;        }    }    public URL getUrl() {        return url;    }    public ExecutorService getExecutorService() {        ExecutorService cexecutor = executor;        if (cexecutor == null || cexecutor.isShutdown()) {            cexecutor = SHARED_EXECUTOR;        }        return cexecutor;    }}
  • WrappedChannelHandler的构造根据ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)获取ExecutorService,然后放到dataStore中

ExecutionChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

public class ExecutionChannelHandler extends WrappedChannelHandler {    public ExecutionChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getExecutorService();        if (message instanceof Request) {            try {                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));            } catch (Throwable t) {                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent                // this scenario from happening, but a better solution should be considered later.                if (t instanceof RejectedExecutionException) {                    Request request = (Request) message;                    if (request.isTwoWay()) {                        String msg = "Server side(" + url.getIp() + "," + url.getPort()                                + ") thread pool is exhausted, detail msg:" + t.getMessage();                        Response response = new Response(request.getId(), request.getVersion());                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);                        response.setErrorMessage(msg);                        channel.send(response);                        return;                    }                }                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);            }        } else {            handler.received(channel, message);        }    }}
  • ExecutionChannelHandler继承了WrappedChannelHandler,其received会创建ChannelEventRunnable,然后放到executor去执行

ChannelEventRunnable

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/ChannelEventRunnable.java

public class ChannelEventRunnable implements Runnable {    private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class);    private final ChannelHandler handler;    private final Channel channel;    private final ChannelState state;    private final Throwable exception;    private final Object message;    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state) {        this(channel, handler, state, null);    }    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message) {        this(channel, handler, state, message, null);    }    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Throwable t) {        this(channel, handler, state, null, t);    }    public ChannelEventRunnable(Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) {        this.channel = channel;        this.handler = handler;        this.state = state;        this.message = message;        this.exception = exception;    }    @Override    public void run() {        if (state == ChannelState.RECEIVED) {            try {                handler.received(channel, message);            } catch (Exception e) {                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel                        + ", message is " + message, e);            }        } else {            switch (state) {            case CONNECTED:                try {                    handler.connected(channel);                } catch (Exception e) {                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);                }                break;            case DISCONNECTED:                try {                    handler.disconnected(channel);                } catch (Exception e) {                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);                }                break;            case SENT:                try {                    handler.sent(channel, message);                } catch (Exception e) {                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel                            + ", message is " + message, e);                }                break;            case CAUGHT:                try {                    handler.caught(channel, exception);                } catch (Exception e) {                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel                            + ", message is: " + message + ", exception is " + exception, e);                }                break;            default:                logger.warn("unknown state: " + state + ", message is " + message);            }        }    }    /**     * ChannelState     *     *     */    public enum ChannelState {        /**         * CONNECTED         */        CONNECTED,        /**         * DISCONNECTED         */        DISCONNECTED,        /**         * SENT         */        SENT,        /**         * RECEIVED         */        RECEIVED,        /**         * CAUGHT         */        CAUGHT    }}
  • ChannelEventRunnable实现了Runnable接口,其run方法根据不同的ChannelState做不同处理

小结

  • WrappedChannelHandler的构造根据ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url)获取ExecutorService,然后放到dataStore中

  • ExecutionChannelHandler继承了WrappedChannelHandler,其received会创建ChannelEventRunnable,然后放到executor去执行

  • ChannelEventRunnable实现了Runnable接口,其run方法根据不同的ChannelState做不同处理

"dubbo的WrappedChannelHandler有什么作用"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0