千家信息网

dubbo中ConnectionOrderedDispatcher的作用是什么

发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,本篇文章为大家展示了dubbo中ConnectionOrderedDispatcher的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。dubbo-2
千家信息网最后更新 2024年11月20日dubbo中ConnectionOrderedDispatcher的作用是什么

本篇文章为大家展示了dubbo中ConnectionOrderedDispatcher的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedDispatcher.java

public class ConnectionOrderedDispatcher implements Dispatcher {    public static final String NAME = "connection";    @Override    public ChannelHandler dispatch(ChannelHandler handler, URL url) {        return new ConnectionOrderedChannelHandler(handler, url);    }}
  • ConnectionOrderedDispatcher实现了Dispatcher接口,其dispatch方法返回的是ConnectionOrderedChannelHandler

ConnectionOrderedChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {    protected final ThreadPoolExecutor connectionExecutor;    private final int queuewarninglimit;    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);        connectionExecutor = new ThreadPoolExecutor(1, 1,                0L, TimeUnit.MILLISECONDS,                new LinkedBlockingQueue(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),                new NamedThreadFactory(threadName, true),                new AbortPolicyWithReport(threadName, url)        );  // FIXME There's no place to release connectionExecutor!        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);    }    @Override    public void connected(Channel channel) throws RemotingException {        try {            checkQueueLength();            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));        } catch (Throwable t) {            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);        }    }    @Override    public void disconnected(Channel channel) throws RemotingException {        try {            checkQueueLength();            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));        } catch (Throwable t) {            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);        }    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getExecutorService();        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));        } catch (Throwable t) {            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.            if (message instanceof Request && t instanceof RejectedExecutionException) {                Request request = (Request) message;                if (request.isTwoWay()) {                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();                    Response response = new Response(request.getId(), request.getVersion());                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);                    response.setErrorMessage(msg);                    channel.send(response);                    return;                }            }            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);        }    }    @Override    public void caught(Channel channel, Throwable exception) throws RemotingException {        ExecutorService executor = getExecutorService();        try {            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));        } catch (Throwable t) {            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);        }    }    private void checkQueueLength() {        if (connectionExecutor.getQueue().size() > queuewarninglimit) {            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));        }    }}
  • ConnectionOrderedChannelHandler继承了WrappedChannelHandler,其构造器创建了corePoolSize及maximumPoolSize均为1,queue为LinkedBlockingQueue的connectionExecutor

  • 其connected、disconnected方法均是使用connectionExecutor来执行新创建的ChannelEventRunnable;这两个方法均会先执行checkQueueLength来判断queue大小是否大于queuewarninglimit,大于的话则打印warn日志

  • 其received、caught均是通过父类的getExecutorService获取线程池,然后执行创建的ChannelEventRunnable;received方法在捕获到异常时RejectedExecutionException且message是Request,而且request是twoWay的时候会返回SERVER_THREADPOOL_EXHAUSTED_ERROR

ConnectChannelHandlerTest

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java

public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest {    @BeforeEach    public void setUp() throws Exception {        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);    }    @Test    public void test_Connect_Blocked() throws RemotingException {        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);        Assertions.assertEquals(1, executor.getMaximumPoolSize());        int runs = 20;        int taskCount = runs * 2;        for (int i = 0; i < runs; i++) {            handler.connected(new MockedChannel());            handler.disconnected(new MockedChannel());            Assertions.assertTrue(executor.getActiveCount() <= 1, executor.getActiveCount() + " must <=1");        }        //queue.size         Assertions.assertEquals(taskCount - 1, executor.getQueue().size());        for (int i = 0; i < taskCount; i++) {            if (executor.getCompletedTaskCount() < taskCount) {                sleep(100);            }        }        Assertions.assertEquals(taskCount, executor.getCompletedTaskCount());    }    @Test //biz error should not throw and affect biz thread.    public void test_Connect_Biz_Error() throws RemotingException {        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);        handler.connected(new MockedChannel());    }    @Test //biz error should not throw and affect biz thread.    public void test_Disconnect_Biz_Error() throws RemotingException {        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(true), url);        handler.disconnected(new MockedChannel());    }    @Test    public void test_Connect_Execute_Error() throws RemotingException {        Assertions.assertThrows(ExecutionException.class, () -> {            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);            executor.shutdown();            handler.connected(new MockedChannel());        });    }    @Test    public void test_Disconnect_Execute_Error() throws RemotingException {        Assertions.assertThrows(ExecutionException.class, () -> {            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1);            executor.shutdown();            handler.disconnected(new MockedChannel());        });    }    //throw  ChannelEventRunnable.runtimeExeception(int logger) not in execute exception    @Test//(expected = RemotingException.class)    public void test_MessageReceived_Biz_Error() throws RemotingException {        handler.received(new MockedChannel(), "");    }    //throw  ChannelEventRunnable.runtimeExeception(int logger) not in execute exception    @Test    public void test_Caught_Biz_Error() throws RemotingException {        handler.caught(new MockedChannel(), new BizException());    }    @Test    public void test_Received_InvokeInExecuter() throws RemotingException {        Assertions.assertThrows(ExecutionException.class, () -> {            handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);            ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1);            executor.shutdown();            executor = (ThreadPoolExecutor) getField(handler, "executor", 1);            executor.shutdown();            handler.received(new MockedChannel(), "");        });    }    /**     * Events do not pass through the thread pool and execute directly on the IO     */    @SuppressWarnings("deprecation")    @Disabled("Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler.")    @Test    public void test_Received_Event_invoke_direct() throws RemotingException {        handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url);        ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "SHARED_EXECUTOR", 1);        executor.shutdown();        executor = (ThreadPoolExecutor) getField(handler, "executor", 1);        executor.shutdown();        Request req = new Request();        req.setHeartbeat(true);        final AtomicInteger count = new AtomicInteger(0);        handler.received(new MockedChannel() {            @Override            public void send(Object message) throws RemotingException {                Assertions.assertTrue(((Response) message).isHeartbeat(), "response.heartbeat");                count.incrementAndGet();            }        }, req);        Assertions.assertEquals(1, count.get(), "channel.send must be invoke");    }}
  • ConnectChannelHandlerTest在setup时创建的是ConnectionOrderedChannelHandler,然后进行了test_Connect_Blocked、test_Connect_Biz_Error、test_Disconnect_Biz_Error、test_Connect_Execute_Error、test_Disconnect_Execute_Error、test_MessageReceived_Biz_Error、test_Caught_Biz_Error、test_Received_InvokeInExecuter、test_Received_Event_invoke_direct

小结

  • ConnectionOrderedDispatcher实现了Dispatcher接口,其dispatch方法返回的是ConnectionOrderedChannelHandler;ConnectionOrderedChannelHandler继承了WrappedChannelHandler,其构造器创建了corePoolSize及maximumPoolSize均为1,queue为LinkedBlockingQueue的connectionExecutor

  • ConnectionOrderedChannelHandler的connected、disconnected方法均是使用connectionExecutor来执行新创建的ChannelEventRunnable;这两个方法均会先执行checkQueueLength来判断queue大小是否大于queuewarninglimit,大于的话则打印warn日志

  • ConnectionOrderedChannelHandler的received、caught均是通过父类的getExecutorService获取线程池,然后执行创建的ChannelEventRunnable;received方法在捕获到异常时RejectedExecutionException且message是Request,而且request是twoWay的时候会返回SERVER_THREADPOOL_EXHAUSTED_ERROR

上述内容就是dubbo中ConnectionOrderedDispatcher的作用是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0