千家信息网

windows如何安装storm eclipse调试TopN实例

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章将为大家详细讲解有关windows如何安装storm eclipse调试TopN实例,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一:安装JD配置Java环
千家信息网最后更新 2025年01月23日windows如何安装storm eclipse调试TopN实例

这篇文章将为大家详细讲解有关windows如何安装storm eclipse调试TopN实例,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

一:安装JD

配置Java环境变量 JAVA_HOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考):

D:\java\jdk1.8

%JAVA_HOME%/bin;%JAVA_HOME%/jre/bin

.;%JAVA_HOME%/lib/dt.jar;%JAVA_HOME%/lib/tools.jar (要加.表示当前路径)

二:安装 Python

这是为了测试安装效果,我们将部署 storm-starter project案例中word coun程序,用的是python写的multi-lang bolt,使用python 2.7.11,安装路径在:

C:\Python27\

三:安装并运行ZooKeeper

Download Apache Zookeeper 3.4.8 ,解压配置:

> cd zookeeper-3.4.8
> copy conf\zoo_sample.cfg conf\zoo.cfg
> .\bin\zkServer.cmd

四:安装Storm

Storm的windows官方版还没有释放,here.下载,源码here下载。

注意1:

源码一定要用这个版本,否则启动会报各种错误,而这些错误和 jdk、python、zookeeper、eclipse 版本都无关。

http://dl.dropboxusercontent.com/s/iglqz73chkul1tu/storm-0.9.1-incubating-SNAPSHOT-12182013.zip

配置Storm环境变量

  • Storm需要STORM_HOME和JAVA_HOME,比如STORM_HOME为:

C:\storm-0.9.1-incubating-SNAPSHOT-12182013\

  • 在PATH中加入:

%STORM_HOME%\bin;C:\Python27\Lib\site-packages\;C:\Python27\Scripts\

此处与参考文章略有不同,下图是参考文章给出的配置

JAVA_HOME已经在安装JDK时手动配置了环境变量,而Python好像是默认自动就会配置好环境变量的,

我的Python目录下没有Scripts文件夹,暂时这样配置就可以了,不影响下面的使用。

五:启动Nimbus, Supervisor, and Storm UI Daemons

  • Nimbus

注意2:

一定要在 STORM_HOME 目录下执行后续命令,否则会报错。

ERROR backtype.storm.event - Error when processing event
java.lang.RuntimeException: java.io.InvalidClassException: clojure.lang.APersistentMap; local class incompatible: stream classdesc serialVersionUID = 8648225932767613808, local class serialVersionUID = 270281984708184947
at backtype.storm.utils.Utils.deserialize(Utils.java:86) ~[storm-core-0.9.1-incubating-SNAPSHOT-12182013.jar:na]

> cd %STORM_HOME%

> storm nimbus

  • Supervisor

> cd %STORM_HOME%

> storm supervisor

  • Storm UI # 可选,也可以用 storm list 查看所有 storm 任务

> cd %STORM_HOME%

> storm ui

浏览器打开http://localhost:8080/ 可看到Storm运行。

六:部署 Word count

下载download a pre-built jar。

部署这个jar在本地:

> storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost

如果你刷新 Storm UI页面,会看到 "WordCount" topology显示列出,点按链接确认它处理数据。

七:eclipse 调试 TopN 实例

storm 求 csdn 密码库中密码出现的 topN,并直接在 eclipse 中调试运行:

package com.bj.test.top10;/** * @Author:tester * @DateTime:2016年6月21日 下午7:58:45 * @Description:  Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。* @Version:1.0*/import java.io.BufferedReader;import java.io.FileNotFoundException;import java.io.FileReader;import java.util.Map;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;public class PasswdSpout extends BaseRichSpout {        private SpoutOutputCollector collector;        private FileReader fileReader;        private boolean completed = false;        public void ack(Object msgId) {                System.out.println("==============OK:" + msgId);        }        public void close() {        }        public void fail(Object msgId) {                System.out.println("++++++++++++++FAIL:" + msgId);        }        /**      * 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt)      * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下      * **/          public void nextTuple() {                /**                 * The nextuple it is called forever, so if we have been readed the file                 * we will wait and then return                 */                if (completed) {                        try {                                Thread.sleep(1000);                        } catch (InterruptedException e) {                                // Do nothing                        }                        return;                }                String line;                // Open the reader                BufferedReader reader = new BufferedReader(fileReader);                try {                        // Read all lines                        while ((line = reader.readLine()) != null) {                                String[] words = line.split("#");                                String passwd = words[1].trim();                                // Emit the word                  collector.emit(new Values(passwd));                        /*for(String word : words){                            word = word.trim();                            if(!word.isEmpty()){                                word = word.toLowerCase();                                // Emit the word                                  collector.emit(new Values(word));                            }                        }*/                        }                } catch (Exception e) {                        throw new RuntimeException("Error reading tuple", e);                } finally {                        completed = true;                }        }        /**      * 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置,      * 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt      * **/          public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {                try {                        //获取创建Topology时指定的要读取的文件路径                         this.fileReader = new FileReader(conf.get("wordsFile").toString());                } catch (FileNotFoundException e) {                        throw new RuntimeException("Error reading file [" + conf.get("wordFile") + "]");                }                //初始化发射器                  this.collector = collector;        }        /**         * Declare the output field "word"         */        public void declareOutputFields(OutputFieldsDeclarer declarer) {                declarer.declare(new Fields("word"));        }}/////////////////////////////////////////////////////////////////////////////////////////////package com.bj.test.top10;import java.util.HashMap;import java.util.Map;import java.util.NavigableMap;import java.util.TreeMap;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Tuple;import static com.bj.test.top10.SortMapByValue.*;public class Top10Bolt extends BaseBasicBolt {        Integer id;        String name;        NavigableMap counters;        /**      * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里      * 因为这只是个Demo,我们用它来打印我们的计数器      * */          @Override        public void cleanup() {                System.out.println(">>>>>>>>>>>> Word Counter ["+name+"-"+id+"] <<<<<<<<<<<");                /*for(Map.Entry entry : counters.entrySet()){                        System.out.println(entry.getKey()+": "+entry.getValue());                }*/                printMap(list2Map(sortMapByValuesTopN(counters, 10)));        }        /**         * On create          */        @Override        public void prepare(Map stormConf, TopologyContext context) {                this.counters = new TreeMap().descendingMap();                this.name = context.getThisComponentId();                this.id = context.getThisTaskId();        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {}        //      Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用        @Override        public void execute(Tuple input, BasicOutputCollector collector) {                String word = input.getString(0);                /**                 * If the word dosn't exist in the map we will create                 * this, if not We will add 1                  */                if(!counters.containsKey(word)){                        counters.put(word, 1);                }else{                        Integer count = counters.get(word) + 1;                        counters.put(word, count);                }        }}/////////////////////////////////////////////////////////////////////////////////////////////package com.bj.test.top10;/** * @Author:tester * @DateTime:2016年6月21日 下午7:52:32 * @Description: * @Version:1.0*/import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class TopologyMain {        public static void main(String[] args) throws InterruptedException {                // 定义一个Topology                TopologyBuilder builder = new TopologyBuilder();                // executor的数目, set parallelism hint to 4                builder.setSpout("PasswdSpout", new PasswdSpout(), 1);                // set tasks number to 4                builder.setBolt("Top10Bolt", new Top10Bolt(), 1).setNumTasks(1).fieldsGrouping("PasswdSpout",                                new Fields("word"));                // 配置                Config conf = new Config();                conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.100.sql");                //              conf.put("wordsFile", "H:\\mysql\\csdn_database\\www.csdn.net.sql");                conf.setDebug(false);                conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);                // use two worker processes                // conf.setNumWorkers(4);                // 创建一个本地模式cluster                LocalCluster cluster = new LocalCluster();                // 提交Topology                cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());                Thread.sleep(1000);                cluster.shutdown();        }}

关于"windows如何安装storm eclipse调试TopN实例"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

0