千家信息网

What does MonitorFilter do in Dubbo?

Published: 2024-11-14  Author: 千家信息网 editorial staff

This article explains what MonitorFilter does in Dubbo. The question comes up often in day-to-day work, so the notes below walk through the relevant source code and its unit test.

The focus is the MonitorFilter class as of Dubbo 2.7.2.

MonitorFilter

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java

@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter extends ListenableFilter {

    private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
    private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";

    public MonitorFilter() {
        super.listener = new MonitorListener();
    }

    /**
     * The Concurrent counter
     */
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();

    /**
     * The MonitorFactory
     */
    private MonitorFactory monitorFactory;

    public void setMonitorFactory(MonitorFactory monitorFactory) {
        this.monitorFactory = monitorFactory;
    }

    /**
     * The invocation interceptor,it will collect the invoke data about this invocation and send it to monitor center
     *
     * @param invoker    service
     * @param invocation invocation.
     * @return {@link Result} the invoke result
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
            getConcurrent(invoker, invocation).incrementAndGet(); // count up
        }
        return invoker.invoke(invocation); // proceed invocation chain
    }

    // concurrent counter
    private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
        String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
        AtomicInteger concurrent = concurrents.get(key);
        if (concurrent == null) {
            concurrents.putIfAbsent(key, new AtomicInteger());
            concurrent = concurrents.get(key);
        }
        return concurrent;
    }

    //......
}
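  • MonitorFilter extends ListenableFilter. When the invoker's URL contains the monitor parameter, its invoke method sets a monitor_filter_start_time attachment on the invocation and then increments the concurrent count kept for that interface and method. The listener it creates is a MonitorListener.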
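
The per-method counter described above can be sketched outside Dubbo. This is only a minimal illustration under stated assumptions: the class ConcurrentCounterSketch and its begin/end methods are hypothetical names, and it uses computeIfAbsent in place of the get/putIfAbsent sequence in the original source.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone class for illustration; it is not part of Dubbo.
class ConcurrentCounterSketch {

    // One counter per "interfaceName.methodName" key, mirroring the key built in MonitorFilter.
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<>();

    // computeIfAbsent creates the counter atomically on first use
    // (swapped in here for the get/putIfAbsent/get sequence in the source).
    AtomicInteger getConcurrent(String interfaceName, String methodName) {
        String key = interfaceName + "." + methodName;
        return concurrents.computeIfAbsent(key, k -> new AtomicInteger());
    }

    // Called before the invocation proceeds (count up).
    int begin(String interfaceName, String methodName) {
        return getConcurrent(interfaceName, methodName).incrementAndGet();
    }

    // Called when the invocation completes or fails (count down).
    int end(String interfaceName, String methodName) {
        return getConcurrent(interfaceName, methodName).decrementAndGet();
    }

    public static void main(String[] args) {
        ConcurrentCounterSketch counter = new ConcurrentCounterSketch();
        counter.begin("com.example.DemoService", "sayHello");
        counter.begin("com.example.DemoService", "sayHello");
        // Two calls are in flight for this method: prints 2
        System.out.println(counter.getConcurrent("com.example.DemoService", "sayHello").get());
        counter.end("com.example.DemoService", "sayHello");
        // One call has finished: prints 1
        System.out.println(counter.getConcurrent("com.example.DemoService", "sayHello").get());
    }
}
```

Keying the map by interface and method name means each method gets its own count, so a slow method does not inflate the figure reported for another.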

MonitorListener

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java

    class MonitorListener implements Listener {

        @Override
        public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }

        /**
         * The collector logic, it will be handled by the default monitor
         *
         * @param invoker
         * @param invocation
         * @param result     the invoke result
         * @param remoteHost the remote host address
         * @param start      the timestamp the invoke begin
         * @param error      if there is an error on the invoke
         */
        private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            try {
                URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
                Monitor monitor = monitorFactory.getMonitor(monitorUrl);
                if (monitor == null) {
                    return;
                }
                URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
                monitor.collect(statisticsURL);
            } catch (Throwable t) {
                logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
            }
        }

        /**
         * Create statistics url
         *
         * @param invoker
         * @param invocation
         * @param result
         * @param remoteHost
         * @param start
         * @param error
         * @return
         */
        private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            // ---- service statistics ----
            long elapsed = System.currentTimeMillis() - start; // invocation cost
            int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
            String application = invoker.getUrl().getParameter(APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // service name
            String method = RpcUtils.getMethodName(invocation); // method name
            String group = invoker.getUrl().getParameter(GROUP_KEY);
            String version = invoker.getUrl().getParameter(VERSION_KEY);

            int localPort;
            String remoteKey, remoteValue;
            if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
                // ---- for service consumer ----
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- for service provider ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = remoteHost;
            }
            String input = "", output = "";
            if (invocation.getAttachment(INPUT_KEY) != null) {
                input = invocation.getAttachment(INPUT_KEY);
            }
            if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
                output = result.getAttachment(OUTPUT_KEY);
            }

            return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method,
                    MonitorService.APPLICATION, application,
                    MonitorService.INTERFACE, service,
                    MonitorService.METHOD, method,
                    remoteKey, remoteValue,
                    error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",
                    MonitorService.ELAPSED, String.valueOf(elapsed),
                    MonitorService.CONCURRENT, String.valueOf(concurrent),
                    INPUT_KEY, input,
                    OUTPUT_KEY, output,
                    GROUP_KEY, group,
                    VERSION_KEY, version);
        }
    }
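  • MonitorListener implements the Listener interface. When the invoker's URL contains the monitor parameter, its onResponse and onError methods build a statistics URL, report it through the Monitor obtained from the MonitorFactory, and then decrement the concurrent count.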
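
The order of operations matters here: the statistics are built before the counter is decremented, so the reported concurrent value still includes the call that is finishing. The sketch below reproduces that flow with plain maps. It is an illustration under assumptions, not Dubbo code: StatisticsSketch, markStart and buildStatistics are hypothetical names, and the keys "success", "failure", "elapsed" and "concurrent" are chosen for readability.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; it does not use any Dubbo types.
class StatisticsSketch {

    // Attachment key taken from the MonitorFilter source shown earlier.
    static final String START_TIME = "monitor_filter_start_time";

    // Before the call: stash the start timestamp as a string attachment.
    static void markStart(Map<String, String> attachments, long nowMillis) {
        attachments.put(START_TIME, String.valueOf(nowMillis));
    }

    // After the call: derive the elapsed time and a success/failure flag.
    // The caller passes the counter value read BEFORE it is decremented.
    static Map<String, String> buildStatistics(Map<String, String> attachments,
                                               long nowMillis, int concurrent, boolean error) {
        long start = Long.parseLong(attachments.get(START_TIME));
        Map<String, String> stats = new LinkedHashMap<>();
        stats.put(error ? "failure" : "success", "1");
        stats.put("elapsed", String.valueOf(nowMillis - start));
        stats.put("concurrent", String.valueOf(concurrent));
        return stats;
    }

    public static void main(String[] args) {
        Map<String, String> attachments = new HashMap<>();
        markStart(attachments, 1_000L);
        // Fixed timestamps keep the output deterministic.
        // prints {success=1, elapsed=250, concurrent=1}
        System.out.println(buildStatistics(attachments, 1_250L, 1, false));
    }
}
```

Passing the start time through an attachment, rather than a field on the filter, keeps the filter itself stateless per call, which is presumably why the original stores it on the invocation.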

Example

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java

public class MonitorFilterTest {

    private volatile URL lastStatistics;

    private volatile Invocation lastInvocation;

    private final Invoker<MonitorService> serviceInvoker = new Invoker<MonitorService>() {
        @Override
        public Class<MonitorService> getInterface() {
            return MonitorService.class;
        }

        public URL getUrl() {
            try {
                return URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE + "&" + MONITOR_KEY + "=" + URLEncoder.encode("dubbo://" + NetUtils.getLocalHost() + ":7070", "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }

        @Override
        public boolean isAvailable() {
            return false;
        }

        public Result invoke(Invocation invocation) throws RpcException {
            lastInvocation = invocation;
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        }

        @Override
        public void destroy() {
        }
    };

    private MonitorFactory monitorFactory = new MonitorFactory() {
        @Override
        public Monitor getMonitor(final URL url) {
            return new Monitor() {
                public URL getUrl() {
                    return url;
                }

                @Override
                public boolean isAvailable() {
                    return true;
                }

                @Override
                public void destroy() {
                }

                public void collect(URL statistics) {
                    MonitorFilterTest.this.lastStatistics = statistics;
                }

                public List<URL> lookup(URL query) {
                    return Arrays.asList(MonitorFilterTest.this.lastStatistics);
                }
            };
        }
    };

    @Test
    public void testFilter() throws Exception {
        MonitorFilter monitorFilter = new MonitorFilter();
        monitorFilter.setMonitorFactory(monitorFactory);
        Invocation invocation = new RpcInvocation("aaa", new Class[0], new Object[0]);
        RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
        Result result = monitorFilter.invoke(serviceInvoker, invocation);
        result.thenApplyWithContext((r) -> {
            monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
            return r;
        });
        while (lastStatistics == null) {
            Thread.sleep(10);
        }
        Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
        Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
        Assertions.assertEquals("aaa", lastStatistics.getParameter(MonitorService.METHOD));
        Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
        Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
        Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
        Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
        Assertions.assertEquals(invocation, lastInvocation);
    }

    @Test
    public void testSkipMonitorIfNotHasKey() {
        MonitorFilter monitorFilter = new MonitorFilter();
        MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
        monitorFilter.setMonitorFactory(mockMonitorFactory);
        Invocation invocation = new RpcInvocation("aaa", new Class[0], new Object[0]);
        Invoker invoker = mock(Invoker.class);
        given(invoker.getUrl()).willReturn(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE));

        monitorFilter.invoke(invoker, invocation);

        verify(mockMonitorFactory, never()).getMonitor(any(URL.class));
    }

    @Test
    public void testGenericFilter() throws Exception {
        MonitorFilter monitorFilter = new MonitorFilter();
        monitorFilter.setMonitorFactory(monitorFactory);
        Invocation invocation = new RpcInvocation("$invoke", new Class[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}});
        RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
        Result result = monitorFilter.invoke(serviceInvoker, invocation);
        result.thenApplyWithContext((r) -> {
            monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
            return r;
        });
        while (lastStatistics == null) {
            Thread.sleep(10);
        }
        Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
        Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
        Assertions.assertEquals("xxx", lastStatistics.getParameter(MonitorService.METHOD));
        Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
        Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
        Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
        Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
        Assertions.assertEquals(invocation, lastInvocation);
    }

    @Test
    public void testSafeFailForMonitorCollectFail() {
        MonitorFilter monitorFilter = new MonitorFilter();
        MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
        Monitor mockMonitor = mock(Monitor.class);
        Mockito.doThrow(new RuntimeException()).when(mockMonitor).collect(any(URL.class));

        monitorFilter.setMonitorFactory(mockMonitorFactory);
        given(mockMonitorFactory.getMonitor(any(URL.class))).willReturn(mockMonitor);
        Invocation invocation = new RpcInvocation("aaa", new Class[0], new Object[0]);

        monitorFilter.invoke(serviceInvoker, invocation);
    }
}
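  • MonitorFilterTest covers four scenarios. testFilter checks the statistics reported for an ordinary consumer-side call. testSkipMonitorIfNotHasKey checks that getMonitor is never called when the URL has no monitor parameter. testGenericFilter checks that a generic $invoke call is reported under the real method name ("xxx"). testSafeFailForMonitorCollectFail sets up a Monitor whose collect method throws and checks that invoke still completes.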
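
The safe-fail behaviour comes from the try/catch around the reporting step in collect: a failing monitor is logged and otherwise ignored, so it cannot break the business call. A minimal sketch of that pattern follows, assuming a hypothetical SafeCollectSketch class and a plain Consumer standing in for the Monitor.

```java
import java.util.function.Consumer;

// Hypothetical stand-alone sketch of the "log and continue" pattern; not Dubbo code.
class SafeCollectSketch {

    // Returns true if the statistic was delivered, false if the collector threw.
    static boolean collectSafely(Consumer<String> collector, String statistics) {
        try {
            collector.accept(statistics);
            return true;
        } catch (Throwable t) {
            // Mirrors the original's approach: warn, then swallow the failure.
            System.err.println("Failed to monitor count service, cause: " + t.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // A collector that always fails, like the mocked Monitor in the test above.
        Consumer<String> broken = s -> {
            throw new RuntimeException("monitor down");
        };
        // The failure is contained: prints false
        System.out.println(collectSafely(broken, "count://127.0.0.1/DemoService/sayHello"));
    }
}
```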

Summary

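MonitorFilter extends ListenableFilter. When the invoker's URL contains the monitor parameter, its invoke method sets a monitor_filter_start_time attachment on the invocation and then increments the concurrent count; the listener it creates is a MonitorListener. MonitorListener implements the Listener interface; when the invoker's URL contains the monitor parameter, its onResponse and onError methods report the metrics and then decrement the concurrent count.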

