MapReduce的典型编程场景3
发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,1. 自定义InputFormat -数据分类输出 需求:小文件的合并 分析: - 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS - 在业务处理之前,在 HDF
千家信息网最后更新 2024年12月12日MapReduce的典型编程场景3
1. 自定义InputFormat -数据分类输出
需求:小文件的合并
分析:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 在 MapReduce 处理时,可采用 CombineFileInputFormat 提高效率
实现思路:
- 编写自定义的InoputFormat
- 改写 RecordReader,实现一次 maptask 读取一个小文件的完整内容封装到一个 KV 对
- 在Driver 类中一定要设置使用自定义的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)
代码实现:
public class MergeDriver { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "combine small files to bigfile"); job.setJarByClass(MergeDriver.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); //设置自定义输入的类 job.setInputFormatClass(MyMyFileInputForamt.class); Path input = new Path("/hadoop/input/num_add"); Path output = new Path("/hadoop/output/merge_output1"); //这里使用自定义得我FileInputForamt去格式化input MyMyFileInputForamt.addInputPath(job,input); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper static private class MyMapper extends Mapper { /* 这里的map方法就是每读取一个文件调用一次 */ @Override protected void map(NullWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { context.write(key, value); } } //Reducer private static class MyReducer extends Reducer { @Override protected void reduce(NullWritable key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { for (Text v : values) { context.write(key, v); } } } //RecordReader ,这种这个两个泛型,是map端输入的key和value的类型 private static class MyRecordReader extends RecordReader { // 输出的value对象 Text map_value = new Text(); // 文件系统对象,用于获取文件的输入流 FileSystem fs; // 判断当前文件是否已经读完 Boolean isReader = false; //文件的切片信息 FileSplit fileSplit; //初始化方法,类似于Mapper中的setup,整个类最开始运行 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //初始化文件系统对象 fs = FileSystem.get(context.getConfiguration()); //获取文件路径 fileSplit = (FileSplit) split; } //这个方法,在每次调用map中传入的K-V中,就是在这个方法中给K-V赋值的 @Override public boolean nextKeyValue() throws IOException, InterruptedException { //先读取一次 if (!isReader) { FSDataInputStream input = fs.open(fileSplit.getPath()); //一次性将整个小文件内容都读取出来 byte flush[] = new byte[(int) fileSplit.getLength()]; //将文件内容读取到这个byte数组中 /** * 参数一:读取的字节数组 * 参数二:开始读取的偏移量 * 参数三:读取的长度 */ input.readFully(flush, 0, (int) fileSplit.getLength()); isReader = true; map_value.set(flush); //将读取的内容,放置在map的value中 //保证能正好读一次,nextKeyValue()第一次返回true正好可以调用一次map,第二次返回false return isReader; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public Text getCurrentValue() throws IOException, InterruptedException { return map_value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { fs.close(); } } //FileInputFormat private static class MyMyFileInputForamt extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { MyRecordReader mr = new MyRecordReader(); //先调用初始化方法 mr.initialize(split, context); return mr; } }}
2. 自定义OutputFormat
需求:一些原始日志需要做增强解析处理,流程
- 从原始日志文件中读取数据
- 根据业务获取业务数据库中的数据
- 根据某个连接条件获取相应的连接结果
分析:
- 在 MapReduce 中访问外部资源
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 自定义 OutputFormat,改写其中的 RecordWriter,改写具体输出数据的方法 write() CombineFileInputFormat 提高效率
代码实现
//这里以一个简单的案例为例,将文件按照不同的等级输出的不同的文件中
public class Score_DiffDic { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "Score_DiffDic"); job.setJarByClass(Score_DiffDic.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置自定义输出类型 job.setOutputFormatClass(MyOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); Path input = new Path("/hadoop/input/num_add"); FileInputFormat.addInputPath(job,input); Path output = new Path("/hadoop/output/merge_output1"); //这是自定义输出类型 MyOutputFormat.setOutputPath(job,output); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper private static class MyMapper extends Mapper{ Text mk=new Text(); DoubleWritable mv=new DoubleWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); //computer,huangxiaoming,85 if(fields.length==3){ mk.set(fields[1]); mv.set(Double.parseDouble(fields[2])); context.write(mk, mv); } } } //Reducer private static class MyReducer extends Reducer{ DoubleWritable mv=new DoubleWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { double sum=0; int count=0; for(DoubleWritable value:values){ sum+=value.get(); count++; } mv.set(sum/count); context.write(key,mv); } } //FileOutputFormat private static class MyOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileSystem fs =FileSystem.get(job.getConfiguration()); return new MyRecordWrite(fs); } } //RecordWriter,这里的两个泛型是Reudcer输出K-V的类型 private static class MyRecordWrite extends RecordWriter { FileSystem fs; //输出的文件的路径 Path path2 = new Path("/hadoop/output/score_out1"); Path path3 = new Path("/hadoop/output/score_out2"); FSDataOutputStream output1; FSDataOutputStream output2; public MyRecordWrite() { } //初始化参数 public MyRecordWrite(FileSystem fs) { this.fs = fs; try { output1=fs.create(path2); output2=fs.create(path3); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, DoubleWritable value) throws IOException, InterruptedException { //业务逻辑操作,平均分数大于80的在path2中,其他的在path3中 if(value.get()>80){ output1.write((key.toString()+":"+value.get()+"\n").getBytes()); }else{ output2.write((key.toString()+":"+value.get()+"\n").getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { fs.close(); output1.close(); output2.close(); } }}
文件
输出
数据
方法
内容
参数
类型
处理
业务
对象
输入
不同
原始
两个
代码
就是
效率
数组
日志
程序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
qt连接mysql数据库有驱动
数据库恢复技术原理
2019源刊数据库
比较两个集合差异的数据库
河北成就未来网络技术有限公司
vc 编程用什么软件开发
数据库嵌入式
巴彦淖尔软件开发培训学校
it软件开发限制年龄吗
熊猫酒仙在哪个服务器
饥荒配置服务器
2019网络安全周宣传传单
淮安网络安全管理
软件开发工程师需要什么配置电脑
期刊数据库技术
ctf网络安全大赛奖
云服务器能用几年
如何设置数据库中表的输入掩码
洛克王国忘了服务器
c 数据库编程
vb 数据库 外国
软件开发项目公司电话
网络安全密码忘了怎么办啊
如何保存数据库
服务器系统文件误删
山西丰巢网络技术有限公司
禁毒教育软件开发
深圳工控软件开发需要多少钱
签署网络安全战略服务合作协议
ctf网络安全大赛奖