千家信息网

dubbo中ForkingClusterInvoker的作用是什么

发表于:2025-01-27 作者:千家信息网编辑
千家信息网最后更新 2025年01月27日,这期内容当中小编将会给大家带来有关dubbo中ForkingClusterInvoker的作用是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。ForkingCl
千家信息网最后更新 2025年01月27日dubbo中ForkingClusterInvoker的作用是什么

这期内容当中小编将会给大家带来有关dubbo中ForkingClusterInvoker的作用是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

ForkingClusterInvoker

dubbo-2.7.3/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java

public class ForkingClusterInvoker extends AbstractClusterInvoker {    /**     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.     */    private final ExecutorService executor = Executors.newCachedThreadPool(            new NamedInternalThreadFactory("forking-cluster-timer", true));    public ForkingClusterInvoker(Directory directory) {        super(directory);    }    @Override    @SuppressWarnings({"unchecked", "rawtypes"})    public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {        try {            checkInvokers(invokers, invocation);            final List> selected;            final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);            final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);            if (forks <= 0 || forks >= invokers.size()) {                selected = invokers;            } else {                selected = new ArrayList<>();                for (int i = 0; i < forks; i++) {                    Invoker invoker = select(loadbalance, invocation, invokers, selected);                    if (!selected.contains(invoker)) {                        //Avoid add the same invoker several times.                        selected.add(invoker);                    }                }            }            RpcContext.getContext().setInvokers((List) selected);            final AtomicInteger count = new AtomicInteger();            final BlockingQueue ref = new LinkedBlockingQueue<>();            for (final Invoker invoker : selected) {                executor.execute(new Runnable() {                    @Override                    public void run() {                        try {                            Result result = invoker.invoke(invocation);                            ref.offer(result);                        } catch (Throwable e) {                            int value = count.incrementAndGet();                            if (value >= selected.size()) {                                ref.offer(e);                            }                        }                    }                });            }            try {                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);                if (ret instanceof Throwable) {                    Throwable e = (Throwable) ret;                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);                }                return (Result) ret;            } catch (InterruptedException e) {                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);            }        } finally {            // clear attachments which is binding to current thread.            RpcContext.getContext().clearAttachments();        }    }}
  • ForkingClusterInvoker使用Executors.newCachedThreadPool创建了一个executor;其doInvoke从url获取forks及timeout参数,然后从invokers选出forks个数的invoker,然后放到executor请求执行invoker.invoke(invocation),把Result放到LinkedBlockingQueue,最后使用指定的timeout去poll出第一个返回结果返回,异常的话抛出RpcException

ForkingClusterInvokerTest

dubbo-2.7.3/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java

public class ForkingClusterInvokerTest {    private List> invokers = new ArrayList>();    private URL url = URL.valueOf("test://test:11/test?forks=2");    private Invoker invoker1 = mock(Invoker.class);    private Invoker invoker2 = mock(Invoker.class);    private Invoker invoker3 = mock(Invoker.class);    private RpcInvocation invocation = new RpcInvocation();    private Directory dic;    private Result result = new AppResponse();    @BeforeEach    public void setUp() throws Exception {        dic = mock(Directory.class);        given(dic.getUrl()).willReturn(url);        given(dic.list(invocation)).willReturn(invokers);        given(dic.getInterface()).willReturn(ForkingClusterInvokerTest.class);        invocation.setMethodName("method1");        invokers.add(invoker1);        invokers.add(invoker2);        invokers.add(invoker3);    }    private void resetInvokerToException() {        given(invoker1.invoke(invocation)).willThrow(new RuntimeException());        given(invoker1.getUrl()).willReturn(url);        given(invoker1.isAvailable()).willReturn(true);        given(invoker1.getInterface()).willReturn(ForkingClusterInvokerTest.class);        given(invoker2.invoke(invocation)).willThrow(new RuntimeException());        given(invoker2.getUrl()).willReturn(url);        given(invoker2.isAvailable()).willReturn(true);        given(invoker2.getInterface()).willReturn(ForkingClusterInvokerTest.class);        given(invoker3.invoke(invocation)).willThrow(new RuntimeException());        given(invoker3.getUrl()).willReturn(url);        given(invoker3.isAvailable()).willReturn(true);        given(invoker3.getInterface()).willReturn(ForkingClusterInvokerTest.class);    }    private void resetInvokerToNoException() {        given(invoker1.invoke(invocation)).willReturn(result);        given(invoker1.getUrl()).willReturn(url);        given(invoker1.isAvailable()).willReturn(true);        given(invoker1.getInterface()).willReturn(ForkingClusterInvokerTest.class);        given(invoker2.invoke(invocation)).willReturn(result);        given(invoker2.getUrl()).willReturn(url);        given(invoker2.isAvailable()).willReturn(true);        given(invoker2.getInterface()).willReturn(ForkingClusterInvokerTest.class);        given(invoker3.invoke(invocation)).willReturn(result);        given(invoker3.getUrl()).willReturn(url);        given(invoker3.isAvailable()).willReturn(true);        given(invoker3.getInterface()).willReturn(ForkingClusterInvokerTest.class);    }    @Test    public void testInvokeException() {        resetInvokerToException();        ForkingClusterInvoker invoker = new ForkingClusterInvoker(                dic);        try {            invoker.invoke(invocation);            Assertions.fail();        } catch (RpcException expected) {            Assertions.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));            assertFalse(expected.getCause() instanceof RpcException);        }    }    @Test    public void testClearRpcContext() {        resetInvokerToException();        ForkingClusterInvoker invoker = new ForkingClusterInvoker(                dic);        String attachKey = "attach";        String attachValue = "value";        RpcContext.getContext().setAttachment(attachKey, attachValue);        Map attachments = RpcContext.getContext().getAttachments();        Assertions.assertTrue(attachments != null && attachments.size() == 1, "set attachment failed!");        try {            invoker.invoke(invocation);            Assertions.fail();        } catch (RpcException expected) {            Assertions.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"), "Succeeded to forking invoke provider !");            assertFalse(expected.getCause() instanceof RpcException);        }        Map afterInvoke = RpcContext.getContext().getAttachments();        Assertions.assertTrue(afterInvoke != null && afterInvoke.size() == 0, "clear attachment failed!");    }    @Test()    public void testInvokeNoException() {        resetInvokerToNoException();        ForkingClusterInvoker invoker = new ForkingClusterInvoker(                dic);        Result ret = invoker.invoke(invocation);        Assertions.assertSame(result, ret);    }}
  • ForkingClusterInvokerTest验证了testInvokeException、testClearRpcContext两个场景

小结

ForkingClusterInvoker使用Executors.newCachedThreadPool创建了一个executor;其doInvoke从url获取forks及timeout参数,然后从invokers选出forks个数的invoker,然后放到executor请求执行invoker.invoke(invocation),把Result放到LinkedBlockingQueue,最后使用指定的timeout去poll出第一个返回结果返回,异常的话抛出RpcException

上述就是小编为大家分享的dubbo中ForkingClusterInvoker的作用是什么了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

作用 个数 内容 参数 结果 选出 分析 专业 两个 中小 内容丰富 场景 小结 就是 文章 更多 知识 篇文章 行业 角度 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 深圳服务器散热器订制 服务器机箱钥匙 武汉催收软件开发公司排名 服务器上有固态和机械硬盘 酒店网络安全管理方案 一键删除数据库有什么影响 医保系统显示无法连接服务器 第46届湖北省网络安全集训队 朝阳区智能网络技术服务有哪些 湖南众益互联网科技 女生学软件开发专业 物资管理软件开发项目 软件开发中的组织构架 怎样查看数据库的名称 杨浦区品牌软件开发诚信经营 微信云数据库可以关联表吗 互联网网络安全管理 苏州dns服务器在哪个区 网络安全简报范华 微信私聊信息在服务器存多久 2019网络安全大赛怎么比 服务器主机能组群晖么 电视家 自定义服务器 网吧语音软件开发 ntp 时钟服务器 什么数据库适合高更新 检查更新为什么一直找不到服务器 象云北京网络技术有限公司 世界上最牛逼的网络安全工程师 北京智珠网络技术有限公司注册
0