千家信息网

如何进行storm1.1.3与kafka1.0.0整合

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。packa
千家信息网最后更新 2025年02月05日如何进行storm1.1.3与kafka1.0.0整合

本篇文章给大家分享的是有关如何进行storm1.1.3与kafka1.0.0整合,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

package hgs.core.sk;import java.util.Map;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.kafka.BrokerHosts;import org.apache.storm.kafka.KafkaSpout;import org.apache.storm.kafka.SpoutConfig;import org.apache.storm.kafka.ZkHosts;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.tuple.Tuple;@SuppressWarnings("deprecation")public class StormKafkaMainTest {                public static void main(String[] args) {                TopologyBuilder builder = new TopologyBuilder();                //zookeeper链接地址                BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");                //KafkaSpout需要一个config,参数代表的意义1:zookeeper链接,2:消费kafka的topic,3,4:记录消费offset的zookeeper地址 ,这里会保存在 zookeeper                //集群的/test7/consume下面                SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");                //消费的时候忽略offset从头开始消费,这里可以注释掉,因为消费的offset在zookeeper中可以找到                sconfig.ignoreZkOffsets=true;                //sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );                builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);                builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");                       Config config = new Config();       config.setNumWorkers(1);       try {                        StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());                } catch (Exception e) {                        e.printStackTrace();                }        /*            LocalCluster cu  = new LocalCluster();       cu.submitTopology("test", config, builder.createTopology());*/        }}class  MyboltO extends  BaseRichBolt{        private static final long serialVersionUID = 1L;        OutputCollector collector = null;        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {                this.collector = collector;        }        public void execute(Tuple input) {                //这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容                //因为得到的内容是byte数组,所以需要转换                String out = new String((byte[])input.getValue(0));                System.out.println(out);                collector.ack(input);                        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                        }                }
pom.xml文件的依赖
  4.0.0  hgs  core.sk  1.0.0-SNAPSHOT  jar  core.sk  http://maven.apache.org      UTF-8              junit      junit      3.8.1      test                        org.apache.storm            storm-kafka            1.1.3                              org.apache.storm            storm-core              1.1.3              provided                            org.apache.kafka            kafka_2.11            1.0.0                                              org.slf4j                      slf4j-log4j12                                                    org.apache.zookeeper                    zookeeper                                                                             org.clojure            clojure            1.7.0                                    org.apache.kafka            kafka-clients            1.0.0                                                             maven-assembly-plugin                2.2                                                                                                                    hgs.core.sk.StormKafkaMainTest                                                                                                                                                jar-with-dependencies                                                                                                                                        make-assembly                        package                                                    single                                                                                                                 org.apache.maven.plugins                maven-compiler-plugin                                    1.8                    1.8                                        

以上就是如何进行storm1.1.3与kafka1.0.0整合,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0