Storm-kafka中如何理解ZkCoordinator的过程
发表于:2024-11-26 作者:千家信息网编辑
千家信息网最后更新 2024年11月26日,Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。梳理ZkCoordin
千家信息网最后更新 2024年11月26日Storm-kafka中如何理解ZkCoordinator的过程
Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
梳理ZkCoordinator的过程
package com.mixbox.storm.kafka;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;import java.util.*;import static com.mixbox.storm.kafka.KafkaUtils.taskId;/** * * * ZKCoordinator 协调器 * * @author Yin Shuai */public class ZkCoordinator implements PartitionCoordinator { public static final Logger LOG = LoggerFactory .getLogger(ZkCoordinator.class); SpoutConfig _spoutConfig; int _taskIndex; int _totalTasks; String _topologyInstanceId; // 每一个分区对应着一个分区管理器 Map_managers = new HashMap(); //缓存的List List _cachedList; //上次刷新的时间 Long _lastRefreshTime = null; //刷新频率 毫秒 int _refreshFreqMs; //动态分区连接 DynamicPartitionConnections _connections; //动态BrokersReader DynamicBrokersReader _reader; ZkState _state; Map _stormConf; /** * * @param connections * 动态的 分区连接 * @param stormConf * Storm的配置文件 * @param spoutConfig * Storm sput的配置文件 * @param state * 对于ZKState的连接 * @param taskIndex * 任务 * @param totalTasks * 总共的任务 * @param topologyInstanceId * 拓扑的实例ID */ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); } public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader; } /** * @param stormConf * @param spoutConfig * @return */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) { ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); } @Override public List getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { refresh(); _lastRefreshTime = System.currentTimeMillis(); } return _cachedList; } /** * 简单的刷新的行为 * */ void refresh() { try { LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); // 拿到所有的分区信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); // 拿到自己任务的所有分区 List mine = KafkaUtils.calculatePartitionsForTask( brokerInfo, _totalTasks, _taskIndex); // 拿到当前任务的分区 Set curr = _managers.keySet(); // 构造一个集合 Set newPartitions = new HashSet (mine); // 在new分区中,移除掉所有 自己拥有的分区 newPartitions.removeAll(curr); // 要删除的分区 Set deletedPartitions = new HashSet (curr); // deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); for (Partition id : deletedPartitions) { PartitionManager man = _managers.remove(id); man.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } } catch (Exception e) { throw new RuntimeException(e); } _cachedList = new ArrayList (_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); } @Override public PartitionManager getManager(Partition partition) { return _managers.get(partition); }}
1 : 首先 ZKCoorDinator 实现 PartitionCoordinator的接口
package com.mixbox.storm.kafka;import java.util.List;/** * @author Yin Shuai */public interface PartitionCoordinator { /** * 拿到我管理的分区列表 List{PartitionManager} * @return */ ListgetMyManagedPartitions(); /** * @param 依据制定的分区partition,去getManager * @return */ PartitionManager getManager(Partition partition);}
第一个方法拿到所有的 PartitionManager
第二个方法依据特定的 Partition去得到一个分区管理器
关于 Storm-kafka中如何理解ZkCoordinator的过程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
任务
过程
动态
方法
问题
管理
文件
更多
帮助
解答
配置
易行
简单易行
信息
内容
实例
小伙
小伙伴
拓扑
接口
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
护苗网络安全短片读后感
服务器机柜的来历
深入开展网络安全自查
软件开发工具实验
orcal数据库用户名怎么查
报价合同发货自制数据库
北京网络安全监察部门
网络安全启明星晨股价
医学影像与计算机网络技术
网络安全和信息化协调处处长
常用服务器管理口地址密码
销售软件开发订制
时序数据库iotdb主要特性
域控服务器管理员密码锁定
新罗县网络安全
mysql数据库数据类型
各大厂商服务器保存位置
网吧游戏在云服务器
废弃电脑怎么做共享服务器
linq过滤重复数据库
绿信科技互联网
晶体学数据库
高级网络技术是什么
外企搭建中文服务器
昆明信息安全特训营网络安全培训
浙江直销软件开发公司哪家好
严格落实信息网络安全
什么时候才有网络小说服务器
网络安全对小学生
网络技术职业分析