如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,今天就跟大家聊聊有关如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这
千家信息网最后更新 2025年02月03日如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件
今天就跟大家聊聊有关如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
前提说明:
storm从0.9.0开始,增加了指标统计框架,用来收集应用程序的特定指标,并将其输出到外部系统。
一般来说,您只需要去实现 LoggingMetricsConsumer,统计将指标值输出到metric.log日志文件之中。
当然,您也可以自定义一个监听的类:只需要去实现IMetricsConsumer接口就可以了。这些类可以在代码里注册(registerMetricsConsumer),也可以在 storm.yaml配置文件中注册:
package com.digitalpebble.storm.crawler;import backtype.storm.Config;import backtype.storm.metric.MetricsConsumerBolt;import backtype.storm.metric.api.IMetricsConsumer;import backtype.storm.task.IErrorReporter;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;import backtype.storm.utils.Utils;import com.google.common.base.Joiner;import com.google.common.base.Supplier;import com.google.common.collect.ImmutableMap;import com.google.common.collect.ImmutableSortedMap;import org.codehaus.jackson.map.ObjectMapper;import org.codehaus.jackson.map.ObjectWriter;import org.mortbay.jetty.Server;import org.mortbay.jetty.servlet.Context;import org.mortbay.jetty.servlet.ServletHolder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.servlet.ServletException;import javax.servlet.http.HttpServlet;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.io.IOException;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ConcurrentMap;import java.util.concurrent.atomic.AtomicLong;/** * @author Enno Shioji (enno.shioji@peerindex.com) */public class DebugMetricConsumer implements IMetricsConsumer { private static final Logger log = LoggerFactory .getLogger(DebugMetricConsumer.class); private IErrorReporter errorReporter; private Server server; // Make visible to servlet threads private volatile TopologyContext context; private volatile ConcurrentMapmetrics; private volatile ConcurrentMap > metrics_metadata; public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { this.context = context; this.errorReporter = errorReporter; this.metrics = new ConcurrentHashMap (); this.metrics_metadata = new ConcurrentHashMap >(); try { // TODO Config file not tested final String PORT_CONFIG_STRING = "topology.metrics.consumers.debug.servlet.port"; Integer port = (Integer) stormConf.get(PORT_CONFIG_STRING); if (port == null) { log.warn("Metrics debug servlet's port not specified, defaulting to 7070. You can specify it via " + PORT_CONFIG_STRING + " in storm.yaml"); port = 7070; } server = startServlet(port); } catch (Exception e) { log.error("Failed to start metrics server", e); throw new AssertionError(e); } } private static final Joiner ON_COLONS = Joiner.on("::"); public void handleDataPoints(TaskInfo taskInfo, Collection dataPoints) { // In order String componentId = taskInfo.srcComponentId; Integer taskId = taskInfo.srcTaskId; Integer updateInterval = taskInfo.updateIntervalSecs; Long timestamp = taskInfo.timestamp; for (DataPoint point : dataPoints) { String metric_name = point.name; try { Map metric = (Map ) point.value; for (Map.Entry entry : metric.entrySet()) { String metricId = ON_COLONS.join(componentId, taskId, metric_name, entry.getKey()); Number val = entry.getValue(); metrics.put(metricId, val); metrics_metadata.put(metricId, ImmutableMap . of("updateInterval", updateInterval, "lastreported", timestamp)); } } catch (RuntimeException e) { // One can easily send something else than a Map // down the __metrics stream and make this part break. // If you ask me either the message should carry type // information or there should be different stream per message // type // This is one of the reasons why I want to write a further // abstraction on this facility errorReporter.reportError(e); metrics_metadata .putIfAbsent("ERROR_METRIC_CONSUMER_" + e.getClass().getSimpleName(), ImmutableMap .of("offending_message_sample", point.value)); } } } private static final ObjectMapper OM = new ObjectMapper(); private Server startServlet(int serverPort) throws Exception { // Setup HTTP server Server server = new Server(serverPort); Context root = new Context(server, "/"); server.start(); HttpServlet servlet = new HttpServlet() { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { SortedMap metrics = ImmutableSortedMap .copyOf(DebugMetricConsumer.this.metrics); SortedMap > metrics_metadata = ImmutableSortedMap .copyOf(DebugMetricConsumer.this.metrics_metadata); Map toplevel = ImmutableMap .of("retrieved", new Date(), // TODO this call fails with mysterious // exception // "java.lang.IllegalArgumentException: Could not find component common for __metrics" // Mailing list suggests it's a library version // issue but couldn't find anything suspicious // Need to eventually investigate // "sources", // context.getThisSources().toString(), "metrics", metrics, "metric_metadata", metrics_metadata); ObjectWriter prettyPrinter = OM .writerWithDefaultPrettyPrinter(); prettyPrinter.writeValue(resp.getWriter(), toplevel); } }; root.addServlet(new ServletHolder(servlet), "/metrics"); log.info("Started metric server..."); return server; } public void cleanup() { try { server.stop(); } catch (Exception e) { throw new AssertionError(e); } }}
看完上述内容,你们对如何实现 LoggingMetricsConsumer将指标值输出到metric.log日志文件有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
指标
文件
输出
指标值
日志
内容
统计
一般来说
之中
代码
前提
应用程序
接口
更多
框架
知识
程序
篇文章
系统
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
企业软件开发管理方法
炀艺互联网科技开发有限公司
电影存储服务器
租esc服务器
学校校园网络安全宣传活动
湖北通信软件开发服务检测中心
新雅软件开发
阿里云服务器下载速度慢怎么提升
网络安全体会300字百度文库
马云有网络技术吗
肾移植数据库排队查询
杭州咕噜网络技术
学校网络安全规划
抖音视频使用的数据库技术
网络安全手抄报的句子英语
淘宝上的传奇服务器
为什么淘宝助理连不上服务器
常州市公安局网络安全支队
大额软件开发报价单
数据库从哪里进入
南通协同管理软件开发
2017年网络安全观后感
更新ios15无法连接服务器
东莞报修管理软件开发
河南生产管理软件开发
仓加互联网科技有限公司
北京电力应急软件开发价格
古楼警长开展网络安全教育
mysql数据库自动更新怎么办
锁链战记 卡牌数据库