如何进行storm1.1.3与kafka1.0.0整合
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。packa
千家信息网最后更新 2025年02月05日如何进行storm1.1.3与kafka1.0.0整合
本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
package hgs.core.sk;import java.util.Map;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Tuple;@SuppressWarnings("deprecation")public class StormKafkaMainTest { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //zookeeper链接地址 BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181"); //KafkaSpout需要一个config,参数代表的意义1:zookeeper链接,2:消费kafka的topic,3,4:记录消费offset的zookeeper地址 ,这里会保存在 zookeeper //集群的/test7/consume下面 SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume"); //消费的时候忽略offset从头开始消费,这里可以注释掉,因为消费的offset在zookeeper中可以找到 sconfig.ignoreZkOffsets=true; //sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() ); builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1); builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout"); Config config = new Config(); config.setNumWorkers(1); try { StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } /* LocalCluster cu = new LocalCluster(); cu.submitTopology("test", config, builder.createTopology());*/ }}class MyboltO extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { //这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容 //因为得到的内容是byte数组,所以需要转换 String out = new String((byte[])input.getValue(0)); System.out.println(out); collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
pom.xml文件的依赖
4.0.0 hgs core.sk 1.0.0-SNAPSHOT jar core.sk http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-kafka 1.1.3 org.apache.storm storm-core 1.1.3 provided org.apache.kafka kafka_2.11 1.0.0 org.slf4j slf4j-log4j12 org.apache.zookeeper zookeeper org.clojure clojure 1.7.0 org.apache.kafka kafka-clients 1.0.0 maven-assembly-plugin 2.2 hgs.core.sk.StormKafkaMainTest jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-compiler-plugin 1.8
以上就是如何进行storm1.1.3与kafka1.0.0整合,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
消费
整合
内容
地址
更多
知识
篇文章
链接
实用
从头
代表
参数
就是
工作会
意义
数组
文件
文章
日志
时候
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
江苏现代软件开发产业化
数据库的等待时间是什么
公安局网络安全支队在哪里
徽磬石坊网络技术有限公司
win7配置成打印服务器
中国数据库软件开发
软件开发的方法论
医院网络安全领导机构职能
数据库面试自我介绍
服务器播放mp4
网络安全学情分析
世界500 强网络安全公司
阎良区软件开发哪家好
数据库应用技术高职高专
护苗网络安全培训课
封天神魔登录不了服务器
虚拟机启动服务器卡死怎么解决
软件开发承揽合同
软件开发竖屏怎么设置
冷水江天气预报软件开发
app本地数据库同步云数据库
网络安全的影响个人层面
张家港运营网络技术要多少钱
升级网络安全
金华专业的模具软件开发
带你一图读懂《网络安全法》
禁止以任何方式对数据库
wincc画面服务器备份
网络安全文案一年级
sql数据库提取年份