千家信息网

MapReduce Map Join怎么使用

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要介绍"MapReduce Map Join怎么使用",在日常操作中,相信很多人在MapReduce Map Join怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希
千家信息网最后更新 2025年02月02日MapReduce Map Join怎么使用

这篇文章主要介绍"MapReduce Map Join怎么使用",在日常操作中,相信很多人在MapReduce Map Join怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"MapReduce Map Join怎么使用"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1. 样例数据

011990-99999    SIHCCAJAVRI012650-99999    TYNSET-HANSMOEN


012650-99999    194903241200    111012650-99999    194903241800    78011990-99999    195005150700    0011990-99999    195005151200    22011990-99999    195005151800    -11


2. 需求


3. 思路、代码
足够小的关联文件(即气象台信息)添加到分布式缓存,然后在每个 Mapper 端读取被缓存到本地全量气象台信息,再与天气信息相关联。

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import java.io.BufferedReader;import java.io.FileReader;import java.io.IOException;import java.util.HashMap;import java.util.Map;public class MapJoin {    static class RecordMapper extends Mapper {        private Map stationMap = new HashMap();        @Override        protected void setup(Context context) throws IOException, InterruptedException {            //预处理,把要关联的文件加载到缓存中            Path[] paths = context.getLocalCacheFiles();            //新的检索缓存文件的API是 context.getCacheFiles() ,而 context.getLocalCacheFiles() 被弃用            //然而 context.getCacheFiles() 返回的是 HDFS 路径; context.getLocalCacheFiles() 返回的才是本地路径            //这里只缓存了一个文件,所以取第一个即可            BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));            String line = null;            try {                while ((line = reader.readLine()) != null) {                    String[] vals = line.split("\\t");                    if (vals.length == 2) {                        stationMap.put(vals[0], vals[1]);                    }                }            } catch (Exception e) {                e.printStackTrace();            } finally {                reader.close();            }            super.setup(context);        }        @Override        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            String[] vals = value.toString().split("\\t");            if (vals.length == 3) {                String stationName = stationMap.get(vals[0]); //Join                stationName = stationName == null ? "" : stationName;                context.write(new Text(vals[0]),                        new Text(stationName + "\t" + vals[1] + "\t" + vals[2]));            }        }    }    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();        if (otherArgs.length != 3) {            System.err.println("Parameter number is wrong, please enter three parameters:  ");            System.exit(-1);        }        Path inputPath = new Path(otherArgs[0]);        Path stationPath = new Path(otherArgs[1]);        Path outputPath = new Path(otherArgs[2]);        Job job = Job.getInstance(conf, "MapJoin");        job.setJarByClass(MapJoin.class);        FileInputFormat.addInputPath(job, inputPath);        FileOutputFormat.setOutputPath(job, outputPath);        job.addCacheFile(stationPath.toUri()); //添加缓存文件,可添加多个        job.setMapperClass(RecordMapper.class);        job.setMapOutputKeyClass(Text.class);        System.exit(job.waitForCompletion(true) ? 0 : 1);    }}


4. 运行结果

到此,关于"MapReduce Map Join怎么使用"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0