Storm-kafka中如何封装DynamicBrokerReader类
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章主要介绍Storm-kafka中如何封装DynamicBrokerReader类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在细节上把握 DynamicBrokerR
千家信息网最后更新 2025年02月05日Storm-kafka中如何封装DynamicBrokerReader类
这篇文章主要介绍Storm-kafka中如何封装DynamicBrokerReader类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
在细节上把握 DynamicBrokerReder的封装类 - ZkBrokerReader
package com.mixbox.storm.kafka.trident;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.DynamicBrokersReader;import com.mixbox.storm.kafka.ZkHosts;import java.util.Map;/** * 2014/07/22 * 在ZK中间拿到 GlobalPartitionInformation * * ZkBrokerReader 是对于DynamicBrokersReader的一个简单的封装 * @author Yin Shuai */public class ZkBrokerReader implements IBrokerReader { public static final Logger LOG = LoggerFactory .getLogger(ZkBrokerReader.class); GlobalPartitionInformation cachedBrokers; DynamicBrokersReader reader; long lastRefreshTimeMs; long refreshMillis; /** * * @param conf * @param topic * 指定topic的zkBrokerReader * @param hosts */ public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) { reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = System.currentTimeMillis(); refreshMillis = hosts.refreshFreqSecs * 1000L; } @Override public GlobalPartitionInformation getCurrentBrokers() { long currTime = System.currentTimeMillis(); // 很简单, 指定了你多长时间开始去刷新Brokerlibiao if (currTime > lastRefreshTimeMs + refreshMillis) { LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = currTime; } return cachedBrokers; } @Override public void close() { reader.close(); }}
总览我们的Code :
ZkBrokerReader 是对于 DynamicBrokersReader的一个简单封装,ZkBrokerReader之中持有2个主要的Class
1 GlobalPartitionInformatio cachedBroker;
2 DynamicBrokersReader reader;
3 long lastRefreshTimeMs; 最新的刷新时间
lastRefreshTimeMs = System.currentTimeMillis(); 最新的刷新时间为系统的当前时间
4 long refreshMillis
refreshMillis = host.refreshFreqSecs * 1000L 设定刷新的毫秒数为
5
public GlobalPartitionInformation getCurrentBrokers() { long currTime = System.currentTimeMillis(); // 很简单, 指定了你多长时间开始去刷新Brokerlibiao if (currTime > lastRefreshTimeMs + refreshMillis) { LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired"); cachedBrokers = reader.getBrokerInfo(); lastRefreshTimeMs = currTime; } return cachedBrokers; }
每一次调用getCurrentBrokers,首先会取System.currentTimeMillis 当当前的系统时间超过了 最早的刷新时间+刷新
的间隔,就会再次的去跟新:
cachedBrokers = reader.getBrokerInfo(); getBrokerInfo()方法每调用一次,也就重新在zk之中重新去取
一次。
ZkBrokerReader是对于DynamicBrokerReader的一个封装,DynamicBrokerReader的Dynamic性质并不程序动态的因数,而只是简单在读取ZK数据的过程之中,Zk数据已经动态的发生变化?
以上是"Storm-kafka中如何封装DynamicBrokerReader类"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!
时间
封装
之中
内容
动态
数据
篇文章
系统
价值
兴趣
再次
只是
因数
小伙
小伙伴
性质
方法
更多
知识
程序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
长沙游戏软件开发学校
国资网络安全公司
江苏机电软件开发技术规范
互联网智能星期七科技美容
网络安全法应履行不包括
杨浦区品质软件开发代理品牌
堕落玩偶说连接到服务器
怎样学软件开发技术
数据库读取数据用到哪些方法
阿里云服务器有哪些参数
西门子触摸屏怎样访问服务器
怎么退出软件开发管理
联想无线 管理 服务器
深圳有哪些软件开发培训学校
数据库技术 北外 作业
楼宇信息传输网络技术
南京优信网络安全技术研究院
广州智能边缘计算服务器
应用软件开发区海边
国外网络安全公益视频
校园防网络安全诈骗
图像质量评价数据库
软件开发市场的环境分析
数据库目前有什么优势
网络安全案例素材
大学用的app软件开发
实施破坏网络安全的行为
安阳软件开发规定
服务器不定时重启跟硬件关系大吗
数据库的delete