千家信息网

Storm-kafka中如何实现一个对于kafkaBroker动态读取的Class

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,小编给大家分享一下Storm-kafka中如何实现一个对于kafkaBroker动态读取的Class,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面
千家信息网最后更新 2025年02月06日Storm-kafka中如何实现一个对于kafkaBroker动态读取的Class

小编给大家分享一下Storm-kafka中如何实现一个对于kafkaBroker动态读取的Class,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

实现一个对于kafkaBroker 动态读取的Class - DynamicBrokersReader

DynamicBrokersReader

package com.mixbox.storm.kafka;import backtype.storm.Config;import backtype.storm.utils.Utils;import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;import com.netflix.curator.framework.CuratorFramework;import com.netflix.curator.framework.CuratorFrameworkFactory;import com.netflix.curator.retry.RetryNTimes;import org.json.simple.JSONValue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.util.List;import java.util.Map;/** * 动态的Broker读 我们维护了有一个与zk之间的连接,提供了获取指定topic的每一个partition正在活动着的leader所对应的broker * 这样你有能力知道,当前的这些topic,哪一些broker是活动的 * @author Yin Shuai */public class DynamicBrokersReader {        public static final Logger LOG = LoggerFactory                        .getLogger(DynamicBrokersReader.class);        // 对于Client CuratorFrameWork的封装        private CuratorFramework _curator;        // 在Zk上注册的位置        private String _zkPath;        // 指定的_topic        private String _topic;        public DynamicBrokersReader(Map conf, String zkStr, String zkPath,                        String topic) {                _zkPath = zkPath;                _topic = topic;                try {                        _curator = CuratorFrameworkFactory                                        .newClient(                                                        zkStr,                                                        Utils.getInt(conf                                                                        .get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),                                                        15000,                                                        new RetryNTimes(                                                                        Utils.getInt(conf                                                                                        .get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),                                                                        Utils.getInt(conf                                                                                        .get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));                } catch (IOException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }                _curator.start();        }        public DynamicBrokersReader(String zkPath) {                this._zkPath = zkPath;        }        /**         * 确定指定topic下,每一个partition的leader,所对应的 主机和端口, 并将它们存入到全部分区信息中         *          */        public GlobalPartitionInformation getBrokerInfo() {                GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();                try {                        // 拿到当前的分区数目                        int numPartitionsForTopic = getNumPartitions();                        /**                         * /brokers/ids                         */                        String brokerInfoPath = brokerPath();                        // 默认的我们的分区数目就只有 0, 1 两个                        for (int partition = 0; partition < numPartitionsForTopic; partition++) {                                // 这里请主要参考分区和领导者的关系                                int leader = getLeaderFor(partition);                                // 拿到领导者以后的zookeeper路径                                String leaderPath = brokerInfoPath + "/" + leader;                                try {                                        byte[] brokerData = _curator.getData().forPath(leaderPath);                                        /**                                         * 在这里, 我们拿到的brokerData为:                                         * {"jmx_port":-1,"timestamp":"1403076810435"                                         * ,"host":"192.168.50.207","version":1,"port":9092} 注意                                         * 这里是字节数组开始转json                                         */                                        Broker hp = getBrokerHost(brokerData);                                        /**                                         * 记录好 每一个分区 partition 所对应的 Broker                                         */                                        globalPartitionInformation.addPartition(partition, hp);                                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {                                        LOG.error("Node {} does not exist ", leaderPath);                                }                        }                } catch (Exception e) {                        throw new RuntimeException(e);                }                LOG.info("Read partition info from zookeeper: "                                + globalPartitionInformation);                return globalPartitionInformation;        }        /**         * @return 拿到指定topic下的分区数目         */        private int getNumPartitions() {                try {                        String topicBrokersPath = partitionPath();                        List children = _curator.getChildren().forPath(                                        topicBrokersPath);                        return children.size();                } catch (Exception e) {                        throw new RuntimeException(e);                }        }        /**         * @return 拿到的topic在zookeeper注册的分区地址         *         brokers/topics/storm-sentence/partitions         */        public String partitionPath() {                return _zkPath + "/topics/" + _topic + "/partitions";        }        /**         *  持有的是Broker节点的id号码,这个id号是在配置的过程中为每一个Broker分配的         * @return   /brokers/ids         */        public String brokerPath() {                return _zkPath + "/ids";        }        /**         * get /brokers/topics/distributedTopic/partitions/1/state {         * "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1,         * "version":1 }         *          * 说明一下,在kafka之中,每一个分区都会有一个Leader,有0个或者多个的followers, 一个leader会处理这个分区的所有请求         * @param partition         * @return         */        private int getLeaderFor(long partition) {                try {                        String topicBrokersPath = partitionPath();                        byte[] hostPortData = _curator.getData().forPath(                                        topicBrokersPath + "/" + partition + "/state");                        @SuppressWarnings("unchecked")                        Map value = (Map) JSONValue                                        .parse(new String(hostPortData, "UTF-8"));                        Integer leader = ((Number) value.get("leader")).intValue();                        return leader;                } catch (Exception e) {                        throw new RuntimeException(e);                }        }        public void close() {                _curator.close();        }        /**         * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 {         * "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }         *          *          * @param contents         * @return         */        private Broker getBrokerHost(byte[] contents) {                try {                        @SuppressWarnings("unchecked")                        Map value = (Map) JSONValue                                        .parse(new String(contents, "UTF-8"));                        String host = (String) value.get("host");                        Integer port = ((Long) value.get("port")).intValue();                        return new Broker(host, port);                } catch (UnsupportedEncodingException e) {                        throw new RuntimeException(e);                }        }}

对于以上代码须知:

1 : 我们持有了一个ZkPath , 在Storm-kafka的class之中我们默认的是/brokers

2 : _topic , 目前我们是针对的是Topic, 也就是说我们的partition,leader都是针对于单个Topic的

3:

1 int numPartitionsForTopic = getNumPartitions();

针对与一个Topic,首先我们要取当前的分区数,一般的情况,我们在kafka之中默认的分区数为2

2 String brokerInfoPath = brokerPath();

拿到 /brokers/ids 的分区号

3:   for (int partition = 0; partition < numPartitionsForTopic; partition++) {

依次的遍历每一个分区

4:int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);

再通过分区拿到领导者,以及领导者的路径,最后拿到领导者的数据:

我们举一个小例子

* 在这里, 我们拿到的brokerData为:

* {"jmx_port":-1,"timestamp":"1403076810435"

* ,"host":"192.168.50.207","version":1,"port":9092}

4:Broker hp = getBrokerHost(brokerData);

拿到某一个Topic自己的分区在kafka所对应的Broker,并且其封装到 globalPartitionInformation

5 globalPartitionInformation.addPartition(partition, hp);

GlobalPartitionInformaton底层维护了一个HashMap

简单的来说:DynamicBrokersReader 针对某一个Topic维护了 每一个分区 partition 所对应的 Broker

以上是"Storm-kafka中如何实现一个对于kafkaBroker动态读取的Class"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0