千家信息网

如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,今天就跟大家聊聊有关如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
千家信息网最后更新 2025年02月03日如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件

今天就跟大家聊聊有关如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

前提说明:

storm从0.9.0开始,增加了指标统计框架,用来收集应用程序的特定指标,并将其输出到外部系统。

一般来说,您不需要自己编写代码,只需要注册 Storm 内置的 LoggingMetricsConsumer,它会将统计到的指标值输出到metric.log日志文件之中。

当然,您也可以自定义一个监听的类:只需要去实现IMetricsConsumer接口就可以了。这些类可以在代码里注册(registerMetricsConsumer),也可以在 storm.yaml配置文件中注册。下面是一个自定义 IMetricsConsumer 实现(DebugMetricConsumer)的示例,它把收到的指标保存在内存中,并通过内嵌的 HTTP 服务以 JSON 形式输出:

package com.digitalpebble.storm.crawler;import backtype.storm.Config;import backtype.storm.metric.MetricsConsumerBolt;import backtype.storm.metric.api.IMetricsConsumer;import backtype.storm.task.IErrorReporter;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;import backtype.storm.utils.Utils;import com.google.common.base.Joiner;import com.google.common.base.Supplier;import com.google.common.collect.ImmutableMap;import com.google.common.collect.ImmutableSortedMap;import org.codehaus.jackson.map.ObjectMapper;import org.codehaus.jackson.map.ObjectWriter;import org.mortbay.jetty.Server;import org.mortbay.jetty.servlet.Context;import org.mortbay.jetty.servlet.ServletHolder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.servlet.ServletException;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.io.IOException;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.atomic.AtomicLong;/** * @author Enno Shioji (enno.shioji@peerindex.com) */public class DebugMetricConsumer implements IMetricsConsumer {        private static final Logger log = LoggerFactory                        .getLogger(DebugMetricConsumer.class);        private IErrorReporter errorReporter;        private Server server;        // Make visible to servlet threads        private volatile TopologyContext context;        private volatile ConcurrentMap metrics;        private volatile ConcurrentMap> metrics_metadata;        public void prepare(Map stormConf, Object registrationArgument,                        TopologyContext context, IErrorReporter errorReporter) {                this.context = context;                this.errorReporter = errorReporter;                this.metrics = new 
ConcurrentHashMap();                this.metrics_metadata = new ConcurrentHashMap>();                try {                        // TODO Config file not tested                        final String PORT_CONFIG_STRING = "topology.metrics.consumers.debug.servlet.port";                        Integer port = (Integer) stormConf.get(PORT_CONFIG_STRING);                        if (port == null) {                                log.warn("Metrics debug servlet's port not specified, defaulting to 7070. You can specify it via "                                                + PORT_CONFIG_STRING + " in storm.yaml");                                port = 7070;                        }                        server = startServlet(port);                } catch (Exception e) {                        log.error("Failed to start metrics server", e);                        throw new AssertionError(e);                }        }        private static final Joiner ON_COLONS = Joiner.on("::");        public void handleDataPoints(TaskInfo taskInfo,                        Collection dataPoints) {                // In order                String componentId = taskInfo.srcComponentId;                Integer taskId = taskInfo.srcTaskId;                Integer updateInterval = taskInfo.updateIntervalSecs;                Long timestamp = taskInfo.timestamp;                for (DataPoint point : dataPoints) {                        String metric_name = point.name;                        try {                                Map metric = (Map) point.value;                                for (Map.Entry entry : metric.entrySet()) {                                        String metricId = ON_COLONS.join(componentId, taskId,                                                        metric_name, entry.getKey());                                        Number val = entry.getValue();                                        metrics.put(metricId, val);                                        
metrics_metadata.put(metricId, ImmutableMap                                                        . of("updateInterval",                                                                        updateInterval, "lastreported", timestamp));                                }                        } catch (RuntimeException e) {                                // One can easily send something else than a Map                                // down the __metrics stream and make this part break.                                // If you ask me either the message should carry type                                // information or there should be different stream per message                                // type                                // This is one of the reasons why I want to write a further                                // abstraction on this facility                                errorReporter.reportError(e);                                metrics_metadata                                                .putIfAbsent("ERROR_METRIC_CONSUMER_"                                                                + e.getClass().getSimpleName(), ImmutableMap                                                                .of("offending_message_sample", point.value));                        }                }        }        private static final ObjectMapper OM = new ObjectMapper();        private Server startServlet(int serverPort) throws Exception {                // Setup HTTP server                Server server = new Server(serverPort);                Context root = new Context(server, "/");                server.start();                HttpServlet servlet = new HttpServlet() {                        @Override                        protected void doGet(HttpServletRequest req,                                        HttpServletResponse resp) throws ServletException,                                        IOException {                                SortedMap metrics = 
ImmutableSortedMap                                                .copyOf(DebugMetricConsumer.this.metrics);                                SortedMap> metrics_metadata = ImmutableSortedMap                                                .copyOf(DebugMetricConsumer.this.metrics_metadata);                                Map toplevel = ImmutableMap                                                .of("retrieved",                                                                new Date(),                                                                // TODO this call fails with mysterious                                                                // exception                                                                // "java.lang.IllegalArgumentException: Could not find component common for __metrics"                                                                // Mailing list suggests it's a library version                                                                // issue but couldn't find anything suspicious                                                                // Need to eventually investigate                                                                // "sources",                                                                // context.getThisSources().toString(),                                                                "metrics", metrics, "metric_metadata",                                                                metrics_metadata);                                ObjectWriter prettyPrinter = OM                                                .writerWithDefaultPrettyPrinter();                                prettyPrinter.writeValue(resp.getWriter(), toplevel);                        }                };                root.addServlet(new ServletHolder(servlet), "/metrics");                log.info("Started metric server...");                return server;        }        public void cleanup() {                try {               
         server.stop();                } catch (Exception e) {                        throw new AssertionError(e);                }        }}

看完上述内容,你们对如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0