十一、MapReduce--自定义Input输入
发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,在"MapReduce--input之输入原理"中说到实现定义输入的方法,其实就是继承InputFormat以及 RecordReader实现其中的方法。下面例子讲解操作。1、需求将多个文件合并成一个
千家信息网最后更新 2025年01月29日十一、MapReduce--自定义Input输入
在"MapReduce--input之输入原理"中说到实现定义输入的方法,其实就是继承InputFormat以及 RecordReader实现其中的方法。下面例子讲解操作。
1、需求
将多个文件合并成一个大文件(有点类似于combineInputFormat),并输出。大文件中包括小文件所在的路径,以及小文件的内容。
2、源码
inputFormat
public class SFileInputFormat extends FileInputFormat { /** * 是否切片 * @param context * @param filename * @return */ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } /** * 返回读取文件内容的读取器 * @param inputSplit * @param taskAttemptContext * @return * @throws IOException * @throws InterruptedException */ @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { SRecordReader sRecordReader = new SRecordReader(); sRecordReader.initialize(inputSplit, taskAttemptContext); return sRecordReader; }}
RecordReader:
public class SRecordReader extends RecordReader { private Configuration conf; private FileSplit split; //当前分片是否已读取的标志位 private boolean process = false; private BytesWritable value = new BytesWritable(); /** * 初始化 * @param inputSplit * @param taskAttemptContext * @throws IOException * @throws InterruptedException */ @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { split = (FileSplit)inputSplit; conf = taskAttemptContext.getConfiguration(); } /** * 从分片中读取下一个KV * @return * @throws IOException * @throws InterruptedException */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!process) { byte[] buffer = new byte[(int) split.getLength()]; //获取文件系统 Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); //创建输入流 FSDataInputStream fis = fs.open(path); //流对接,将数据读取缓冲区 IOUtils.readFully(fis, buffer, 0, buffer.length); //将数据装载入value value.set(buffer, 0, buffer.length); //关闭流 IOUtils.closeStream(fis); //读完就标志位设置为true,表示已读 process = true; return true; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return this.value; } @Override public float getProgress() throws IOException, InterruptedException { return process? 1 : 0; } @Override public void close() throws IOException { }}
mapper:
public class SFileMapper extends Mapper { Text k = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit inputSplit = (FileSplit)context.getInputSplit(); String name = inputSplit.getPath().toString(); k.set(name); } @Override protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(k, value); } }
reducer:
public class SFileReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); }}
driver:
public class SFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(SFileDriver.class); job.setMapperClass(SFileMapper.class); job.setReducerClass(SFileReducer.class); //设置输入和输出类,默认是 TextInputFormat job.setInputFormatClass(SFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); }}
自定义的inputformat需要在job中通过 job.setInputFormatClass() 来指定
文件
输入
内容
数据
方法
标志
输出
例子
原理
多个
就是
所在
源码
系统
缓冲区
路径
需求
中包
中通
并成
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
mndr数据库
服务器机柜招标参数
强化网络安全数据安全和
佛山网络安全培训学校哪家好
深圳合力发互联网科技有限公
湖南锐基网络技术有限公司
阳江数据库安全
最好的网络安全工程师
网络安全类的书籍
凯度网络技术
郑州中澳网络技术原国斌
正规软件开发预算
成都铁路局网络安全红线
警察进校园讲网络安全
湖南郴州学计算机软件开发招生
全境封锁数据库
软件开发设计V
交通软件开发现状
mimic iii数据库
晨溪互联网科技有限公司
win7系统可以装数据库吗
商用的数据库
花网络安全手抄报党史
初中生网络安全作业
网络安全领域重点检查
军职在线网络技术与应用实验
福建蜂巢网络技术
台式机用服务器内存效果怎样
web数据库访问技术
硕士论文查重对比数据库