千家信息网

MapReduce怎么使用

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,本篇内容主要讲解"MapReduce怎么使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"MapReduce怎么使用"吧!什么是MRMR是一种分布计算模型
千家信息网最后更新 2025年02月01日MapReduce怎么使用

本篇内容主要讲解"MapReduce怎么使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"MapReduce怎么使用"吧!

  1. 什么是MR

    MR是一种分布计算模型,主要用来解决海量数据的计算问题的。它包含了两种计算函数,一个是Mapping,另外一个是Reducing。Mapping对集合内的每个目标做同一个操作,Reduceing则是遍历集合中的元素返回一个综合的结果。我们操作代码时,只需要重写map和reduce方法就行,十分简单。这两个函数的形参都是k,v对,当数据量到达10PB以上时,则会速度变慢。

  2. MR执行过程

    MR程序启动时,会把输入文件转化成键值对传给map函数,有几个键值对就执行几次map函数,但不是说有几个键值对就有几个Mapper进程,这是不对的。经过map函数处理,变成键值对。由转变成reduce函数的输入的过程被称之为shuffle。shuffle并不是象map和reduce这样的某个函数,不是需要单独拿出节点运行的,它仅仅只是一个过程。进过reduce函数处理,变成了最后的输出。在到达reduce函数之前,键值对的数目是不变的。

    Map阶段

    (1).根据输入文件解析成对,每一对调用一次map函数

    (2).根据自己编写的map函数,将键值对处理,变成新的键值对输出

    (3).对输出的键值对进行分区,不同分区对应着不同的Reducer进程

    (4).每个分区中的键值对,根据key进行排序,分组。然后把相同key的val放到同一个集合中。

    (5).进行规约(可选)

Reduce阶段

(1).多个map函数输出的kv对,按照不同分区,传输到不同的reduce节点上。

(2).将多个map函数输出的kv对合并,排序。根据reduce函数逻辑,处理,转换成新的键值对输出

(3).输出保存文件

3.简单例子

Wordcount

public class WordCount { public static class  MyMapper extends Mapper{  Text k2=new Text();  LongWritable v2=new LongWritable();  @Override  protected void map(LongWritable k1, Text v1,Context context)    throws IOException, InterruptedException {    String[] words=v1.toString().split("\t");    for (String string : words) {     k2.set(string);     v2.set(1L);    context.write(k2, v2);   }  } } public static class MyReduce extends Reducer{  LongWritable v3=new LongWritable();  @Override  protected void reduce(Text k2, Iterable v2s,Context context) throws IOException, InterruptedException {   long sum=0;    for (LongWritable longWritable : v2s) {    sum=sum+longWritable.get();   }   v3.set(sum);   context.write(k2, v3);  }   } public static void main(String[] args) throws Exception {  Configuration conf=new Configuration();  Job job=Job.getInstance(conf, WordCount.class.getSimpleName());  job.setJarByClass(WordCount.class);  job.setMapperClass(MyMapper.class);  job.setReducerClass(MyReduce.class);    job.setMapOutputKeyClass(Text.class);  job.setMapOutputValueClass(LongWritable.class);  job.setOutputKeyClass(Text.class);  job.setOutputValueClass(LongWritable.class);    FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/a.txt"));  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/out4"));    job.waitForCompletion(true); }  }

4.MR的序列化

序列化就是把结构化的对象转换为字节流,在MR中,他没有用java自己的序列化,而是自己实现了一套序列化。因为相比较而言,hadoop的序列化有着诸多优点。在mr程序中,我们的参数和输出的键值对全都是实现了序列化的对象,当我们需要自订一个序列化对象,该如何操作呢?只需要实现Writable接口即可,当然key需要实现WritableComparable接口,因为需要根据key来排序和分组。

接着有个小例子来展示序列化。就是电信流量的处理例子。

public class LiuLiang { public static class MyMapper extends Mapper{  Text k2=new Text();  MyArrayWritable v2=new MyArrayWritable();  LongWritable v21=new LongWritable();  LongWritable v22=new LongWritable();  LongWritable v23=new LongWritable();  LongWritable v24=new LongWritable();  LongWritable[] values=new LongWritable[4];  @Override  protected void map(LongWritable k1, Text v1, Context context)      throws IOException, InterruptedException {    String[] words=v1.toString().split("\t");    k2.set(words[1]);    v21.set(Long.parseLong(words[6]));    v22.set(Long.parseLong(words[7]));    v23.set(Long.parseLong(words[8]));    v24.set(Long.parseLong(words[9]));    values[0]=v21;    values[1]=v22;    values[2]=v23;    values[3]=v24;    v2.set(values);    context.write(k2, v2);  } } public static class MyReduce extends Reducer{  Text v3=new Text();  @Override  protected void reduce(Text k2, Iterable v2s, Context context)      throws IOException, InterruptedException {    long sum1=0;    long sum2=0;    long sum3=0;    long sum4=0;    for (MyArrayWritable myArrayWritable : v2s) {     Writable[] values= myArrayWritable.get();     sum1=sum1+((LongWritable)values[0]).get();     sum2=sum2+((LongWritable)values[1]).get();     sum3=sum3+((LongWritable)values[2]).get();     sum4=sum4+((LongWritable)values[3]).get();   }    v3.set("\t"+sum1+"\t"+sum2+"\t"+sum3+"\t"+sum4);    context.write(k2, v3);  } } public static void main(String[] args) throws Exception {  Configuration conf=new Configuration();  Job job=Job.getInstance(conf, LiuLiang.class.getSimpleName());  job.setJarByClass(LiuLiang.class);  job.setMapperClass(MyMapper.class);  job.setReducerClass(MyReduce.class);    job.setMapOutputKeyClass(Text.class);  job.setMapOutputValueClass(MyArrayWritable.class);  job.setOutputKeyClass(Text.class);  job.setOutputValueClass(Text.class);    FileInputFormat.setInputPaths(job, new Path("hdfs://115.28.138.100:9000/HTTP_20130313143750.dat"));  FileOutputFormat.setOutputPath(job, new Path("hdfs://115.28.138.100:9000/ceshi3"));    job.waitForCompletion(true); } }class MyArrayWritable extends ArrayWritable{ public MyArrayWritable(){  super(LongWritable.class); } public MyArrayWritable(String[] arg0) {  super(arg0); } }

5.SequenceFile

在HDFS的学习中,提到了小文件的解决方案,其中一个便是这个SequenceFile。他是一种无序存储,将kv对序列化到文件中,从而合并许多小文件并且支持压缩。缺点是必须遍历才能查看里面各个小文件。

public class SequenceFileTest { public static void main(String[] args) throws Exception{  Configuration conf = new Configuration();  FileSystem fileSystem = FileSystem.get(new URI("hdfs://115.28.138.100:9000"), conf, "hadoop");  //Write(conf, fileSystem);  Read(conf, fileSystem); }   private static void Read(Configuration conf, FileSystem fileSystem) throws IOException {  Reader reader=new SequenceFile.Reader(fileSystem, new Path("/sqtest"), conf);  Text key=new Text();  Text val=new Text();  while(reader.next(key, val)){   System.out.println(key.toString()+"----"+val.toString());  }  IOUtils.closeStream(reader); }   private static void Write(Configuration conf, FileSystem fileSystem) throws IOException {  Writer writer = SequenceFile.createWriter(fileSystem, conf, new Path("/sqtest"), Text.class, Text.class);  Collection files = FileUtils.listFiles(new File("F:\\ceshi1"), new String[] { "txt" }, false);  for (File file : files) {   Text text = new Text();   text.set(FileUtils.readFileToString(file));   writer.append(new Text(file.getName()), text);  }  IOUtils.closeStream(writer); }}

到此,相信大家对"MapReduce怎么使用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0