千家信息网

Storm-kafka中如何封装DynamicBrokerReader类

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章主要介绍Storm-kafka中如何封装DynamicBrokerReader类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!在细节上把握 DynamicBrokerR
千家信息网最后更新 2025年02月05日Storm-kafka中如何封装DynamicBrokerReader类

这篇文章主要介绍Storm-kafka中如何封装DynamicBrokerReader类,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

在细节上把握 DynamicBrokerReder的封装类 - ZkBrokerReader

package com.mixbox.storm.kafka.trident;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.DynamicBrokersReader;import com.mixbox.storm.kafka.ZkHosts;import java.util.Map;/** * 2014/07/22 * 在ZK中间拿到 GlobalPartitionInformation *  * ZkBrokerReader 是对于DynamicBrokersReader的一个简单的封装 * @author Yin Shuai */public class ZkBrokerReader implements IBrokerReader {        public static final Logger LOG = LoggerFactory                        .getLogger(ZkBrokerReader.class);        GlobalPartitionInformation cachedBrokers;                        DynamicBrokersReader reader;                        long lastRefreshTimeMs;                long refreshMillis;        /**         *          * @param conf         * @param topic         *            指定topic的zkBrokerReader         * @param hosts         */        public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {                reader = new DynamicBrokersReader(conf, hosts.brokerZkStr,                                hosts.brokerZkPath, topic);                cachedBrokers = reader.getBrokerInfo();                lastRefreshTimeMs = System.currentTimeMillis();                refreshMillis = hosts.refreshFreqSecs * 1000L;        }        @Override        public GlobalPartitionInformation getCurrentBrokers() {                long currTime = System.currentTimeMillis();                // 很简单, 指定了你多长时间开始去刷新Brokerlibiao                if (currTime > lastRefreshTimeMs + refreshMillis) {                        LOG.info("brokers need refreshing because " + refreshMillis                                        + "ms have expired");                        cachedBrokers = reader.getBrokerInfo();                        lastRefreshTimeMs = currTime;                }                return cachedBrokers;        }        @Override        public void close() {                reader.close();        }}

总览我们的Code :

ZkBrokerReader 是对于 DynamicBrokersReader的一个简单封装,ZkBrokerReader之中持有2个主要的Class


1 GlobalPartitionInformatio cachedBroker;

2 DynamicBrokersReader reader;

3 long lastRefreshTimeMs; 最新的刷新时间

lastRefreshTimeMs = System.currentTimeMillis();    最新的刷新时间为系统的当前时间

4 long refreshMillis

refreshMillis = host.refreshFreqSecs * 1000L  设定刷新的毫秒数为

5

public GlobalPartitionInformation getCurrentBrokers() {                long currTime = System.currentTimeMillis();                // 很简单, 指定了你多长时间开始去刷新Brokerlibiao                if (currTime > lastRefreshTimeMs + refreshMillis) {                        LOG.info("brokers need refreshing because " + refreshMillis                                        + "ms have expired");                        cachedBrokers = reader.getBrokerInfo();                        lastRefreshTimeMs = currTime;                }                return cachedBrokers;        }

每一次调用getCurrentBrokers,首先会取System.currentTimeMillis 当当前的系统时间超过了 最早的刷新时间+刷新

的间隔,就会再次的去跟新:

cachedBrokers = reader.getBrokerInfo(); getBrokerInfo()方法每调用一次,也就重新在zk之中重新去取

一次。

ZkBrokerReader是对于DynamicBrokerReader的一个封装,DynamicBrokerReader的Dynamic性质并不程序动态的因数,而只是简单在读取ZK数据的过程之中,Zk数据已经动态的发生变化?

以上是"Storm-kafka中如何封装DynamicBrokerReader类"这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注行业资讯频道!

0