Storm如何和Kafka进行整合
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章将为大家详细讲解有关Storm如何和Kafka进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。对于Storm 如何和Kafka进行整合p
千家信息网最后更新 2025年02月04日Storm如何和Kafka进行整合
这篇文章将为大家详细讲解有关Storm如何和Kafka进行整合,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
对于Storm 如何和Kafka进行整合
package com.mixbox.storm.kafka;import backtype.storm.Config;import backtype.storm.metric.api.IMetric;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import kafka.message.Message;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;import java.util.*;/** * @author Yin Shuai */public class KafkaSpout extends BaseRichSpout { public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); /** * 内部类,Message和Offset的偏移量对象 * * @author Yin Shuai */ public static class MessageAndRealOffset { public Message msg; public long offset; public MessageAndRealOffset(Message msg, long offset) { this.msg = msg; this.offset = offset; } } /** * 发射的枚举类 * @author Yin Shuai */ static enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED } String _uuid = UUID.randomUUID().toString(); SpoutConfig _spoutConfig; SpoutOutputCollector _collector; // 分区的协调器,getMyManagedPartitions 拿到我所管理的分区 PartitionCoordinator _coordinator; // 动态的分区链接:保存到kafka各个节点的连接,以及负责的topic的partition号码 DynamicPartitionConnections _connections; // 提供了从zookeeper读写kafka 消费者信息的功能 ZkState _state; // 上次更新的毫秒数 long _lastUpdateMs = 0; // 当前的分区 int _currPartitionIndex = 0; public KafkaSpout(SpoutConfig spoutConf) { _spoutConfig = spoutConf; } @SuppressWarnings("unchecked") @Override public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { _collector = collector; ListzkServers = _spoutConfig.zkServers; // 初始化的时候如果zkServers 为空,那么初始化 默认的配置Zookeeper if (zkServers == null) { zkServers = new ArrayList () { { add("192.168.50.144"); add("192.168.50.169"); add("192.168.50.207"); } }; // zkServers = // (List )conf.get(Config.STORM_ZOOKEEPER_SERVERS); System.out.println(" 使用的是Storm默认配置的Zookeeper List : " + zkServers); } Integer zkPort = _spoutConfig.zkPort; // 在这里我们也同时 来检查zookeeper的端口是否为空 if (zkPort == null) { zkPort = 2181; // zkPort = ((Number) // conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); } Map stateConf = new HashMap(conf); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort); stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot); // 通过保存的配置文件,我们持有了一个zookeeper的state,支持节点内容的创建和删除 _state = new ZkState(stateConf); // 对于连接的维护 _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig)); // using TransactionalState like this is a hack // 拿到总共的任务次数 int totalTasks = context .getComponentTasks(context.getThisComponentId()).size(); // 判断当前的主机是否是静态的statichost if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); // 当你拿到的spoutConfig是zkhost的时候 } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid); } context.registerMetric("kafkaOffset", new IMetric() { KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric( _spoutConfig.topic, _connections); @Override public Object getValueAndReset() { List pms = _coordinator .getMyManagedPartitions(); Set latestPartitions = new HashSet(); for (PartitionManager pm : pms) { latestPartitions.add(pm.getPartition()); } _kafkaOffsetMetric.refreshPartitions(latestPartitions); for (PartitionManager pm : pms) { _kafkaOffsetMetric.setLatestEmittedOffset( pm.getPartition(), pm.lastCompletedOffset()); } return _kafkaOffsetMetric.getValueAndReset(); } }, _spoutConfig.metricsTimeBucketSizeInSecs); context.registerMetric("kafkaPartition", new IMetric() { @Override public Object getValueAndReset() { List pms = _coordinator .getMyManagedPartitions(); Map concatMetricsDataMaps = new HashMap(); for (PartitionManager pm : pms) { concatMetricsDataMaps.putAll(pm.getMetricsDataMap()); } return concatMetricsDataMaps; } }, _spoutConfig.metricsTimeBucketSizeInSecs); } @Override public void close() { _state.close(); } @Override public void nextTuple() { // Storm-spout 是从kafka 消费数据,把 kafka 的 consumer // 当成是一个spout,并且向其他的bolt的发送数据 // 拿到当前我管理的这些PartitionsManager List managers = _coordinator.getMyManagedPartitions(); for (int i = 0; i < managers.size(); i++) { // 对于每一个分区的 PartitionManager // in case the number of managers decreased // 当前的分区 _currPartitionIndex = _currPartitionIndex % managers.size(); // 拿到当前的分区,并且发送,这里把SpoutOutputCollector传递进去了,由他发射元祖 EmitState state = managers.get(_currPartitionIndex) .next(_collector); // 如果发送状态为:发送-还有剩余 if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } // 如果发送的状态为: 发送-没有剩余 if (state != EmitState.NO_EMITTED) { break; } } long now = System.currentTimeMillis(); if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) { commit(); } } @Override public void ack(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.ack(id.offset); } } @Override public void fail(Object msgId) { KafkaMessageId id = (KafkaMessageId) msgId; PartitionManager m = _coordinator.getManager(id.partition); if (m != null) { m.fail(id.offset); } } @Override public void deactivate() { // 停止工作 commit(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println(_spoutConfig.scheme.getOutputFields()); declarer.declare(_spoutConfig.scheme.getOutputFields()); } private void commit() { _lastUpdateMs = System.currentTimeMillis(); for (PartitionManager manager : _coordinator.getMyManagedPartitions()) { manager.commit(); } }}
在粗浅的代码阅读之后,在这里进行详细的分析:
1 KafkaSpout之中持有了一个 MessageAndRealOffset 的内部类
public static class MessageAndRealOffset{ public Message msg; public long offset; public MessageAndRealOffset(Message msg,long offset) { this.msg = msg; this.offset = offset; }}
2 在Spout之中我们还持有了一个PartitionCoordinator的分区协调器,默认的情况我们实例化的对象
是ZKCoordinator
关于Storm如何和Kafka进行整合就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
整合
内容
配置
之中
对象
数据
文章
时候
更多
知识
篇文章
节点
协调器
消费
管理
不错
粗浅
主机
代码
任务
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
was数据库连接池不够用
花生壳管理 服务器
计算机网络安全论文wifi
网络安全法明确了公司在
分布式数据库如何建立
网络安全法域名
博兴快消品管理软件开发公司
腾讯的软件开发者
ftp客户端与服务器器端
网络安全宣传团委书记讲话
手机 配置 数据库
辽宁月考app服务器地址
当前网络安全工作的困难
新三板网络安全科技企业
广西社区智慧养老软件开发
python网络数据库
网络技术前端工程师
数据库中表关联用哪个字段
吉林省土地调查数据库
中山数据链软件开发销售厂
模拟主持网络安全
建设数据库技术选型关键因素
新点投标文件制作软件开发
数据库服务名字
关于网络安全的一篇英语作文
我的世界服务器 红石
sql 数据库备份还原
文件共享服务器安全吗
软件开发岗电话面试经验
高邮市汇聚网络技术有限公司