Storm-kafka中如何理解ZkCoordinator的过程
发表于:2025-02-10 作者:千家信息网编辑
千家信息网最后更新 2025年02月10日,Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。梳理ZkCoordin
千家信息网最后更新 2025年02月10日Storm-kafka中如何理解ZkCoordinator的过程
Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
梳理ZkCoordinator的过程
package com.mixbox.storm.kafka;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;import java.util.*;import static com.mixbox.storm.kafka.KafkaUtils.taskId;/** * * * ZKCoordinator 协调器 * * @author Yin Shuai */public class ZkCoordinator implements PartitionCoordinator { public static final Logger LOG = LoggerFactory .getLogger(ZkCoordinator.class); SpoutConfig _spoutConfig; int _taskIndex; int _totalTasks; String _topologyInstanceId; // 每一个分区对应着一个分区管理器 Map_managers = new HashMap(); //缓存的List List _cachedList; //上次刷新的时间 Long _lastRefreshTime = null; //刷新频率 毫秒 int _refreshFreqMs; //动态分区连接 DynamicPartitionConnections _connections; //动态BrokersReader DynamicBrokersReader _reader; ZkState _state; Map _stormConf; /** * * @param connections * 动态的 分区连接 * @param stormConf * Storm的配置文件 * @param spoutConfig * Storm sput的配置文件 * @param state * 对于ZKState的连接 * @param taskIndex * 任务 * @param totalTasks * 总共的任务 * @param topologyInstanceId * 拓扑的实例ID */ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); } public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader; } /** * @param stormConf * @param spoutConfig * @return */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) { ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); } @Override public List getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { refresh(); _lastRefreshTime = System.currentTimeMillis(); } return _cachedList; } /** * 简单的刷新的行为 * */ void refresh() { try { LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); // 拿到所有的分区信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); // 拿到自己任务的所有分区 List mine = KafkaUtils.calculatePartitionsForTask( brokerInfo, _totalTasks, _taskIndex); // 拿到当前任务的分区 Set curr = _managers.keySet(); // 构造一个集合 Set newPartitions = new HashSet (mine); // 在new分区中,移除掉所有 自己拥有的分区 newPartitions.removeAll(curr); // 要删除的分区 Set deletedPartitions = new HashSet (curr); // deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); for (Partition id : deletedPartitions) { PartitionManager man = _managers.remove(id); man.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } } catch (Exception e) { throw new RuntimeException(e); } _cachedList = new ArrayList (_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); } @Override public PartitionManager getManager(Partition partition) { return _managers.get(partition); }}
1 : 首先 ZKCoorDinator 实现 PartitionCoordinator的接口
package com.mixbox.storm.kafka;import java.util.List;/** * @author Yin Shuai */public interface PartitionCoordinator { /** * 拿到我管理的分区列表 List{PartitionManager} * @return */ ListgetMyManagedPartitions(); /** * @param 依据制定的分区partition,去getManager * @return */ PartitionManager getManager(Partition partition);}
第一个方法拿到所有的 PartitionManager
第二个方法依据特定的 Partition去得到一个分区管理器
关于 Storm-kafka中如何理解ZkCoordinator的过程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
任务
过程
动态
方法
问题
管理
文件
更多
帮助
解答
配置
易行
简单易行
信息
内容
实例
小伙
小伙伴
拓扑
接口
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库技术的发展心得
广东科信网络技术有限公司环评
数据库安装成功后如何设置密码
达梦数据库2148
数据库 赋值
鞋样设计软件开发
基因数据库数据抓取
关系数据库表中对数据的增删改查
电脑不开机拷数据库
网络安全靠大家部队
新时达服务器历史故障记录清除
三级网络技术题库版
计算机网络技术广西大专
ntp时间同步服务器厂家比较
怎么和云服务器沟通
商务信息技术网络安全
数据库删除表指令
网络技术课程实训心得与体会
腾讯云轻量服务器做2008系统
网安网络安全是什么
浦东新区网络技术咨询大概费用
数据存储服务器组建
新乡服务器机箱报价
网络安全手抄报知识语句
趣步链iwc系统软件开发
草图插件是用什么软件开发的
工业网络技术的就业前景
企业数据库维护技术
邮件地址和服务器名字符号
网络安全审查员要求