storm-kafka-client使用的示例分析
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package hgs.core.sk;
千家信息网最后更新 2025年02月03日storm-kafka-client使用的示例分析
storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
package hgs.core.sk;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.spout.ByTopicRecordTranslator;import org.apache.storm.kafka.spout.KafkaSpout;import org.apache.storm.kafka.spout.KafkaSpoutConfig;import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;//参考如下//https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html//https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52public class StormKafkaMainTest { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //该类将传入的kafka记录转换为storm的tuple ByTopicRecordTranslatorbrt = new ByTopicRecordTranslator<>( (r) -> new Values(r.value(),r.topic()),new Fields("values","test7")); //设置要消费的topic即test7 brt.forTopic("test7", (r) -> new Values(r.value(),r.topic()), new Fields("values","test7")); //类似之前的SpoutConfig KafkaSpoutConfig ksc = KafkaSpoutConfig //bootstrapServers 以及topic(test7) .builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7") //设置group.id .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test") //设置开始消费的气势位置 .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST) //设置提交消费边界的时长间隔 .setOffsetCommitPeriodMs(10_000) //Translator .setRecordTranslator(brt) .build(); builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2); builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout"); Config config = new Config(); config.setNumWorkers(2); config.setNumAckers(0); try { StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } /* LocalCluster cu = new LocalCluster(); cu.submitTopology("test", config, builder.createTopology());*/ }}class MyboltO extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { //这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容 String out = input.getString(0); System.out.println(out); //collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
pom.xml文件
4.0.0 hgs core.sk 1.0.0-SNAPSHOT jar core.sk http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-kafka-client 1.1.3 org.apache.storm storm-core 1.1.3 provided org.apache.kafka kafka_2.11 1.0.0 org.slf4j slf4j-log4j12 org.apache.zookeeper zookeeper org.clojure clojure 1.7.0 org.apache.kafka kafka-clients 1.0.0 maven-assembly-plugin 2.2 hgs.core.sk.StormKafkaMainTest jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-compiler-plugin 1.8
//以下为lambda表达式,因为在上面用大了,所以在这儿记录一下,以免以后看不懂import java.util.UUID;import org.junit.jupiter.api.Test;public class TEst { @Test public void sysConfig() { String[] ags = {"his is my first storm program so i hope it will success", "i love bascketball", "the day of my birthday i was alone"}; String uuid = UUID.randomUUID().toString(); String nexttuple= ags[new Random().nextInt(ags.length)]; System.out.println(nexttuple); } @Test public void lambdaTest() { int b = 100; //该出返回10*a的值、 //"(a) -> 10*a" 相当于 new testinter(); printPerson((a) -> 10*a) ; } void printPerson( testinter t) { //穿过来的t需要一个参数a 即下面借口中定义的方法sysoutitems(int a ) System.out.println(t.sysoutitems(100)); }; }//定义接口,在lambda表达式运用中,必须为借口,并且借口只能有一个方法interface testinter { T sysoutitems(int a ); //void aAndb(int a, int b );}
看完上述内容,你们掌握storm-kafka-client使用的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
方法
借口
内容
消费
示例
分析
更多
表达式
问题
束手无策
为此
位置
原因
参数
对此
技能
接口
文件
日志
时长
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
神州数码服务器安装
云堡垒服务器
网络安全入门教学
oreda数据库怎么用
云领网络技术有限公司官网
c3服务器
数据库ER图1与N
国泰安数据库指标公式
一台电脑能装几个数据库
浪潮最强ai服务器
网络打印机服务器接口
网络安全性测评评估间隔时间
三级网络技术应用题视频
青岛三友游戏软件开发
上海软件开发跳槽南京工资
ru服务器
星球庄园软件开发商如何赢利
溧阳软件开发招聘
女生适合计算机网络技术
互联网和新科技哪个好
删除服务器文件记录
招聘股票交易软件开发工程师
河南睿航软件开发有限公司
云堡垒服务器
联想sr658服务器怎么做阵列
辽宁服务器电源采购
网络安全大数据人工智能
软件开发企业代码仓库
tp路由器服务器无响应
mongo数据库表结构