
How to Use the PartitionManager

Published: 2025-02-07 · Author: 千家信息网 editor

This article introduces "How to Use the PartitionManager". Many readers are unsure how PartitionManager is meant to be used in day-to-day work, so the editor has consulted a range of material and put together a simple, practical walkthrough. Hopefully it resolves your questions about using PartitionManager. Now, follow along with the editor and let's learn!

Background required: a passing familiarity with Java inner classes.

Purpose: to understand how Kafka partitions are managed inside Storm's spout interfaces.

Goal: a detailed walkthrough of the whole PartitionManager workflow.

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import com.mixbox.storm.kafka.KafkaSpout.EmitState;
import com.mixbox.storm.kafka.KafkaSpout.MessageAndRealOffset;
import com.mixbox.storm.kafka.trident.MaxMetric;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * The partition manager.
 *
 * @author Yin Shuai
 */
public class PartitionManager {

    public static final Logger LOG = LoggerFactory
            .getLogger(PartitionManager.class);

    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;

    /**
     * A Kafka message ID, wrapping a partition and an offset.
     *
     * @author Yin Shuai
     */
    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // The offset up to which messages have been emitted
    Long _emittedToOffset;

    // Offsets that have been emitted but not yet acked
    SortedSet<Long> _pending = new TreeSet<Long>();

    // The last committed offset
    Long _committedTo;

    // Messages waiting to be emitted
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();

    // The partition this manager is responsible for
    Partition _partition;

    // The Storm spout configuration
    SpoutConfig _spoutConfig;

    // The topology instance ID
    String _topologyInstanceId;

    // The underlying Kafka consumer
    SimpleConsumer _consumer;

    // Dynamic per-partition connections
    DynamicPartitionConnections _connections;

    // ZkState, which maintains state in ZooKeeper
    ZkState _state;

    // The Storm configuration
    Map _stormConf;

    @SuppressWarnings("unchecked")
    public PartitionManager(DynamicPartitionConnections connections,
            String topologyInstanceId, ZkState state, Map stormConf,
            SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition);
        _state = state;
        _stormConf = stormConf;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path + " --> "
                    + json);
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json
                        .get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId)
                && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo
                    + "; old topology_id: " + jsonTopologyId
                    + " - new topology_id: " + topologyInstanceId);
        }

        LOG.info("Starting " + _partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }

    public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax",
                _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean",
                _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount",
                _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount",
                _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

    // returns NO_EMITTED if it has reached the end of the current batch
    public EmitState next(SpoutOutputCollector collector) {

        // Nothing is waiting to be emitted, so load more messages first
        if (_waitingToEmit.isEmpty()) {
            fill();
        }

        while (true) {

            // Retrieve and remove the first element of the list
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();

            // Nothing left to emit
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }

            // Turn the raw message into tuples
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(
                    _spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition,
                            toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }

        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

    /**
     * The fill step: this is what actually decides which data gets emitted.
     */
    private void fill() {
        long start = System.nanoTime();

        // Fetch a MessageSet from Kafka
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
                _consumer, _partition, _emittedToOffset);

        long end = System.nanoTime();
        long millis = (end - start) / 1000000;

        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();

        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);

        if (numMessages > 0) {
            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
        }

        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(),
                    _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }

        if (numMessages > 0) {
            LOG.info("Added " + numMessages + " messages from: " + _partition
                    + " to internal buffers");
        }
    }

    private int countMessages(ByteBufferMessageSet messageSet) {
        int counter = 0;
        for (MessageAndOffset messageAndOffset : messageSet) {
            counter = counter + 1;
        }
        return counter;
    }

    public void ack(Long offset) {
        _pending.remove(offset);
    }

    public void fail(Long offset) {
        // TODO: should it use in-memory ack set to skip anything that's been
        // acked but not committed??? things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }

    public void commit() {

        // The latest fully completed offset
        long lastCompletedOffset = lastCompletedOffset();

        // Write the latest completed offset for this partition and topology to ZooKeeper
        if (lastCompletedOffset != lastCommittedOffset()) {
            LOG.info("Writing last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: "
                    + _topologyInstanceId);

            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap
                    .builder()
                    .put("topology",
                            ImmutableMap.of("id", _topologyInstanceId, "name",
                                    _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker",
                            ImmutableMap.of("host", _partition.host.host,
                                    "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();

            // Write it out directly as JSON
            _state.writeJSON(committedPath(), data);

            _committedTo = lastCompletedOffset;

            LOG.info("Wrote last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: "
                    + _topologyInstanceId);
        } else {
            LOG.info("No new offset for " + _partition + " for topology: "
                    + _topologyInstanceId);
        }
    }

    // The ZooKeeper path where offsets are committed
    private String committedPath() {
        return "/" + _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/"
                + _partition.getId();
    }

    // Query the latest offset of this partition
    public long queryPartitionOffsetLatestTime() {
        return KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
                _partition.partition, OffsetRequest.LatestTime());
    }

    // The last committed offset
    public long lastCommittedOffset() {
        return _committedTo;
    }

    // The latest offset below which every message has been acked
    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

    // The partition this manager handles
    public Partition getPartition() {
        return _partition;
    }

    public void close() {
        _connections.unregister(_partition.host, _partition.partition);
    }
}
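Before dissecting the class piece by piece, it is worth seeing what commit() actually persists: a small JSON document written to the ZooKeeper node returned by committedPath(). The field names below come straight from the commit() code above; the values are invented for illustration:

{
    "topology": {"id": "my-topology-1-1423651234", "name": "my-topology"},
    "offset": 102,
    "partition": 0,
    "broker": {"host": "kafka-1.example.com", "port": 9092},
    "topic": "my-topic"
}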

1. PartitionManager defines a static inner class, KafkaMessageId, which wraps a particular partition together with an offset:

static class KafkaMessageId {
    public Partition partition;
    public long offset;

    public KafkaMessageId(Partition partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}
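Why does this little class matter? Because next() attaches a KafkaMessageId to every tuple it emits, Storm hands the same object back on ack or fail, and the spout can then route the call to the manager of the right partition. Below is a hedged sketch of that routing; the managers map is an illustrative stand-in for whatever coordinator the real KafkaSpout uses, not something in the listing above:

package com.mixbox.storm.kafka;

import java.util.HashMap;
import java.util.Map;

// Hypothetical ack/fail routing for a spout that owns several managers.
public class MessageIdRoutingSketch {
    private final Map<Partition, PartitionManager> managers =
            new HashMap<Partition, PartitionManager>();

    public void ack(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId) msgId;
        PartitionManager m = managers.get(id.partition);
        if (m != null) {
            m.ack(id.offset);  // drops the offset from _pending
        }
    }

    public void fail(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId) msgId;
        PartitionManager m = managers.get(id.partition);
        if (m != null) {
            m.fail(id.offset); // rewinds _emittedToOffset so the message is refetched
        }
    }
}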

2. PartitionManager also holds the following instance variables:

2.1 _pending — offsets that have been emitted but not yet acked

2.2 _committedTo — the last offset committed to ZooKeeper

2.3 _waitingToEmit — messages waiting to be emitted

2.4 _partition — the specific partition being managed

Of these, _waitingToEmit is a LinkedList of MessageAndRealOffset, as the code above shows; the interplay of the other three is sketched below.
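To see how these variables cooperate, the following self-contained toy program (not the article's code; the offsets are invented) replays one fill/ack cycle and derives the value lastCompletedOffset() would report:

import java.util.SortedSet;
import java.util.TreeSet;

public class OffsetBookkeepingDemo {
    public static void main(String[] args) {
        SortedSet<Long> pending = new TreeSet<Long>(); // plays _pending
        long emittedToOffset = 100L;                   // plays _emittedToOffset

        // fill(): five messages fetched at offsets 100..104
        for (int i = 0; i < 5; i++) {
            pending.add(emittedToOffset);
            emittedToOffset++; // the real code advances via msg.nextOffset()
        }

        // ack(): offsets 100 and 101 completed, possibly out of order
        pending.remove(101L);
        pending.remove(100L);

        // lastCompletedOffset(): everything below _pending.first() is done
        long lastCompleted = pending.isEmpty() ? emittedToOffset : pending.first();
        System.out.println(lastCompleted); // 102 -> this is what commit() would persist
    }
}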

3. When a PartitionManager is constructed, the arguments that must be passed in are:

topologyInstanceId

DynamicPartitionConnections

ZkState

Map

SpoutConfig

Partition

The SimpleConsumer object is not one of these parameters: the constructor obtains it from DynamicPartitionConnections by calling its register method with the partition's host and partition number (a usage sketch follows below).
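Once constructed, a manager is driven through next(), commit() and close(). The class below is a hedged sketch of that lifecycle for a single partition, using only the public methods shown above; the fields and the 2-second commit cadence are illustrative, whereas the real KafkaSpout manages many partitions and reads its commit interval from configuration:

package com.mixbox.storm.kafka;

import backtype.storm.spout.SpoutOutputCollector;

public class SinglePartitionDriver {
    private final PartitionManager _manager;
    private final SpoutOutputCollector _collector;
    private long _lastCommitMs = 0L;
    private static final long COMMIT_INTERVAL_MS = 2000L; // assumed cadence

    public SinglePartitionDriver(PartitionManager manager, SpoutOutputCollector collector) {
        _manager = manager;
        _collector = collector;
    }

    // Called repeatedly, like a spout's nextTuple()
    public void nextTuple() {
        // next() fills the internal buffer when it is empty and emits
        // tuples for at most one Kafka message per call
        _manager.next(_collector);

        // Periodically persist the last fully completed offset to ZooKeeper
        long now = System.currentTimeMillis();
        if (now - _lastCommitMs > COMMIT_INTERVAL_MS) {
            _manager.commit();
            _lastCommitMs = now;
        }
    }

    public void shutdown() {
        _manager.commit(); // final commit
        _manager.close();  // unregister from DynamicPartitionConnections
    }
}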

This concludes our study of "How to Use the PartitionManager"; hopefully it has resolved your questions. Pairing theory with practice is the best way to learn, so go try it out! To keep learning more on related topics, keep following the site; the editor will keep working to bring you more practical articles!
