千家信息网

storm-kafka-client使用的示例分析

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。package hgs.core.sk;
千家信息网最后更新 2025年02月03日storm-kafka-client使用的示例分析

storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

package hgs.core.sk;import java.util.Map;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.spout.ByTopicRecordTranslator;import org.apache.storm.kafka.spout.KafkaSpout;import org.apache.storm.kafka.spout.KafkaSpoutConfig;import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;//参考如下//https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html//https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52public class StormKafkaMainTest {                public static void main(String[] args) {                TopologyBuilder builder = new TopologyBuilder();                //该类将传入的kafka记录转换为storm的tuple                ByTopicRecordTranslator brt =                                 new ByTopicRecordTranslator<>( (r) -> new Values(r.value(),r.topic()),new Fields("values","test7"));                //设置要消费的topic即test7                brt.forTopic("test7", (r) -> new Values(r.value(),r.topic()), new Fields("values","test7"));                //类似之前的SpoutConfig                KafkaSpoutConfig ksc = KafkaSpoutConfig                                //bootstrapServers 以及topic(test7)                                .builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7")                                //设置group.id                                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test")                                //设置开始消费的气势位置                                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)                                //设置提交消费边界的时长间隔                                .setOffsetCommitPeriodMs(10_000)                                //Translator                                .setRecordTranslator(brt)                                .build();                                builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2);                builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout");                       Config config = new Config();       config.setNumWorkers(2);       config.setNumAckers(0);       try {                        StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology());                } catch (Exception e) {                        e.printStackTrace();                }        /*            LocalCluster cu  = new LocalCluster();       cu.submitTopology("test", config, builder.createTopology());*/        }}class  MyboltO extends  BaseRichBolt{        private static final long serialVersionUID = 1L;        OutputCollector collector = null;        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {                this.collector = collector;        }        public void execute(Tuple input) {                //这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容                String out = input.getString(0);                System.out.println(out);                //collector.ack(input);        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                        }                }

pom.xml文件

  4.0.0  hgs  core.sk  1.0.0-SNAPSHOT  jar  core.sk  http://maven.apache.org      UTF-8              junit      junit      3.8.1      test                                                 org.apache.storm            storm-kafka-client            1.1.3                              org.apache.storm            storm-core              1.1.3              provided                            org.apache.kafka            kafka_2.11            1.0.0                                              org.slf4j                      slf4j-log4j12                                                    org.apache.zookeeper                    zookeeper                                                                             org.clojure            clojure            1.7.0                                    org.apache.kafka            kafka-clients            1.0.0                                                             maven-assembly-plugin                2.2                                                                                                                    hgs.core.sk.StormKafkaMainTest                                                                                                                                                jar-with-dependencies                                                                                                                                        make-assembly                        package                                                    single                                                                                                                 org.apache.maven.plugins                maven-compiler-plugin                                    1.8                    1.8                                        
//以下为lambda表达式,因为在上面用大了,所以在这儿记录一下,以免以后看不懂import java.util.UUID;import org.junit.jupiter.api.Test;public class TEst {        @Test        public void sysConfig() {                String[] ags = {"his is my first storm program so i hope it will success",                                "i love bascketball",                                "the day of my birthday i was alone"};                String uuid = UUID.randomUUID().toString();                String nexttuple= ags[new Random().nextInt(ags.length)];                System.out.println(nexttuple);        }                @Test        public void lambdaTest() {                int b  = 100;                //该出返回10*a的值、                //"(a) -> 10*a" 相当于 new  testinter();                printPerson((a) -> 10*a) ;        }                void printPerson( testinter t) {                //穿过来的t需要一个参数a 即下面借口中定义的方法sysoutitems(int a )                System.out.println(t.sysoutitems(100));        };        }//定义接口,在lambda表达式运用中,必须为借口,并且借口只能有一个方法interface testinter{        T sysoutitems(int a );        //void aAndb(int a, int b );}

看完上述内容,你们掌握storm-kafka-client使用的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0