千家信息网

Storm怎么改变并行度

发表于:2024-11-11 作者:千家信息网编辑
千家信息网最后更新 2024年11月11日,这篇文章主要介绍"Storm怎么改变并行度",在日常操作中,相信很多人在Storm怎么改变并行度问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm怎么改变并行度"
千家信息网最后更新 2024年11月11日Storm怎么改变并行度

这篇文章主要介绍"Storm怎么改变并行度",在日常操作中,相信很多人在Storm怎么改变并行度问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Storm怎么改变并行度"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

package bolts; import java.util.ArrayList;import java.util.List;import java.util.Map; import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values; public class WordNormalizer implements IRichBolt{    private OutputCollector collector;     public void cleanup(){}     /**    * The bolt will receive the line from the    * words file and process it to Normalize this line    *    * The normalize will be put the words in lower case    * and split the line to get all words in this    */     public void execute(Tuple input) {        String sentence = input.getString(0);        String[]words= sentence.split(" ");        for(String word:words){            word =word.trim();            if(!word.isEmpty()){                word =word.toLowerCase();                //Emit the word                List a =new ArrayList();                a.add(input);                collector.emit(a,new Values(word));            }        }        // Acknowledge the tuple        collector.ack(input);    }     public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {        this.collector=collector;    }     /**    * The bolt will only emit the field "word"    */    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("word"));    } }

提示:在这个类中,每调用一次execute()方法,会发送多个元组。例如,当execute()方法收到"This is the Storm book"这个句子时,该方法会发送5个新元组。

第二个bolt,WordCounter,负责统计每个单词个数。当topology结束时(cleanup()方法被调用时),显示每个单词的个数。

提示:第二个bolt中什么也不发送,本例中,将数据添加到一个map对象中,但是现实生活中,bolt可以将数据存储到一个数据库中。

package bolts; import java.util.HashMap;import java.util.Map; import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt{    Integer id;    String name;    Mapcounters;     private OutputCollector collector;     /**    * At the end of the spout (when the cluster is shutdown    * We will show the word counters    */     @Override     public void cleanup(){        System.out.println("-- Word Counter ["+name+"-"+id+"]--");        for(Map.Entryentry: counters.entrySet()){            System.out.println(entry.getKey()+": "+entry.getValue());        }    }     /**    * On each word We will count    */    @Override     public void execute(Tuple input) {        String str =input.getString(0);        /**        * If the word dosn't exist in the map we will create        * this, if not We will add 1        */        if(!counters.containsKey(str)){            counters.put(str,1);        }else{            Integer c =counters.get(str) +1;            counters.put(str,c);        }        //Set the tuple as Acknowledge        collector.ack(input);    }     /**    * On create    */     @Override     public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {        this.counters=newHashMap();        this.collector=collector;        this.name=context.getThisComponentId();        this.id=context.getThisTaskId();    }     @Override     public void declareOutputFields(OutputFieldsDeclarer declarer) {} }

execute()方法使用一个映射(Map类型)采集单词并统计这些单词个数。当topology结束的时候,cleanup()方法被调用并且打印出counter映射。(这仅仅是个例子,通常情况下,当topology关闭时,你应该使用cleanup()方法关闭活动链接和其他资源。)

主类

在主类中,你将创建topology和一个LocalCluster对象,LocalCluster对象使你可以在本地测试和调试topology。LocalCluster结合Config对象允许你尝试不同的集群配置。例如,如果不慎使用一个全局变量或者类变量,当配置不同数量的worker测试topology的时候,你将会发现这个错误。(关于config对象在第三章会有更多介绍)

提示:所有的topology结点应该可以在进程间没有数据共享的情形下独立运行(也就是说没有全局或者类变量),因为当topology运行在一个真实的集群上时,这些进程可能运行在不同的机器上。

你将使用TopologyBuilder创建topology,TopologyBuilder会告诉Storm怎么安排节点顺序、它们怎么交换数据。

TopologyBuilder builder =new TopologyBuilder();builder.setSpout("word-reader",new WordReader());builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));

本例中spout和bolt之间使用随机分组(shuffleGrouping)连接,这种分组类型告诉Storm以随机分布的方式从源节点往目标节点发送消息。

接着,创建一个包含topology配置信息的Config对象,该配置信息在运行时会与集群配置信息合并,并且通过prepare()方法发送到所有节点。

Config conf =new Config();conf.put("wordsFile",args[0]);conf.setDebug(false);

将wordFile属性设置为将要被spout读取的文件名称(文件名在args参数中传入),并将debug属性设置为true,因为你在开发过程中,当debug为true时,Storm会打印节点间交换的所有消息和其他调试数据,这些信息有助于理解topology是如何运行的。

前面提到,你将使用LocalCluster来运行topology。在一个产品环境中,topology会持续运行,但是在本例中,你仅需运行topology几秒钟就能看到结果。

LocalCluster cluster =new LocalCluster();cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());Thread.sleep(1000);cluster.shutdown();

使用createTopology和submitTopology创建、运行topology,睡眠两秒(topology运行在不同的线程中),然后通过关闭集群来停止topology。

例2-3将上面代码拼凑到一起。

例2-3.src/main/java/TopologyMain.java

import spouts.WordReader;import bolts.WordCounter;import bolts.WordNormalizer; import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields; public class TopologyMain{    public static void main(String[]args)throws InterruptedException{    //Topology definition        TopologyBuilder builder =new TopologyBuilder();        builder.setSpout("word-reader",new WordReader());        builder.setBolt("word-normalizer",new WordNormalizer()).shuffleGrouping("word-reader");        builder.setBolt("word-counter",new WordCounter(),2).fieldsGrouping("word-normalizer",new Fields("word"));     //Configuration        Config conf =new Config();        conf.put("wordsFile",args[0]);        conf.setDebug(false);     //Topology run        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);        LocalCluster cluster =new LocalCluster();        cluster.submitTopology("Getting-Started-Toplogie",conf,builder.createTopology());        Thread.sleep(1000);        cluster.shutdown();    } }

运行本项目

现在开始准备运行第一个topology!如果你新建一个文本文件(src/main/resources/words.txt)并且每行一个单词,则可以通过如下命令运行这个topology:

mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"

例如,如果你使用如下words.txt文件:

Storm
test
are
great
is
an
Storm
simple
application
but
very
powerful
really
Storm
is
great

在日志中,你将会看到类似如下信息:

is: 2
application: 1
but: 1
great: 1
test: 1
simple: 1
Storm: 3
really: 1
are: 1
great: 1
an: 1
powerful: 1
very: 1

在本例中,你只使用了每个结点的一个单一实例,假如此时有一个非常大的日志文件怎么去统计每个单词的个数?此时可以很方便地改系统中节点数量来并行工作,如创建WordCounter的两个实例:

1builder.setBolt( "word-counter" , new WordCounter(), 2 ).shuffleGrouping( "word-normalizer" );

重新运行这个程序,你将看到:

- Word Counter [word-counter-2] -
application: 1
is: 1
great: 1
are: 1
powerful: 1
Storm: 3
- Word Counter [word-counter-3] -
really: 1
is: 1
but: 1
great: 1
test: 1
simple: 1
an: 1
very: 1

太棒了!改变并行度,so easy(当然,在实际生活中,每个实例运行在不同的机器中)。但仔细一看似乎还有点问题:"is"和"great"这两个单词在每个WordCounter实例中都被计算了一次。Why?当使用随机分组(shuffleGrouping)时,Storm以随机分布的方式向每个bolt实例发送每条消息。在这个例子中,将相同的单词发送到同一个WordCounter实例是更理想的。为了实现这个,你可以将shuffleGrounping("word-normalizer")改成fieldsGrouping("word-normalizer",new Fields("word"))。尝试一下并重新运行本程序来确认结果。

到此,关于"Storm怎么改变并行度"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0