十五、MapReduce--自定义output输出
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,我们要自定义输出时,首先继承两个抽象类,一个是 OutputFormat,一个是 RecordWriter。前者是主要是创建RecordWriter,后者就是主要实现 write方法来将kv写入文件。
千家信息网最后更新 2025年01月31日十五、MapReduce--自定义output输出
我们要自定义输出时,首先继承两个抽象类,一个是 OutputFormat,一个是 RecordWriter
。前者是主要是创建RecordWriter,后者就是主要实现 write方法来将kv写入文件。
1、需求
将reduce输出的KV中,如果key中包含特定字符串,则将其输出到一个文件中,剩下的KV则输出到另外的文件中。
2、源码
源数据
http://cn.bing.comhttp://www.baidu.comhttp://www.google.comhttp://www.itstar.comhttp://www.itstar1.comhttp://www.itstar2.comhttp://www.itstar3.comhttp://www.baidu.comhttp://www.sin2a.comhttp://www.sin2a.comw.google.comhttp://www.sin2desa.comhttp://www.sin2desa.comw.google.comhttp://www.sina.comhttp://www.sindsafa.comhttp://www.sohu.com
outputFormat
public class MyOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new MyRecordWriter(taskAttemptContext); }}
RecordWriter
public class MyRecordWriter extends RecordWriter { private FSDataOutputStream startOut; private FSDataOutputStream otherOut; public MyRecordWriter(TaskAttemptContext job) { try { FileSystem fs = FileSystem.get(job.getConfiguration()); startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log")); otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log")); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String line = key.toString(); //如果key中包含itstar就写入到另外一个文件中 if (line.contains("itstar")) { this.startOut.writeUTF(line); } else { this.otherOut.writeUTF(line); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { this.startOut.close(); this.otherOut.close(); }}
mapper
public class MyOutputMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, NullWritable.get()); }}
reducer
public class MyOutputReducer extends Reducer { Text k = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { String line = key.toString(); line = line + "\r\n"; k.set(line); context.write(k, NullWritable.get()); }}
driver
ublic class MyDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyDriver.class); job.setMapperClass(MyOutputMapper.class); job.setReducerClass(MyOutputReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //自定义输出的实现子类,也是继承FileOutputFormat job.setOutputFormatClass(MyOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); //这个路径输出的是job的执行成功successs文件的输出路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
输出
文件
路径
中包
成功
两个
子类
字符
字符串
就是
数据
方法
源码
需求
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
计算机应用软件开发有用吗
服务器就是数据库吗
重庆现代化软件开发代理品牌
荷兰网络安全专业
我国网络安全最突出的问题
湖南省委网信办网络安全
软件开发方法的主要生存模型
软件开发9001程序文件
人工智能软件开发有哪些
第二届民航网络安全张电生
重庆木鱼跳跳网络技术有限公司
医院服务器存储硬盘寿命
车载网络技术相关企业
软件开发除了瀑布模型
信息网络安全从我做起手抄报
网络安全入我心体会
网络技术应用层协议实训报告
网络安全小品相声
php操作数据库删除
企业的网络安全制度
九江服务器哪里比较好
云服务器安全防护等级
自考网络技术应用专业
软件开发后端工资一般多少
jade的数据库怎么发
中国人民解放军网络安全保密规定
社交系统中用户关系数据库设计
物联网安全数据库隐私保护
销售管理软件开发定制公司
广州手机软件开发电话