千家信息网

hadoop中wordcount 、wordmean的示例代码

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,小编给大家分享一下hadoop中wordcount 、wordmean的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!注意hadoop src 可以在 hadoop官网上下载
千家信息网最后更新 2025年02月05日hadoop中wordcount 、wordmean的示例代码

小编给大家分享一下hadoop中wordcount 、wordmean的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

注意

  1. hadoop src 可以在 hadoop官网上下载。

  2. 下文代码中,以 /** 开头的注释为源码自带,以 // 开头的注释为作者心得。

  3. hadoop 安装目录下的 share/doc/hadoop/api 中有自带的 API 文档链接,方便大家查看、学习。

wordcount:

package hadoop1;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce job that counts how many times each whitespace-separated token
 * occurs in the input files.
 */
public class WordCount {

  /**
   * Splits every input line into tokens and emits one (token, 1) pair per
   * token.
   */
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    // Reused for every token so that no new Text is allocated per record.
    private Text word = new Text();

    /**
     * Emits (token, 1) for each token of the line.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one line of input text
     * @param context sink for the emitted (token, 1) pairs
     */
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      // StringTokenizer with no explicit delimiters splits on whitespace,
      // much like "\\s" in a regular expression.
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        // context.write(key, value) is how both map and reduce emit output.
        context.write(word, one);
      }
    }
  }

  /**
   * Adds up all counts received for a word and emits (word, total). Also used
   * as the combiner in {@link #main(String[])}.
   */
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    /**
     * Sums the values of one key.
     *
     * @param key     the word
     * @param values  all partial counts collected for that word
     * @param context sink for the emitted (word, sum) pair
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context)
        throws IOException, InterruptedException {
      // (word, [c1, c2, ..., cn])  ------>  (word, c1 + c2 + ... + cn)
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  /**
   * Configures and runs the job. All remaining arguments except the last are
   * input paths; the last one is the output path. Exits with 2 on a usage
   * error, 0 when the job succeeds and 1 when it fails.
   *
   * @param args command-line arguments, including generic Hadoop options
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Whatever GenericOptionsParser leaves over is
    // {input1, input2, ..., output}.
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }

    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // A combiner acts like a reducer run on the map side to cut down the data
    // sent over the network; not every job is suited to one. It works here
    // because summing partial sums yields the same total.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // Key/value classes of the job output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Every argument but the last is an input path (multiple inputs allowed).
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
        new Path(otherArgs[otherArgs.length - 1]));

    // 0 for a normal exit, non-zero otherwise.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}



wordmean:

package hadoop2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Charsets;

/**
 * MapReduce job that computes the mean length of the whitespace-separated
 * words in the input: total word length divided by total word count.
 */
public class WordMean extends Configured implements Tool {

  private double mean = 0;

  private final static Text COUNT = new Text("count");
  private final static Text LENGTH = new Text("length");
  private final static LongWritable ONE = new LongWritable(1);

  /**
   * Maps words from line of text into 2 key-value pairs; one key-value pair for
   * counting the word, another for counting its length.
   */
  public static class WordMeanMapper extends
      Mapper<Object, Text, Text, LongWritable> {

    private LongWritable wordLen = new LongWritable();

    /**
     * Emits 2 key-value pairs for counting the word and its length. Outputs are
     * (Text, LongWritable).
     *
     * @param key
     *          Input key supplied by the framework; not used here.
     * @param value
     *          This will be a line of text coming in from our input file.
     * @param context
     *          Sink for the emitted pairs.
     */
    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        String string = itr.nextToken();
        // For each token (word) split by whitespace, emit two pairs:
        // ("length", string.length()) and ("count", 1).
        this.wordLen.set(string.length());
        context.write(LENGTH, this.wordLen);
        context.write(COUNT, ONE);
      }
    }
  }

  /**
   * Performs integer summation of all the values for each key.
   */
  public static class WordMeanReducer extends
      Reducer<Text, LongWritable, Text, LongWritable> {

    // LongWritable plays the role of java.lang.Long; Hadoop uses its own
    // value types so they can be serialized and deserialized by the framework.
    private LongWritable sum = new LongWritable();

    /**
     * Sums all the individual values within the iterator and writes them to the
     * same key.
     *
     * @param key
     *          This will be one of 2 constants: LENGTH or COUNT.
     * @param values
     *          This will be an iterator of all the values associated with that
     *          key.
     * @param context
     *          Sink for the emitted (key, sum) pair.
     */
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // ("count", [1, 1, ...])     ------> ("count", n)
      // ("length", [l1, l2, ...])  ------> ("length", m)
      // Accumulate in a long: the values are longs, and an int accumulator
      // would silently overflow on large inputs.
      long theSum = 0;
      for (LongWritable val : values) {
        theSum += val.get();
      }
      sum.set(theSum);
      context.write(key, sum);
    }
  }

  /**
   * Reads the output file and parses the summation of lengths, and the word
   * count, to perform a quick calculation of the mean.
   *
   * @param path
   *          The path to find the output file in. Set in run() to the output
   *          directory.
   * @param conf
   *          Configuration used to obtain the FileSystem.
   * @return The total length divided by the total count.
   * @throws IOException
   *           If it cannot access the output directory, we throw an exception.
   */
  private double readAndCalcMean(Path path, Configuration conf)
      throws IOException {
    // Read the reducer output. Only part-r-00000 is inspected.
    // NOTE(review): presumably the job runs with a single reducer so that both
    // keys land in this file -- confirm.
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path(path, "part-r-00000");

    if (!fs.exists(file)) {
      throw new IOException("Output not found!");
    }

    // average = total length (m from reduce) / number of words (n from reduce)
    // BufferedReader decorates InputStreamReader to provide readLine();
    // try-with-resources closes it on every exit path.
    try (BufferedReader br = new BufferedReader(
        new InputStreamReader(fs.open(file), Charsets.UTF_8))) {
      long count = 0;
      long length = 0;

      String line;
      while ((line = br.readLine()) != null) {
        StringTokenizer st = new StringTokenizer(line);

        // First token is the key, which tells "count" and "length" apart.
        String type = st.nextToken();

        if (type.equals(COUNT.toString())) {
          String countLit = st.nextToken();
          count = Long.parseLong(countLit);
          // Added by the article's author: print the total word count n.
          System.out.println("The count is: " + count );
        } else if (type.equals(LENGTH.toString())) {
          String lengthLit = st.nextToken();
          length = Long.parseLong(lengthLit);
          // Added by the article's author: print the total word length m.
          System.out.println("The  length  is: " +  length  );
        }
      }

      double theMean = (((double) length) / ((double) count));
      System.out.println("The mean is: " + theMean);
      return theMean;
    }
  }

  /**
   * Runs the tool through ToolRunner with a fresh Configuration.
   *
   * @param args
   *          Command-line arguments: input path and output path.
   */
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new WordMean(), args);
  }

  /**
   * Configures and runs the job, then reads its output to compute the mean.
   *
   * @param args
   *          Exactly two entries: input path and output directory.
   * @return 0 if the job succeeded (or on a usage error), 1 if it failed.
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: wordmean <in> <out>");
      return 0;
    }

    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "word mean");
    job.setJarByClass(WordMean.class);
    job.setMapperClass(WordMeanMapper.class);
    job.setCombinerClass(WordMeanReducer.class);
    job.setReducerClass(WordMeanReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outputpath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outputpath);
    boolean result = job.waitForCompletion(true);
    mean = readAndCalcMean(outputpath, conf);

    return (result ? 0 : 1);
  }

  /**
   * Only valuable after run() called.
   *
   * @return Returns the mean value.
   */
  public double getMean() {
    return mean;
  }
}

看完了这篇文章,相信你对"hadoop中wordcount 、wordmean的示例代码"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

0