千家信息网

Storm如何和Kafka进行整合

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章将为大家详细讲解有关Storm如何和Kafka进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
千家信息网最后更新 2025年02月04日Storm如何和Kafka进行整合

这篇文章将为大家详细讲解有关Storm如何和Kafka进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

对于Storm 如何和Kafka进行整合

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;
import java.util.*;

/**
 * A Storm spout that consumes messages from Kafka partitions and emits them
 * as tuples. Partition ownership is resolved through a
 * {@link PartitionCoordinator}; consumed offsets are periodically committed
 * to ZooKeeper via {@link ZkState}.
 *
 * @author Yin Shuai
 */
public class KafkaSpout extends BaseRichSpout {

        public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

        /**
         * Holder pairing a Kafka {@link Message} with its real offset inside
         * the partition it was read from.
         *
         * @author Yin Shuai
         */
        public static class MessageAndRealOffset {
                public Message msg;
                public long offset;

                public MessageAndRealOffset(Message msg, long offset) {
                        this.msg = msg;
                        this.offset = offset;
                }
        }

        /**
         * Result of asking a {@link PartitionManager} to emit its next
         * message: emitted with more buffered, emitted the last one, or
         * nothing to emit.
         */
        static enum EmitState {
                EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
        }

        // Unique id of this spout instance, used to scope coordinator state.
        String _uuid = UUID.randomUUID().toString();

        SpoutConfig _spoutConfig;

        SpoutOutputCollector _collector;

        // Coordinator; getMyManagedPartitions() yields the partitions this
        // task is responsible for.
        PartitionCoordinator _coordinator;

        // Live connections to the Kafka brokers hosting our partitions.
        DynamicPartitionConnections _connections;

        // Reads/writes consumer offset information in ZooKeeper.
        ZkState _state;

        // Timestamp (ms) of the last offset commit to ZooKeeper.
        long _lastUpdateMs = 0;

        // Round-robin cursor over the managed partitions.
        int _currPartitionIndex = 0;

        public KafkaSpout(SpoutConfig spoutConf) {
                _spoutConfig = spoutConf;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void open(Map conf, final TopologyContext context,
                        final SpoutOutputCollector collector) {
                _collector = collector;

                // If the spout config carries no ZooKeeper servers, fall back
                // to the topology's own ZooKeeper configuration instead of
                // hard-coding environment-specific addresses.
                List zkServers = _spoutConfig.zkServers;
                if (zkServers == null) {
                        zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
                        LOG.info("Using Storm's configured ZooKeeper servers: {}",
                                        zkServers);
                }
                // Same fallback for the ZooKeeper port.
                Integer zkPort = _spoutConfig.zkPort;
                if (zkPort == null) {
                        zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT))
                                        .intValue();
                }

                Map stateConf = new HashMap(conf);
                stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
                stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
                stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);

                // ZkState supports creating/deleting offset nodes in ZooKeeper.
                _state = new ZkState(stateConf);

                // Maintains broker connections for the partitions we consume.
                _connections = new DynamicPartitionConnections(_spoutConfig,
                                KafkaUtils.makeBrokerReader(conf, _spoutConfig));

                // using TransactionalState like this is a hack
                int totalTasks = context
                                .getComponentTasks(context.getThisComponentId()).size();

                // Static broker list vs. broker discovery through ZooKeeper.
                if (_spoutConfig.hosts instanceof StaticHosts) {
                        _coordinator = new StaticCoordinator(_connections, conf,
                                        _spoutConfig, _state, context.getThisTaskIndex(),
                                        totalTasks, _uuid);
                } else {
                        _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
                                        _state, context.getThisTaskIndex(), totalTasks, _uuid);
                }

                // Metric: latest emitted offsets per partition.
                context.registerMetric("kafkaOffset", new IMetric() {
                        KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(
                                        _spoutConfig.topic, _connections);

                        @Override
                        public Object getValueAndReset() {
                                List pms = _coordinator.getMyManagedPartitions();
                                Set latestPartitions = new HashSet();
                                for (PartitionManager pm : pms) {
                                        latestPartitions.add(pm.getPartition());
                                }
                                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
                                for (PartitionManager pm : pms) {
                                        _kafkaOffsetMetric.setLatestEmittedOffset(
                                                        pm.getPartition(), pm.lastCompletedOffset());
                                }
                                return _kafkaOffsetMetric.getValueAndReset();
                        }
                }, _spoutConfig.metricsTimeBucketSizeInSecs);

                // Metric: per-partition data from each PartitionManager.
                context.registerMetric("kafkaPartition", new IMetric() {
                        @Override
                        public Object getValueAndReset() {
                                List pms = _coordinator.getMyManagedPartitions();
                                Map concatMetricsDataMaps = new HashMap();
                                for (PartitionManager pm : pms) {
                                        concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
                                }
                                return concatMetricsDataMaps;
                        }
                }, _spoutConfig.metricsTimeBucketSizeInSecs);
        }

        @Override
        public void close() {
                _state.close();
        }

        @Override
        public void nextTuple() {
                // Round-robin over the partitions this task manages; each call
                // emits from at most one partition.
                List managers = _coordinator.getMyManagedPartitions();
                for (int i = 0; i < managers.size(); i++) {
                        // Re-normalize the cursor in case the number of
                        // managers decreased since the last call.
                        _currPartitionIndex = _currPartitionIndex % managers.size();
                        // Ask the current partition to emit its next message
                        // through the collector.
                        EmitState state = managers.get(_currPartitionIndex)
                                        .next(_collector);
                        // Advance the cursor unless this partition still has
                        // buffered messages left to emit.
                        if (state != EmitState.EMITTED_MORE_LEFT) {
                                _currPartitionIndex = (_currPartitionIndex + 1)
                                                % managers.size();
                        }
                        // If anything was emitted, stop; otherwise try the
                        // next partition.
                        if (state != EmitState.NO_EMITTED) {
                                break;
                        }
                }
                // Periodically commit consumed offsets to ZooKeeper.
                long now = System.currentTimeMillis();
                if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
                        commit();
                }
        }

        @Override
        public void ack(Object msgId) {
                // Route the ack to the PartitionManager owning the partition.
                KafkaMessageId id = (KafkaMessageId) msgId;
                PartitionManager m = _coordinator.getManager(id.partition);
                if (m != null) {
                        m.ack(id.offset);
                }
        }

        @Override
        public void fail(Object msgId) {
                // Route the failure to the PartitionManager owning the
                // partition so the message can be replayed.
                KafkaMessageId id = (KafkaMessageId) msgId;
                PartitionManager m = _coordinator.getManager(id.partition);
                if (m != null) {
                        m.fail(id.offset);
                }
        }

        @Override
        public void deactivate() {
                // Flush pending offsets before the spout is deactivated.
                commit();
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                LOG.debug("Declaring output fields: {}",
                                _spoutConfig.scheme.getOutputFields());
                declarer.declare(_spoutConfig.scheme.getOutputFields());
        }

        // Commit the last completed offset of every managed partition and
        // remember when we did so.
        private void commit() {
                _lastUpdateMs = System.currentTimeMillis();
                for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
                        manager.commit();
                }
        }
}

在粗浅的代码阅读之后,在这里进行详细的分析:

1 KafkaSpout 之中持有了一个 MessageAndRealOffset 的静态内部类,用于把 Kafka 的 Message 与其真实偏移量配对:

/**
 * Holder pairing a Kafka {@link Message} with the real offset it was read
 * at inside its partition.
 */
public static class MessageAndRealOffset {

        /** The raw Kafka message. */
        public Message msg;

        /** Offset of {@code msg} within its partition. */
        public long offset;

        public MessageAndRealOffset(Message msg, long offset) {
                this.msg = msg;
                this.offset = offset;
        }
}

2 在 Spout 之中我们还持有了一个 PartitionCoordinator 分区协调器,默认情况下实例化的对象

是 ZkCoordinator(当 hosts 为 StaticHosts 时则实例化 StaticCoordinator)


关于Storm如何和Kafka进行整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0