千家信息网

kafka-Storm中如何将日志文件打印到local

发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,这篇文章给大家介绍kafka-Storm中如何将日志文件打印到local,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。阅读前提:1 : 您可能需要对 logback 日志系统有所
千家信息网最后更新 2025年02月08日kafka-Storm中如何将日志文件打印到local

这篇文章给大家介绍kafka-Storm中如何将日志文件打印到local,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

阅读前提:

1 : 您可能需要对 logback 日志系统有所了解

2 :您可能需要对于 kafka 有初步的了解

3:请代码查看之前,请您仔细参考系统的业务图解

由于kafka本身自带了和『Hadoop』的接口,如果需要将kafka中的文件直接迁移到HDFS,请参看本ID的另外一篇博文:

业务系统-kafka-Storm【日志本地化】 - 2 :直接通过kafka将日志传递到HDFS

1: 一个正式环境系统的系统设计图解:

通过kafka集群,在2个相同的topic之下,通过kafka-storm, he kafka-hadoop,2 个Consumer,针对同样的一份数据,我们分流了2个管道:

其一: 实时通道

其二:离线通道

在日志本地化的过程之中,前期,由于日志的清洗,过滤的工作是放在Storm集群之中,也就是说,留存到本地locla的日志。是我们在Storm集群之中进行了清洗的数据。

也就是:

如下图所示:

在kafka之中,通常而言,有如下的 代码 用来处理:

在这里我们针对了2种日志,有两个Consumer用来处理

package com.mixbox.kafka.consumer;public class logSave {        public static void main(String[] args) throws Exception {                Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);                visitlog.start();                Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);                orderlog.start();        }}

在这里,我们依据不同的原始字段,将不同的数据保存到不同的文件之中。

package com.mixbox.kafka.consumer;import java.io.UnsupportedEncodingException;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.message.MessageAndMetadata;/** * @author Yin Shuai */public class Consumer_Thread extends Thread {        // 在事实上我们会依据传递的topic名称,来生成不桐的记录机器        // private Logger _log_order = LoggerFactory.getLogger("order");        // private Logger _log_visit = LoggerFactory.getLogger("visit");        private Logger _log = null;        private final ConsumerConnector _consumer;        private final String _topic;        public Consumer_Thread(String topic) {                _consumer = kafka.consumer.Consumer                                .createJavaConsumerConnector(createConsumerConfig());                this._topic = topic;                _log = LoggerFactory.getLogger(_topic);                System.err.println("log的名称" + _topic);        }        private static ConsumerConfig createConsumerConfig() {                Properties props = new Properties();                props.put("zookeeper.connect", KafkaProperties.zkConnect);                // 在这里我们的组ID为logSave                props.put("group.id", KafkaProperties.logSave);                props.put("zookeeper.session.timeout.ms", "100000");                props.put("zookeeper.sync.time.ms", "200");                props.put("auto.commit.interval.ms", "1000");                return new ConsumerConfig(props);        }        public void run() {                Map topicCountMap = new HashMap();                topicCountMap.put(_topic, new Integer(1));                Map>> consumerMap = _consumer                                .createMessageStreams(topicCountMap);                for (KafkaStream kafkaStream : consumerMap.get(_topic)) {                        ConsumerIterator iterator = kafkaStream.iterator();                        while (iterator.hasNext()) {                                MessageAndMetadata next = iterator.next();                                try {                                        // 在这里我们分拆了一个Consumer 来处理visit日志                                        logFile(next);                                        System.out.println("message:"                                                        + new String(next.message(), "utf-8"));                                } catch (UnsupportedEncodingException e) {                                        e.printStackTrace();                                }                        }                }        }        private void logFile(MessageAndMetadata next)                        throws UnsupportedEncodingException {                _log.info(new String(next.message(), "utf-8"));        }}

一个简单的小tips:

logback.xml ,提醒您注意,这里的配置文件太过粗浅。如有需要,请自行填充。

                                                                                                                                                f:/opt/log/test.%d{yyyy-MM-dd}.log                                                                                                                                %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level                                %logger{36}-%msg%n                                                                                e:/logs/error/error.log                                                                                        ERROR                                                ACCEPT                        DENY                                                                        e:/logs/yuanshi-%d{yyyy-MM-dd}.log                        10                                                                        %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level                                %logger{36}-%msg%n                                                        E:\logs\file\file.log                                        INFO                        ACCEPT                        DENY                                                                        e:/logs/venality-%d{yyyy-MM-dd}.log                                                10                                                                        %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level                                %logger{36}-%msg%n                                                                        E:\logs\visitlog\visit.log                                                        %msg%n                                                        INFO                                                        E:\logs\visit.log.%d{yyyy-MM-dd}                                                                                                                                E:\logs\orderlog\order.log                                                        %msg%n                                                                                INFO                                                        E:\logs\order.log.%d{yyyy-MM-dd}                                                                                                                

关于kafka-Storm中如何将日志文件打印到local就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0