千家信息网

学习日志---hadoop的join处理

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,Join方法需求:处理input1和input2文件,两个文件中的id都一样,也就是key值一样,value值不同,把两者合并。input1存的是id和名字,input2存的是id和各种信息。处理方法
千家信息网最后更新 2025年02月01日学习日志---hadoop的join处理

Join方法

需求:处理input1和input2文件,两个文件中的id都一样,也就是key值一样,value值不同,把两者合并。input1存的是id和名字,input2存的是id和各种信息。

处理方法一:

package org.robby.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join, variant 1: every input line {@code id,rest} is emitted as
 * {@code (id, rest)} and the reducer concatenates all values that share an id,
 * separated by commas.
 *
 * <p>The order of the concatenated values is whatever order the reducer
 * receives them in; this job does nothing to control it.
 */
public class MyReduceJoin {

    /** Splits each line at the first comma into a join key and a payload. */
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        // Reused output holders so no new Text is allocated per record.
        private final Text key = new Text();
        private final Text value = new Text();

        /**
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of input, expected as {@code id,rest}
         * @param context used to emit {@code (id, rest)}
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split at the first comma only; the payload may contain commas.
            String[] keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                // Blank line or line without a comma: there is no payload to join.
                return;
            }
            this.key.set(keyValue[0]);
            this.value.set(keyValue[1]);
            context.write(this.key, this.value);
        }
    }

    /** Joins all payloads of one id into a single comma-separated value. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private final Text value = new Text();

        /**
         * @param key     the join id
         * @param values  every payload emitted by the mappers for this id
         * @param context used to emit {@code (id, payload1,payload2,...)}
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            boolean first = true;
            for (Text val : values) {
                if (!first) {
                    joined.append(",");
                }
                joined.append(val);
                first = false;
            }

            this.value.set(joined.toString());
            context.write(key, this.value);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyReduceJoin <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MyReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Output types of the reducer.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output at this path before the job runs.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

方法一缺点:value值无序,可能第一个文件的value在前,也可能第二个文件的value在前;

处理方法二:

引入了一个自定义类型:

package org.robby.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop value type carrying one side of a join: the join key, a flag
 * identifying which input the record came from, and the rest of the record.
 *
 * <p>All three fields are Hadoop {@link Text} objects, so serialization simply
 * delegates to them. Ordering ({@link #compareTo}) looks at the join key only.
 */
public class CombineValues implements WritableComparable<CombineValues> {
    private Text joinKey;
    private Text flag;
    private Text secondPart;

    /** Creates an instance with empty fields; fill them through the setters. */
    public CombineValues() {
        this.joinKey = new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }

    public void setFlag(Text flag) {
        this.flag = flag;
    }

    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }

    public Text getFlag() {
        return flag;
    }

    public Text getSecondPart() {
        return secondPart;
    }

    public Text getJoinKey() {
        return joinKey;
    }

    /**
     * Serializes the three fields, in the order joinKey, flag, secondPart.
     * {@link #readFields} must read them back in the same order.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        this.joinKey.write(out);
        this.flag.write(out);
        this.secondPart.write(out);
    }

    /** Deserializes the three fields in the order written by {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.joinKey.readFields(in);
        this.flag.readFields(in);
        this.secondPart.readFields(in);
    }

    /** Orders instances by join key only; flag and secondPart are ignored. */
    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }

    @Override
    public String toString() {
        return "[flag=" + this.flag.toString() + ",joinKey=" + this.joinKey.toString()
                + ",secondPart=" + this.secondPart.toString() + "]";
    }
}

处理过程:可以在mapper阶段通过context得到处理的文件是哪一个,因此可以分别处理。

package org.robby.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join, variant 2: the mapper tags every record with the file it
 * came from, so the reducer can always output {@code left,right} in a fixed
 * order (left = record from {@code input1.txt}, right = record from any other file).
 *
 * <p>Only one left and one right value are kept per id; if an id occurs several
 * times on one side, the last value seen wins.
 */
public class MyReduceJoin1 {

    /** Tags each {@code id,rest} line with its source file and emits it under the id. */
    public static class Map extends Mapper<LongWritable, Text, Text, CombineValues> {
        // Reused output holders.
        private final CombineValues combineValues = new CombineValues();
        private final Text flag = new Text();
        private final Text key = new Text();

        /**
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of input, expected as {@code id,rest}
         * @param context gives access to the current split and receives the output
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split at the first comma only; the payload may contain commas.
            String[] keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                // Blank line or line without a comma: nothing to join.
                return;
            }

            // The input split tells us which file the current record belongs to.
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            // "0" marks the left side (input1.txt), "1" everything else.
            if (pathName.endsWith("input1.txt")) {
                flag.set("0");
            } else {
                flag.set("1");
            }

            combineValues.setFlag(flag);
            combineValues.setJoinKey(new Text(keyValue[0]));
            combineValues.setSecondPart(new Text(keyValue[1]));
            this.key.set(keyValue[0]);
            // The id is the shuffle key; the tagged record travels as the value.
            context.write(this.key, combineValues);
        }
    }

    /** Emits {@code (id, left,right)} for each id. */
    public static class Reduce extends Reducer<Text, CombineValues, Text, Text> {
        private final Text left = new Text();
        private final Text right = new Text();

        /**
         * @param key     the join id
         * @param values  the tagged records of both inputs for this id
         * @param context used to emit {@code (id, left,right)}
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException {
            // These holders are fields reused across calls; reset them so an id
            // that is missing one side does not inherit the previous id's value.
            left.clear();
            right.clear();

            for (CombineValues val : values) {
                System.out.println("val:" + val.toString());
                Text secondPar = new Text(val.getSecondPart().toString());
                // The flag decides whether this record is the left or right side.
                if (val.getFlag().toString().equals("0")) {
                    System.out.println("left :" + secondPar);
                    left.set(secondPar);
                } else {
                    System.out.println("right :" + secondPar);
                    right.set(secondPar);
                }
            }

            Text output = new Text(left.toString() + "," + right.toString());
            context.write(key, output);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyReduceJoin1 <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MyReduceJoin1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // The map output value type differs from the job output type, so it
        // has to be declared explicitly.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);

        // Output types of the reducer.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output formats of the job.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths of the job.
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output at this path before the job runs.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

方法二缺点:如果两个文件的条数不同(例如同一个id在input2中对应多条记录),并且还需要把id相同的全部合并,方法二就无法处理——reduce中只保留一个left和一个right,后读到的值会覆盖先前的值,每个id只能输出一条结果。

处理方法三:

package org.robby.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Reduce-side join, variant 3: like the tagged join, but an id may have many
 * right-hand records. The reducer collects all of them and emits one
 * {@code left,right} line per right-hand record.
 */
public class MyReduceJoin2 {

    /** Tags each {@code id,rest} line with its source file and emits it under the id. */
    public static class Map extends Mapper<LongWritable, Text, Text, CombineValues> {
        // Reused output holders.
        private final CombineValues combineValues = new CombineValues();
        private final Text flag = new Text();
        private final Text key = new Text();

        /**
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of input, expected as {@code id,rest}
         * @param context gives access to the current split and receives the output
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split at the first comma only; the payload may contain commas.
            String[] keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                // Blank line or line without a comma: nothing to join.
                return;
            }

            // "0" marks records from input1.txt (left side), "1" everything else.
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.endsWith("input1.txt")) {
                flag.set("0");
            } else {
                flag.set("1");
            }

            combineValues.setFlag(flag);
            combineValues.setJoinKey(new Text(keyValue[0]));
            combineValues.setSecondPart(new Text(keyValue[1]));
            this.key.set(keyValue[0]);
            context.write(this.key, combineValues);
        }
    }

    /** Emits one {@code (id, left,right)} line per right-hand record of an id. */
    public static class Reduce extends Reducer<Text, CombineValues, Text, Text> {
        private final Text left = new Text();
        private final List<Text> right = new ArrayList<Text>();

        /**
         * @param key     the join id
         * @param values  the tagged records of both inputs for this id
         * @param context used to emit the joined lines
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException {
            // Both holders are fields reused across calls. Reset them so an id
            // without a left record does not inherit the previous id's value.
            left.clear();
            right.clear();

            for (CombineValues val : values) {
                System.out.println("val:" + val.toString());
                // Copy the payload into a fresh Text before storing it.
                Text secondPar = new Text(val.getSecondPart().toString());
                if (val.getFlag().toString().equals("0")) {
                    left.set(secondPar);
                } else {
                    // File 1 holds the name, file 2 holds the details, of which
                    // there may be several per id, so collect them in a list.
                    right.add(secondPar);
                }
            }

            for (Text t : right) {
                Text output = new Text(left.toString() + "," + t.toString());
                context.write(key, output);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MyReduceJoin2 <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MyReduceJoin2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output at this path before the job runs.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

其他处理方法:

使用distributedCache在mapper环节进行映射;

主要是重写mapper里面的setup方法,通过context去读取job传入的文件,然后存在mapper对象中,从而使得mapper在每次执行map方法时都可以调用这些预先存入的数据;

使用setup预先处理input1,则mapper的map方法处理input2即可。

package org.robby.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Map-side join using the job's cache files: the mapper loads the id-to-name
 * file(s) registered with {@code job.addCacheFile} into memory in
 * {@link Map#setup}, then joins every {@code id,rest} input line against that
 * table in {@link Map#map}. The reducer just passes the joined lines through.
 */
public class MapJoinWithCache {

    /** Joins each input line with the name looked up from the cached file(s). */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        // Reused output holders.
        private final Text key = new Text();
        private final Text value = new Text();
        // Lookup table id -> name built in setup() and read by every map() call.
        // Declared as HashMap because inside this file the simple name Map
        // refers to this mapper class, not to java.util.Map.
        private HashMap<String, String> keyMap = null;

        /**
         * Called once per input line; emits {@code (id, name,rest)}.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of input, expected as {@code id,rest}
         * @param context used to emit the joined record
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split at the first comma only; the payload may contain commas.
            String[] keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                // Blank line or line without a comma: nothing to join.
                return;
            }

            String name = keyMap.get(keyValue[0]);
            if (name == null) {
                // No entry for this id in the cached file: keep the record
                // with an empty name rather than writing the text "null".
                name = "";
            }

            this.key.set(keyValue[0]);
            this.value.set(name + "," + keyValue[1]);
            context.write(this.key, this.value);
        }

        /**
         * Loads every cache file registered on the job into {@link #keyMap}.
         * Each line is expected as {@code id,name}; lines without a comma are skipped.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            keyMap = new HashMap<String, String>();

            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null) {
                // No cache file was registered; the lookup table stays empty.
                return;
            }

            Configuration conf = context.getConfiguration();
            for (URI uri : cacheFiles) {
                // Resolve the file system from the URI itself instead of a
                // hard-coded namenode address, so the job runs on any cluster.
                FileSystem fs = FileSystem.get(uri, conf);
                // Rebuild the path from scheme, authority and path, leaving out any fragment.
                Path path = new Path(uri.getScheme(), uri.getAuthority(), uri.getPath());

                // try-with-resources closes the streams even if reading fails.
                try (FSDataInputStream in = fs.open(path);
                     BufferedReader br = new BufferedReader(
                             new InputStreamReader(in, StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = br.readLine()) != null) {
                        String[] idAndName = line.split(",", 2);
                        if (idAndName.length < 2) {
                            continue;
                        }
                        keyMap.put(idAndName[0], idAndName[1]);
                        System.out.println(line);
                    }
                }
            }
        }
    }

    /** Pass-through reducer: the join already happened in the mapper. */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        /**
         * @param key     the join id
         * @param values  the joined lines produced by the mappers for this id
         * @param context used to emit each {@code (id, joined line)} unchanged
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output
     *             path, {@code args[2]} the full path of the file to cache
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println(
                    "Usage: MapJoinWithCache <input path> <output path> <cache file path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapJoinWithCache.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Remove any previous output at this path before the job runs.
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // Register the file to be cached. More files can be added the same
        // way; each one needs its full path.
        job.addCacheFile(new Path(args[2]).toUri());

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


其他linux指令:

[root@hadoop1 dataFile]# wc test*
 6 14 35 test2.txt
 7 16 41 test.txt
13 30 76 total

可以通过wc查看文件的条数(输出的三列依次为行数、单词数、字节数,第一列即为条数)

附件:http://down.51cto.com/data/2366171
0