千家信息网

Storm-kafka中如何理解ZkCoordinator的过程

发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。梳理ZkCoordin
千家信息网最后更新 2024年11月26日Storm-kafka中如何理解ZkCoordinator的过程

Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

梳理ZkCoordinator的过程

package com.mixbox.storm.kafka;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;import java.util.*;import static com.mixbox.storm.kafka.KafkaUtils.taskId;/** *  *  * ZKCoordinator 协调器 *  * @author Yin Shuai */public class ZkCoordinator implements PartitionCoordinator {        public static final Logger LOG = LoggerFactory                        .getLogger(ZkCoordinator.class);        SpoutConfig _spoutConfig;        int _taskIndex;        int _totalTasks;                String _topologyInstanceId;                // 每一个分区对应着一个分区管理器        Map _managers = new HashMap();                //缓存的List        List _cachedList;        //上次刷新的时间        Long _lastRefreshTime = null;                //刷新频率 毫秒        int _refreshFreqMs;        //动态分区连接        DynamicPartitionConnections _connections;                //动态BrokersReader        DynamicBrokersReader _reader;                        ZkState _state;                Map _stormConf;        /**         *          * @param connections         *            动态的 分区连接         * @param stormConf         *            Storm的配置文件         * @param spoutConfig         *            Storm sput的配置文件         * @param state         *            对于ZKState的连接         * @param taskIndex         *            任务         * @param totalTasks         *            总共的任务         * @param topologyInstanceId         *            拓扑的实例ID         */        public ZkCoordinator(DynamicPartitionConnections connections,                        Map stormConf, SpoutConfig spoutConfig, ZkState state,                        int taskIndex, int totalTasks, String topologyInstanceId) {                this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks,                                topologyInstanceId, buildReader(stormConf, spoutConfig));        }        public ZkCoordinator(DynamicPartitionConnections connections,                        Map stormConf, SpoutConfig spoutConfig, ZkState state,                        int taskIndex, int totalTasks, String topologyInstanceId,                        DynamicBrokersReader reader) {                _spoutConfig = spoutConfig;                _connections = connections;                _taskIndex = taskIndex;                _totalTasks = totalTasks;                _topologyInstanceId = topologyInstanceId;                _stormConf = stormConf;                _state = state;                ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;                _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;                _reader = reader;        }        /**         * @param stormConf         * @param spoutConfig         * @return         */        private static DynamicBrokersReader buildReader(Map stormConf,                        SpoutConfig spoutConfig) {                ZkHosts hosts = (ZkHosts) spoutConfig.hosts;                return new DynamicBrokersReader(stormConf, hosts.brokerZkStr,                                hosts.brokerZkPath, spoutConfig.topic);        }        @Override        public List getMyManagedPartitions() {                if (_lastRefreshTime == null                                || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {                        refresh();                        _lastRefreshTime = System.currentTimeMillis();                }                return _cachedList;        }        /**         * 简单的刷新的行为         *          */        void refresh() {                try {                        LOG.info(taskId(_taskIndex, _totalTasks)                                        + "Refreshing partition manager connections");                        // 拿到所有的分区信息                        GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();                        // 拿到自己任务的所有分区                        List mine = KafkaUtils.calculatePartitionsForTask(                                        brokerInfo, _totalTasks, _taskIndex);                        // 拿到当前任务的分区                        Set curr = _managers.keySet();                        // 构造一个集合                        Set newPartitions = new HashSet(mine);                        // 在new分区中,移除掉所有 自己拥有的分区                        newPartitions.removeAll(curr);                        // 要删除的分区                        Set deletedPartitions = new HashSet(curr);                        //                        deletedPartitions.removeAll(mine);                        LOG.info(taskId(_taskIndex, _totalTasks)                                        + "Deleted partition managers: "                                        + deletedPartitions.toString());                        for (Partition id : deletedPartitions) {                                PartitionManager man = _managers.remove(id);                                man.close();                        }                        LOG.info(taskId(_taskIndex, _totalTasks)                                        + "New partition managers: " + newPartitions.toString());                        for (Partition id : newPartitions) {                                PartitionManager man = new PartitionManager(_connections,                                                _topologyInstanceId, _state, _stormConf, _spoutConfig,                                                id);                                _managers.put(id, man);                        }                } catch (Exception e) {                        throw new RuntimeException(e);                }                _cachedList = new ArrayList(_managers.values());                LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");        }        @Override        public PartitionManager getManager(Partition partition) {                return _managers.get(partition);        }}

1 : 首先 ZKCoorDinator 实现 PartitionCoordinator的接口

package com.mixbox.storm.kafka;import java.util.List;/** * @author Yin Shuai */public interface PartitionCoordinator {                /**         * 拿到我管理的分区列表  List{PartitionManager}         * @return         */        List getMyManagedPartitions();                /**         * @param 依据制定的分区partition,去getManager         * @return         */        PartitionManager getManager(Partition partition);}

第一个方法拿到所有的 PartitionManager

第二个方法依据特定的 Partition去得到一个分区管理器

关于 Storm-kafka中如何理解ZkCoordinator的过程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。

0