千家信息网

Storm-kafka接口怎么实现

发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,本篇内容主要讲解"Storm-kafka接口怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm-kafka接口怎么实现"吧!阅读背景: 如有
千家信息网最后更新 2025年02月07日Storm-kafka接口怎么实现

本篇内容主要讲解"Storm-kafka接口怎么实现",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Storm-kafka接口怎么实现"吧!

阅读背景: 如有需要,尽情参看本空间的另外一篇文档

阅读目的:了解Storm 如何来封装kafka接口,如何处理Connection连接的封装性问题

package com.mixbox.storm.kafka;import kafka.javaapi.consumer.SimpleConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.trident.IBrokerReader;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;/** * 2014/07/22 * 动态的【分区连接】 * @author Yin Shuai */public class DynamicPartitionConnections {        public static final Logger LOG = LoggerFactory                        .getLogger(DynamicPartitionConnections.class);        /**         * 持有了一个 kafka底层的SimpleConsumer对象         * 持有了  具体的分区         *          * @author Yin Shuai         */                        static class ConnectionInfo {                //内部维持了一个SimpleConsumer                SimpleConsumer consumer;                                //分区                Set partitions = new HashSet();                public ConnectionInfo(SimpleConsumer consumer) {                        this.consumer = consumer;                }        }        /**         * 也就是kafka的每一个节点都维持了一个COnnectionInfo,ConnectionInfo         */        Map _connections = new HashMap();        // kafkaConfig        KafkaConfig _config;        /**         * IBrokerReader 基本上 IbroerReader这里初始化的是ZkBrokerReader         */        IBrokerReader _reader;        /**         * @param config         *            kafka配置         * @param brokerReader         *            IBrokerReader-用于拿到当前的接口         */        public DynamicPartitionConnections(KafkaConfig config,                        IBrokerReader brokerReader) {                _config = config;                _reader = brokerReader;        }        /**         * @param partition  分区         * @return         */        public SimpleConsumer register(Partition partition) {                /**                 * 依据你所拥有的partition号,拿到你所对应的Broker                 * GlobalPartitionInformation中有Map                 * partitionMap,记录了分区号与Broker所对应的关系                 */                Broker broker = _reader.getCurrentBrokers().getBrokerFor(                                partition.partition);                return register(broker, partition.partition);        }        /**         * @param host         *            主机         * @param partition         *            分区         * @return 底层的SimpleConsumer 对象,这里存在一个注册的行为,将主机和端口【broker】,和分区【partition】 注册到 connections连接之中         */        public SimpleConsumer register(Broker host, int partition) {                // Map _connections = new HashMap();                //如果连接之中没有包含了Broker,那么建立一个新的连接,并且将这个  主机和连接注册到  _connections之中                if (!_connections.containsKey(host)) {                        _connections.put(host, new ConnectionInfo(new SimpleConsumer(                                        host.host, host.port, _config.socketTimeoutMs,                                        _config.bufferSizeBytes, _config.clientId)));                }                                // ---------   在这里,不管之前有没有都只取一次 -------------                                //当包含了,那就直接取出                ConnectionInfo info = _connections.get(host);                info.partitions.add(partition);                return info.consumer;        }        public SimpleConsumer getConnection(Partition partition) {                // ConnectionInfo 之中封装了一个simpleConsumer                ConnectionInfo info = _connections.get(partition.host);                if (info != null) {                        return info.consumer;                }                return null;        }        /**         * @param port    固定的Broker         * @param partition  固定的分区         */        public void unregister(Broker port, int partition) {                ConnectionInfo info = _connections.get(port);                info.partitions.remove(partition);                if (info.partitions.isEmpty()) {                        info.consumer.close();                        _connections.remove(port);                }        }        public void unregister(Partition partition) {                unregister(partition.host, partition.partition);        }        public void clear() {                for (ConnectionInfo info : _connections.values()) {                        info.consumer.close();                }                _connections.clear();        }}

与前文有关

1: 在DynamicPartitionConnections之中,我们持有了一个 IBrokerReader的接口对象。

2 : 由于IBrokerReader 派生出了

2.1 StaticBrokerReader

2.2 ZBrokerReader

在这个序列的一系列博文之中,ZBrokerReader已经进行了详尽的分析,并且在赋值的过程之中,IBrokerReader也是实例化为ZBrokerReader了。

内部类:

DynamicPartitionConnections 持有了一个 CinnectionInfo的内部类

static class ConnectionInfo {                //内部维持了一个SimpleConsumer                SimpleConsumer consumer;                                //分区                Set partitions = new HashSet();                public ConnectionInfo(SimpleConsumer consumer) {                        this.consumer = consumer;                }        }

1: 对于每一个Connection内部都维持了一个SimpleConsumer ,以及一个 Set集合 partitions

2 :在DynamicPartitionConnections里面我们维持了一个_connections的对象

Map _connections = new HashMap();

3 :在连接维护之中,关键的地方是维护一个 register注册的行为:

public SimpleConsumer register(Broker host, int partition) {

4: 如果_connections之中没有包含Broker,那么将会再建立一个新的连接,并且将Broker和Connection 注册到_connections之中

5:在注册的过程之中,不包含就注册,最后都直接取出SimpleConsumer,这个SimpleConsumer

封装了

new ConnectionInfo(new SimpleConsumer(

host.host, host.port, _config.socketTimeoutMs,

_config.bufferSizeBytes, _config.clientId)):

到此,相信大家对"Storm-kafka接口怎么实现"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0