学习日志---hadoop的join处理
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,Join方法需求:处理input1和input2文件,两个文件中的id都一样,也就是key值一样,value值不同,把两者合并。input1存的是id和名字,input2存的是id和各种信息。处理方法
千家信息网最后更新 2025年02月01日学习日志---hadoop的join处理
Join方法
需求:处理input1和input2文件,两个文件中的id都一样,也就是key值一样,value值不同,把两者合并。input1存的是id和名字,input2存的是id和各种信息。
处理方法一:
package org.robby.join;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MyReduceJoin{ public static class MapClass extends Mapper{ //map过程需要用到的中间变量 private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //用逗号分开后传出 keyValue = value.toString().split(",", 2); this.key.set(keyValue[0]); this.value.set(keyValue[1]); context.write(this.key, this.value); } } public static class Reduce extends Reducer { private Text value = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { StringBuilder valueStr = new StringBuilder(); //reduce过程之所以可以用迭代出相同的id,因为shuffle过程进行了分区,排序,在进入reduce之前,有进行排序和分组, //相同的key的值默认分在一组 for(Text val : values) { valueStr.append(val); valueStr.append(","); } this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString()); context.write(key, this.value); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyReduceJoin.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); //reduce输出的格式 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
方法一缺点:value值无需,可能第一个文件的value在前,也可能第二个文件的value在前;
处理方法二:
引入了一个自定义类型:
package org.robby.join;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;public class CombineValues implements WritableComparable{ //这里的自定义类型,实现WritableComparable接口 //里面的数据使用的是hadoop自带的类型Text private Text joinKey; private Text flag; private Text secondPart; public void setJoinKey(Text joinKey) { this.joinKey = joinKey; } public void setFlag(Text flag) { this.flag = flag; } public void setSecondPart(Text secondPart) { this.secondPart = secondPart; } public Text getFlag() { return flag; } public Text getSecondPart() { return secondPart; } public Text getJoinKey() { return joinKey; } public CombineValues() { //构造时初始化数据,用set添加 this.joinKey = new Text(); this.flag = new Text(); this.secondPart = new Text(); } //序列与反序列化,其中体现为传入文件流,使用hadoop提供的文件流去传送数据 @Override public void write(DataOutput out) throws IOException { //因使用的是hadoop自带的Text,因此序列化时,可以用本身的Text,传入流out即可 this.joinKey.write(out); this.flag.write(out); this.secondPart.write(out); } @Override public void readFields(DataInput in) throws IOException { this.joinKey.readFields(in); this.flag.readFields(in); this.secondPart.readFields(in); } @Override public int compareTo(CombineValues o) { return this.joinKey.compareTo(o.getJoinKey()); } @Override public String toString() { // TODO Auto-generated method stub return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]"; }}
处理过程:可以在mapper阶段通过context得到处理的文件是哪一个,因此可以分别处理。
package org.robby.join;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MyReduceJoin1{ public static class Map extends Mapper{ private CombineValues combineValues = new CombineValues(); private Text flag = new Text(); private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //FileSplit是文件块,通过context,文件处理可以的到处理的文件属于哪一个文件 String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); //通过pathName获得处理文件的名字,然后用flag进行标示 if(pathName.endsWith("input1.txt")) flag.set("0"); else flag.set("1"); combineValues.setFlag(flag); keyValue = value.toString().split(",", 2); combineValues.setJoinKey(new Text(keyValue[0])); combineValues.setSecondPart(new Text(keyValue[1])); this.key.set(keyValue[0]); //将封装的数据传出,key是id,用于分区排序分组,value是自定义的类,在main函数里需要说明 context.write(this.key, combineValues); } } public static class Reduce extends Reducer { private Text value = new Text(); private Text left = new Text(); private Text right = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //因key一样,因此默认分在一组 for(CombineValues val : values) { System.out.println("val:" + val.toString()); Text secondPar = new Text(val.getSecondPart().toString()); //根据flag,来判断是左边还是右边 if(val.getFlag().toString().equals("0")){ System.out.println("left :" + secondPar); left.set(secondPar); } else{ System.out.println("right :" + secondPar); right.set(secondPar); } } //整合value,输出 Text output = new Text(left.toString() + "," + right.toString()); context.write(key, output); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyReduceJoin1.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); //这里要指明map的输出,因为默认是Text.class job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); //指明reduce的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //job任务的文件输入和输出形式 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //job任务的输出与输入文件路径 Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); //通个outputPath,查看hdfs是否已有这个文件,有则删除 outputPath.getFileSystem(conf).delete(outputPath, true); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
缺点:如果两个文件的条数不同,并且还需要把id相同的合并
处理方法三:
package org.robby.join;import java.io.IOException;import java.util.ArrayList;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MyReduceJoin2{ public static class Map extends Mapper{ private CombineValues combineValues = new CombineValues(); private Text flag = new Text(); private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; @Override //map的处理和以前一样,分文件加flag标识,用自定义的类型封装输出 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String pathName = ((FileSplit) context.getInputSplit()).getPath().toString(); if(pathName.endsWith("input1.txt")) flag.set("0"); else flag.set("1"); combineValues.setFlag(flag); keyValue = value.toString().split(",", 2); combineValues.setJoinKey(new Text(keyValue[0])); combineValues.setSecondPart(new Text(keyValue[1])); this.key.set(keyValue[0]); context.write(this.key, combineValues); } } public static class Reduce extends Reducer { private Text value = new Text(); private Text left = new Text(); private ArrayList right = new ArrayList (); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { right.clear(); for(CombineValues val : values) { //这里id相同的合并,有多个了 System.out.println("val:" + val.toString()); Text secondPar = new Text(val.getSecondPart().toString()); if(val.getFlag().toString().equals("0")){ left.set(secondPar); } else{ //文件一是名字,文件二是各种信息,因此存在一个list集合中 right.add(secondPar); } } for(Text t : right){ Text output = new Text(left.toString() + "," + t.toString()); context.write(key, output); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyReduceJoin2.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CombineValues.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
其他处理方法:
使用distributedCache在mapper环节进行映射;
主要是重写mapper里面的setup方法,通个context去读取job传入的文件,然后存在mapper对象中,从而使得mapper在每次实现map方法时都可以调用这些预先存入的数据;
使用setup预先处理input1,则mapper的map方法处理input2即可。
package org.robby.join;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.URI;import java.util.HashMap;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class MapJoinWithCache { public static class Map extends Mapper{ private CombineValues combineValues = new CombineValues(); private Text flag = new Text(); private Text key = new Text(); private Text value = new Text(); private String[] keyValue = null; //这个keyMap就是存文件数据供map共享的 private HashMap keyMap = null; @Override //这个map每行都会调用一次,传入数据 //每次都会访问keyMap集合 //因为setup方法处理了input1文件,因此这里只需要处理input2就行 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { keyValue = value.toString().split(",", 2); String name = keyMap.get(keyValue[0]); this.key.set(keyValue[0]); String output = name + "," + keyValue[1]; this.value.set(output); context.write(this.key, this.value); } @Override //这个setup方法是在mapper类初始化运行的方法 protected void setup(Context context) throws IOException, InterruptedException { //context传入文件路径 URI[] localPaths = context.getCacheFiles(); keyMap = new HashMap (); for(URI url : localPaths){ //通过uri打开hdfs文件系统 FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop1:9000"), context.getConfiguration()); FSDataInputStream in = null; //打开hdfs的对应文件,需要path类创建并传入,获取流对象 in = fs.open(new Path(url.getPath())); BufferedReader br=new BufferedReader(new InputStreamReader(in)); String s1 = null; while ((s1 = br.readLine()) != null) { keyValue = s1.split(",", 2); keyMap.put(keyValue[0], keyValue[1]); System.out.println(s1); } br.close(); } } } public static class Reduce extends Reducer { //处理都在mpper中进行,reduce迭代分组后的数据就行 @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for(Text val : values) context.write(key, val); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapJoinWithCache.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, outputPath); outputPath.getFileSystem(conf).delete(outputPath, true); //其他都一样,这里在job中加入了要传入的文件路径,用作cache //可以传入多个文件,文件全路径 job.addCacheFile(new Path(args[2]).toUri()); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
其他linux指令:
[root@hadoop1 dataFile]# wc test* 6 14 35 test2.txt 7 16 41 test.txt13 30 76 total
可以通过wc查看文件的条数
文件
处理
方法
数据
输出
相同
类型
路径
过程
名字
序列
分组
排序
不同
两个
任务
信息
多个
对象
缺点
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
云栖大会阿里云数据库
软件的代理服务器怎么查
计算机三级网络技术综合题
从事网络安全专业的毕业生
浪潮云服务器怎么安装
怎么让服务器内存用在普通主板
互联网数据库自考教材00911
网络安全 海外财务经理
共享电单车软件开发公司
高性能的聊天软件开发
广州国产信创服务器什么价格
3级网络技术都考什么
相图数据库
常州市森林森互联网科技有限公司
windows7能下载数据库吗
网络技术应用实例
我的世界服务器只能跳
网络安全 老年群体
蓝调色软件开发
域名服务器位置
开物品的图片数据库
传达网络安全工作会议精神
江阴大型软件开发项目信息
苏州阿里云服务器如何接入
广州大学方滨兴网络安全
胃癌数据库名称有哪些
中山网络安全通知
产能规划软件开发
怎么搭建代理ip服务器
国际桥梁船撞事故数据库