千家信息网

What does AllDispatcher do in dubbo?

Published: 2025-01-31 Author: 千家信息网 editorial staff

Many newcomers are unclear about what AllDispatcher does in dubbo. To help, this article walks through the relevant source code in detail.

Dispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Dispatcher.java

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     *
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    // The last two parameters are reserved for compatibility with the old configuration
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}

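  • The Dispatcher interface defines a single dispatch method, which takes a ChannelHandler and a URL and returns a ChannelHandler. The @SPI annotation makes AllDispatcher ("all") the default implementation.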
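
To make the role of the @SPI and @Adaptive annotations above more concrete, here is a minimal sketch of how an implementation name could be resolved from URL parameters. It is not Dubbo's generated adaptive code: the class AdaptiveLookupSketch and the method resolveName are hypothetical, the URL is reduced to a plain Map, and the sketch assumes that Constants.DISPATCHER_KEY is the string "dispatcher".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for an adaptive extension lookup.
// None of these names are Dubbo classes.
class AdaptiveLookupSketch {

    // Parameter keys checked in order, mirroring the @Adaptive annotation.
    // Assumption: Constants.DISPATCHER_KEY equals "dispatcher".
    static final String[] KEYS = {"dispatcher", "dispather", "channel.handler"};

    // Default extension name, mirroring @SPI(AllDispatcher.NAME).
    static final String DEFAULT_NAME = "all";

    /** Returns the extension name chosen for the given URL parameters. */
    static String resolveName(Map<String, String> urlParams) {
        for (String key : KEYS) {
            String value = urlParams.get(key);
            if (value != null && !value.isEmpty()) {
                return value; // the first key that is set wins
            }
        }
        return DEFAULT_NAME; // nothing configured, fall back to the default
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        System.out.println(resolveName(params)); // no key set
        params.put("channel.handler", "direct");
        System.out.println(resolveName(params)); // legacy key is honoured
        params.put("dispatcher", "message");
        System.out.println(resolveName(params)); // primary key takes precedence
    }
}
```

Under these assumptions, a URL that sets none of the three keys ends up with the "all" implementation, which is why AllDispatcher is the one that applies when nothing is configured.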

AllDispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllDispatcher.java

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

  • AllDispatcher implements the Dispatcher interface. Its dispatch method wraps the given handler in an AllChannelHandler and returns it.
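
The wrapping step can be illustrated with a small, self-contained sketch. The types Handler and HandlerDispatcher below are hypothetical one-method stand-ins for ChannelHandler and Dispatcher, and the executor is passed in directly rather than derived from a URL. The sketch only shows the idea that the returned wrapper hands each event to a thread pool instead of running the business handler on the calling thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "wrap the handler" idea; not Dubbo code.
class WrappingDispatcherSketch {

    /** Hypothetical one-method stand-in for ChannelHandler. */
    interface Handler {
        void received(String message);
    }

    /** Hypothetical stand-in for Dispatcher: takes a handler, returns a wrapped handler. */
    interface HandlerDispatcher {
        Handler dispatch(Handler handler, ExecutorService executor);
    }

    /** Plays the role of AllDispatcher: the wrapper forwards every event to the pool. */
    static final HandlerDispatcher ALL =
            (handler, executor) -> message -> executor.execute(() -> handler.received(message));

    /** Delivers one message through the wrapper and reports which thread handled it. */
    static String handlingThreadName() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "biz-pool-1"));
        AtomicReference<String> seen = new AtomicReference<>();
        Handler business = message -> seen.set(Thread.currentThread().getName());
        Handler wrapped = ALL.dispatch(business, pool);
        wrapped.received("hello"); // the caller plays the role of the I/O thread
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("handled on: " + handlingThreadName()); // handled on: biz-pool-1
    }
}
```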

AllChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

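  • AllChannelHandler extends WrappedChannelHandler. Its connected, disconnected, received, and caught methods each obtain the thread pool through the parent class's getExecutorService and submit a newly created ChannelEventRunnable to it. If received catches a RejectedExecutionException while the message is a Request and that request is two-way, it sends back a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR and returns; in every other failure case it throws an ExecutionException.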
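
The rejection fallback in received can be reproduced with the JDK alone. The following is a sketch under stated assumptions, not the Dubbo implementation: Req is a hypothetical stand-in for Request, the reply is reduced to a string instead of a Response sent over a channel, and a rejected one-way request simply rethrows the original exception. The pool is deliberately tiny (one thread, no queue capacity) so that the second submission is rejected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "reply instead of letting the caller time out"; not Dubbo code.
class ExhaustedPoolSketch {

    /** Hypothetical stand-in for Request: only the two fields the fallback needs. */
    static final class Req {
        final long id;
        final boolean twoWay;

        Req(long id, boolean twoWay) {
            this.id = id;
            this.twoWay = twoWay;
        }
    }

    /**
     * Hands the work to the pool. Returns "DISPATCHED" on success, or an error reply
     * if the pool rejects a two-way request. A rejected one-way request rethrows,
     * because no caller is waiting for a reply.
     */
    static String receive(ThreadPoolExecutor pool, Req req, Runnable work) {
        try {
            pool.execute(work);
            return "DISPATCHED";
        } catch (RejectedExecutionException e) {
            if (req.twoWay) {
                return "THREADPOOL_EXHAUSTED for request " + req.id;
            }
            throw e;
        }
    }

    /** Saturates a one-thread pool, then sends a two-way request to it. */
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker thread until the latch is released.
            receive(pool, new Req(1, true), () -> {
                try {
                    release.await();
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
            });
            // No free worker and no queue capacity: the default AbortPolicy rejects this.
            return receive(pool, new Req(2, true), () -> { });
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // THREADPOOL_EXHAUSTED for request 2
    }
}
```

Replying immediately matters only for two-way requests: without the reply, the consumer would learn about the exhausted pool only when its own timeout fires.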

Summary

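  • The Dispatcher interface defines the dispatch method, which returns a ChannelHandler.

  • AllChannelHandler extends WrappedChannelHandler. Its connected, disconnected, received, and caught methods each obtain the thread pool through the parent class's getExecutorService and submit a newly created ChannelEventRunnable to it.

  • When AllChannelHandler's received method catches a RejectedExecutionException, the message is a Request, and the request is two-way, it returns a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR.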
