千家信息网

dubbo中ExecutionDispatcher的作用是什么

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇文章为大家展示了dubbo中ExecutionDispatcher的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。ExecutionDispat
千家信息网最后更新 2025年02月03日dubbo中ExecutionDispatcher的作用是什么

本篇文章为大家展示了dubbo中ExecutionDispatcher的作用是什么,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

ExecutionDispatcher

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java

public class ExecutionDispatcher implements Dispatcher {    public static final String NAME = "execution";    @Override    public ChannelHandler dispatch(ChannelHandler handler, URL url) {        return new ExecutionChannelHandler(handler, url);    }}
  • ExecutionDispatcher实现了Dispatcher接口,其dispatch方法返回的是ExecutionChannelHandler

ExecutionChannelHandler

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

public class ExecutionChannelHandler extends WrappedChannelHandler {    public ExecutionChannelHandler(ChannelHandler handler, URL url) {        super(handler, url);    }    @Override    public void received(Channel channel, Object message) throws RemotingException {        ExecutorService executor = getExecutorService();        if (message instanceof Request) {            try {                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));            } catch (Throwable t) {                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent                // this scenario from happening, but a better solution should be considered later.                if (t instanceof RejectedExecutionException) {                    Request request = (Request) message;                    if (request.isTwoWay()) {                        String msg = "Server side(" + url.getIp() + "," + url.getPort()                                + ") thread pool is exhausted, detail msg:" + t.getMessage();                        Response response = new Response(request.getId(), request.getVersion());                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);                        response.setErrorMessage(msg);                        channel.send(response);                        return;                    }                }                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);            }        } else {            handler.received(channel, message);        }    }}
  • ExecutionChannelHandler继承了WrappedChannelHandler,其received方法判断message是否是Request类型,如果是则创建ChannelEventRunnable放到线程池里头执行,如果不是则直接执行handler.received

PerformanceServerTest

dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java

public class PerformanceServerTest  {    private static final Logger logger = LoggerFactory.getLogger(PerformanceServerTest.class);    private static ExchangeServer server = null;    private static void restartServer(int times, int alive, int sleep) throws Exception {        if (server != null && !server.isClosed()) {            server.close();            Thread.sleep(100);        }        for (int i = 0; i < times; i++) {            logger.info("restart times:" + i);            server = statServer();            if (alive > 0) Thread.sleep(alive);            server.close();            if (sleep > 0) Thread.sleep(sleep);        }        server = statServer();    }    private static ExchangeServer statServer() throws Exception {        final int port = PerformanceUtils.getIntProperty("port", 9911);        final String transporter = PerformanceUtils.getProperty(Constants.TRANSPORTER_KEY, Constants.DEFAULT_TRANSPORTER);        final String serialization = PerformanceUtils.getProperty(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);        final String threadpool = PerformanceUtils.getProperty(THREADPOOL_KEY, DEFAULT_THREADPOOL);        final int threads = PerformanceUtils.getIntProperty(THREADS_KEY, DEFAULT_THREADS);        final int iothreads = PerformanceUtils.getIntProperty(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);        final int buffer = PerformanceUtils.getIntProperty(BUFFER_KEY, DEFAULT_BUFFER_SIZE);        final String channelHandler = PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME);        // Start server        ExchangeServer server = Exchangers.bind("exchange://0.0.0.0:" + port + "?transporter="                + transporter + "&serialization="                + serialization + "&threadpool=" + threadpool                + "&threads=" + threads + "&iothreads=" + iothreads + "&buffer=" + buffer + "&channel.handler=" + channelHandler, new ExchangeHandlerAdapter() {            public String telnet(Channel channel, String message) throws RemotingException {                return "echo: " + message + "\r\ntelnet> ";            }            public CompletableFuture reply(ExchangeChannel channel, Object request) throws RemotingException {                if ("environment".equals(request)) {                    return CompletableFuture.completedFuture(PerformanceUtils.getEnvironment());                }                if ("scene".equals(request)) {                    List scene = new ArrayList();                    scene.add("Transporter: " + transporter);                    scene.add("Service Threads: " + threads);                    return CompletableFuture.completedFuture(scene);                }                return CompletableFuture.completedFuture(request);            }        });        return server;    }    private static ExchangeServer statTelnetServer(int port) throws Exception {        // Start server        ExchangeServer telnetserver = Exchangers.bind("exchange://0.0.0.0:" + port, new ExchangeHandlerAdapter() {            public String telnet(Channel channel, String message) throws RemotingException {                if (message.equals("help")) {                    return "support cmd: \r\n\tstart \r\n\tstop \r\n\tshutdown \r\n\trestart times [alive] [sleep] \r\ntelnet>";                } else if (message.equals("stop")) {                    logger.info("server closed:" + server);                    server.close();                    return "stop server\r\ntelnet>";                } else if (message.startsWith("start")) {                    try {                        restartServer(0, 0, 0);                    } catch (Exception e) {                        e.printStackTrace();                    }                    return "start server\r\ntelnet>";                } else if (message.startsWith("shutdown")) {                    System.exit(0);                    return "start server\r\ntelnet>";                } else if (message.startsWith("channels")) {                    return "server.getExchangeChannels():" + server.getExchangeChannels().size() + "\r\ntelnet>";                } else if (message.startsWith("restart ")) { //r times [sleep] r 10 or r 10 100                    String[] args = message.split(" ");                    int times = Integer.parseInt(args[1]);                    int alive = args.length > 2 ? Integer.parseInt(args[2]) : 0;                    int sleep = args.length > 3 ? Integer.parseInt(args[3]) : 100;                    try {                        restartServer(times, alive, sleep);                    } catch (Exception e) {                        e.printStackTrace();                    }                    return "restart server,times:" + times + " stop alive time: " + alive + ",sleep time: " + sleep + " usage:r times [alive] [sleep] \r\ntelnet>";                } else {                    return "echo: " + message + "\r\ntelnet> ";                }            }        });        return telnetserver;    }    @Test    public void testServer() throws Exception {        // Read port from property        if (PerformanceUtils.getProperty("port", null) == null) {            logger.warn("Please set -Dport=9911");            return;        }        final int port = PerformanceUtils.getIntProperty("port", 9911);        final boolean telnet = PerformanceUtils.getBooleanProperty("telnet", true);        if (telnet) statTelnetServer(port + 1);        server = statServer();        synchronized (PerformanceServerTest.class) {            while (true) {                try {                    PerformanceServerTest.class.wait();                } catch (InterruptedException e) {                }            }        }    }}
  • PerformanceServerTest的statServer方法使用PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME)获取channelHandler,找不到则使用ExecutionDispatcher.NAME

上述内容就是dubbo中ExecutionDispatcher的作用是什么,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0