千家信息网

storm集群WordCount的示例分析

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,小编给大家分享一下storm集群WordCount的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!storm集群实例运行storm本地运行只需要storm的jar包就可以了,
千家信息网最后更新 2025年01月23日storm集群WordCount的示例分析

小编给大家分享一下storm集群WordCount的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

storm集群实例运行

storm本地运行只需要storm的jar包就可以了,结果可以在控制台直接看到,storm集群运行,结果要在log日志里看,或者存储下来。并且,集群运行,execute方法里的输出可以看到,但是cleanup里的输出是看不到的,因为cleanup只有在topology结束后才会执行,而storm是实时连续的运行的,所以输出放在execute里或者保存起来查看。

wordcount实例代码

代码在前面的博客里已经写了只是将WordCounter做了点修改

package com.storm.stormDemo;  import java.io.BufferedWriter;import java.io.FileWriter;import java.io.IOException;import java.util.HashMap;  import java.util.Iterator;import java.util.Map; import org.apache.log4j.Logger;import com.storm.stormTest.MergeObjects;import backtype.storm.task.OutputCollector;  import backtype.storm.task.TopologyContext;  import backtype.storm.topology.IRichBolt;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;    public class WordCounter implements IRichBolt { public static Logger LOG = Logger.getLogger(WordCounter.class);    Integer id;      String name;      Map counters;      private OutputCollector collector;    BufferedWriter output;      public void prepare(Map stormConf, TopologyContext context,              OutputCollector collector) {          this.counters = new HashMap();          this.collector = collector;          this.name = context.getThisComponentId();          this.id = context.getThisTaskId();          try {   output = new BufferedWriter(new FileWriter("/home/zhanghuan/Downloads/wordcount.txt" , true));  } catch (IOException e) {   // TODO Auto-generated catch block   try {    output.close();   } catch (IOException e1) {    // TODO Auto-generated catch block    e1.printStackTrace();   }   e.printStackTrace();  }      }      public void execute(Tuple input) {          String str = input.getString(0);          if (!counters.containsKey(str)) {              counters.put(str, 1);          } else {              Integer c = counters.get(str) + 1;              counters.put(str, c);          }          Iterator iterator = counters.keySet().iterator();        while(iterator.hasNext()){         String next = iterator.next();         try {          System.out.print(next + ":" + counters.get(next) + " ");    output.write(next + ":" + counters.get(next) + " ");    output.flush();   } catch (IOException e) {    e.printStackTrace();    try {     output.close();    } catch (IOException e1) {     e1.printStackTrace();    }   }        }        // 确认成功处理一个tuple        collector.ack(input);      }      /**      * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里      * 因为这只是个Demo,我们用它来打印我们的计数器      * */      public void cleanup() {       LOG.info("-- Word Counter [" + name + "-" + id + "] --");          for (Map.Entry entry : counters.entrySet()) {           LOG.info(entry.getKey() + ": " + entry.getValue());          }          counters.clear();      }      public void declareOutputFields(OutputFieldsDeclarer declarer) {          // TODO Auto-generated method stub       //declarer.declare(new Fields("word","number"));      }      public Map getComponentConfiguration() {          // TODO Auto-generated method stub          return null;      }  }

集群运行

storm jar StormDemo.jar com.storm.stormDemo.WordCountTopologyMain StormDemo /home/zhanghuan/Downloads/test.txt

注意:主函数路径要写全

如果集群报错如下:

则打开你打的第三方jar包文件夹,在里面找到storm-core-0.10.0.jar,删除这个jar包里的default.yarml文件,或则删掉你打的storm jar包。

topology提交后,会启动相应数量的worker进程和logwriter进程,ui界面上也能看到这个topology的运行

这时候你就可以查看log日志文件或者存储位置,查看结果。

没有数据输入的时候,日志就像最下方一样,保持通信。

停止topology运行

storm kill topology的名字

看完了这篇文章,相信你对"storm集群WordCount的示例分析"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

0