MapReduce的典型编程场景3
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,1. 自定义InputFormat -数据分类输出 需求:小文件的合并 分析: - 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS - 在业务处理之前,在 HDF
千家信息网最后更新 2025年02月02日MapReduce的典型编程场景3
1. 自定义InputFormat -数据分类输出
需求:小文件的合并
分析:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 在 MapReduce 处理时,可采用 CombineFileInputFormat 提高效率
实现思路:
- 编写自定义的InoputFormat
- 改写 RecordReader,实现一次 maptask 读取一个小文件的完整内容封装到一个 KV 对
- 在Driver 类中一定要设置使用自定义的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)
代码实现:
public class MergeDriver { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "combine small files to bigfile"); job.setJarByClass(MergeDriver.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); //设置自定义输入的类 job.setInputFormatClass(MyMyFileInputForamt.class); Path input = new Path("/hadoop/input/num_add"); Path output = new Path("/hadoop/output/merge_output1"); //这里使用自定义得我FileInputForamt去格式化input MyMyFileInputForamt.addInputPath(job,input); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper static private class MyMapper extends Mapper { /* 这里的map方法就是每读取一个文件调用一次 */ @Override protected void map(NullWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { context.write(key, value); } } //Reducer private static class MyReducer extends Reducer { @Override protected void reduce(NullWritable key, Iterable values, Reducer.Context context) throws IOException, InterruptedException { for (Text v : values) { context.write(key, v); } } } //RecordReader ,这种这个两个泛型,是map端输入的key和value的类型 private static class MyRecordReader extends RecordReader { // 输出的value对象 Text map_value = new Text(); // 文件系统对象,用于获取文件的输入流 FileSystem fs; // 判断当前文件是否已经读完 Boolean isReader = false; //文件的切片信息 FileSplit fileSplit; //初始化方法,类似于Mapper中的setup,整个类最开始运行 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //初始化文件系统对象 fs = FileSystem.get(context.getConfiguration()); //获取文件路径 fileSplit = (FileSplit) split; } //这个方法,在每次调用map中传入的K-V中,就是在这个方法中给K-V赋值的 @Override public boolean nextKeyValue() throws IOException, InterruptedException { //先读取一次 if (!isReader) { FSDataInputStream input = fs.open(fileSplit.getPath()); //一次性将整个小文件内容都读取出来 byte flush[] = new byte[(int) fileSplit.getLength()]; //将文件内容读取到这个byte数组中 /** * 参数一:读取的字节数组 * 参数二:开始读取的偏移量 * 参数三:读取的长度 */ input.readFully(flush, 0, (int) fileSplit.getLength()); isReader = true; map_value.set(flush); //将读取的内容,放置在map的value中 //保证能正好读一次,nextKeyValue()第一次返回true正好可以调用一次map,第二次返回false return isReader; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public Text getCurrentValue() throws IOException, InterruptedException { return map_value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { fs.close(); } } //FileInputFormat private static class MyMyFileInputForamt extends FileInputFormat { @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { MyRecordReader mr = new MyRecordReader(); //先调用初始化方法 mr.initialize(split, context); return mr; } }}
2. 自定义OutputFormat
需求:一些原始日志需要做增强解析处理,流程
- 从原始日志文件中读取数据
- 根据业务获取业务数据库中的数据
- 根据某个连接条件获取相应的连接结果
分析:
- 在 MapReduce 中访问外部资源
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 自定义 OutputFormat,改写其中的 RecordWriter,改写具体输出数据的方法 write() CombineFileInputFormat 提高效率
代码实现
//这里以一个简单的案例为例,将文件按照不同的等级输出的不同的文件中
public class Score_DiffDic { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "Score_DiffDic"); job.setJarByClass(Score_DiffDic.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置自定义输出类型 job.setOutputFormatClass(MyOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); Path input = new Path("/hadoop/input/num_add"); FileInputFormat.addInputPath(job,input); Path output = new Path("/hadoop/output/merge_output1"); //这是自定义输出类型 MyOutputFormat.setOutputPath(job,output); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper private static class MyMapper extends Mapper{ Text mk=new Text(); DoubleWritable mv=new DoubleWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); //computer,huangxiaoming,85 if(fields.length==3){ mk.set(fields[1]); mv.set(Double.parseDouble(fields[2])); context.write(mk, mv); } } } //Reducer private static class MyReducer extends Reducer{ DoubleWritable mv=new DoubleWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { double sum=0; int count=0; for(DoubleWritable value:values){ sum+=value.get(); count++; } mv.set(sum/count); context.write(key,mv); } } //FileOutputFormat private static class MyOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileSystem fs =FileSystem.get(job.getConfiguration()); return new MyRecordWrite(fs); } } //RecordWriter,这里的两个泛型是Reudcer输出K-V的类型 private static class MyRecordWrite extends RecordWriter { FileSystem fs; //输出的文件的路径 Path path2 = new Path("/hadoop/output/score_out1"); Path path3 = new Path("/hadoop/output/score_out2"); FSDataOutputStream output1; FSDataOutputStream output2; public MyRecordWrite() { } //初始化参数 public MyRecordWrite(FileSystem fs) { this.fs = fs; try { output1=fs.create(path2); output2=fs.create(path3); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, DoubleWritable value) throws IOException, InterruptedException { //业务逻辑操作,平均分数大于80的在path2中,其他的在path3中 if(value.get()>80){ output1.write((key.toString()+":"+value.get()+"\n").getBytes()); }else{ output2.write((key.toString()+":"+value.get()+"\n").getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { fs.close(); output1.close(); output2.close(); } }}
文件
输出
数据
方法
内容
参数
类型
处理
业务
对象
输入
不同
原始
两个
代码
就是
效率
数组
日志
程序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
极光代理ip服务器解决上网限制
sci数据库是哪个国家的
提示服务器安全状态
服务器主板有集显吗
金融科技互联网服务上市
mysql数据库名能用点吗
国家安全局 网络安全
学什么数据库好
有网络技术专业的大学
软件开发岗位上游下游看法
发展和网络安全模式是什么
国企工厂做软件开发
家用服务器和nas区别
dns 免费 服务器
贵州gpu云服务器云空间
大唐高鸿网络安全事业部
小学生网络安全演讲稿免费
南昌大学计算机网络安全试卷
腾讯阿里巴巴数据库在哪
你以为的计算机网络技术专业
搜索管理类文献的外文数据库
数据库图片放在哪里了
软件开发课程设计论文
国服服务器查询
我国租用美国网络安全吗
启迪古汉软件开发部
中学生 网络安全 故事
做软件开发好找工作吗
哪个数据库可以查到EVA数据
个性化 软件开发