hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数
发表于:2025-02-14 作者:千家信息网编辑
千家信息网最后更新 2025年02月14日,小编给大家分享一下hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大
千家信息网最后更新 2025年02月14日hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数
小编给大家分享一下hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
//map读入的键package hgs.combinefileinputformat.test;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;public class CombineFileKey implements WritableComparable{ private String fileName; private long offset; public String getFileName() { return fileName; } public void setFileName(String fileName) { this.fileName = fileName; } public long getOffset() { return offset; } public void setOffset(long offset) { this.offset = offset; } @Override public void readFields(DataInput input) throws IOException { this.fileName = Text.readString(input); this.offset = input.readLong(); } @Override public void write(DataOutput output) throws IOException { Text.writeString(output, fileName); output.writeLong(offset); } @Override public int compareTo(CombineFileKey obj) { int f = this.fileName.compareTo(obj.fileName); if(f==0) return (int)Math.signum((double)(this.offset-obj.offset)); return f; } @Override public int hashCode() { //摘自于 http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/ final int prime = 31; int result = 1; result = prime * result + ((fileName == null) ? 0 : fileName.hashCode()); result = prime * result + (int) (offset ^ (offset >>> 32)); return result; } @Override public boolean equals(Object o) { if(o instanceof CombineFileKey) return this.compareTo((CombineFileKey)o)==0; return false; }}
package hgs.combinefileinputformat.test;import java.io.IOException;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;import org.apache.hadoop.util.LineReader;public class CombineFileReader extends RecordReader{ private long startOffset; //offset of the chunk; private long end; //end of the chunk; private long position; // current pos private FileSystem fs; private Path path; private CombineFileKey key; private Text value; private FSDataInputStream input; private LineReader reader; public CombineFileReader(CombineFileSplit split,TaskAttemptContext context , Integer index) throws IOException { //初始化path fs startOffset end this.path = split.getPath(index); this.fs = this.path.getFileSystem(context.getConfiguration()); this.startOffset = split.getOffset(index); this.end = split.getLength()+this.startOffset; //判断现在开始的位置是否在一行的内部 boolean skipFirstLine = false; //open the file this.input = fs.open(this.path); //不等于0说明读取位置在一行的内部 if(this.startOffset !=0 ){ skipFirstLine = true; --(this.startOffset); //定位到开始读取的位置 this.input.seek(this.startOffset); } //初始化reader this.reader = new LineReader(input); if(skipFirstLine){ // skip first line and re-establish "startOffset". //这里着这样做的原因是 一行可能包含了这个文件的所有的数据,猜测如果遇到一行的话,还是会读取一行 //将其实位置调整到一行的开始,这样的话会舍弃部分数据 this.startOffset += this.reader.readLine(new Text(), 0, (int)Math.min ((long)Integer.MAX_VALUE, this.end - this.startOffset)); } this.position = this.startOffset; } @Override public void close() throws IOException {} @Override public void initialize(InputSplit splite, TaskAttemptContext context) throws IOException, InterruptedException {} //返回当前的key @Override public CombineFileKey getCurrentKey() throws IOException, InterruptedException { return key; } //返回当前的value @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } //执行的进度 @Override public float getProgress() throws IOException, InterruptedException { //返回的类型为float if(this.startOffset==this.end){ return 0.0f; }else{ return Math.min(1.0f, (this.position - this.startOffset)/(float)(this.end - this.startOffset)); } } //该方法判断是否有下一个key value @Override public boolean nextKeyValue() throws IOException, InterruptedException { //对key和value初始化 if(this.key == null){ this.key = new CombineFileKey(); this.key.setFileName(this.path.getName()); } this.key.setOffset(this.position); if(this.value == null){ this.value = new Text(); } //读取一行数据,如果读取的newSieze=0说明split的数据已经处理完成 int newSize = 0; if(this.position package hgs.combinefileinputformat.test;import java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;public class CustCombineInputFormat extends CombineFileInputFormat{ public CustCombineInputFormat(){ super(); //最大切片大小 this.setMaxSplitSize(67108864);//64 MB } @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException { return new CombineFileRecordReader ((CombineFileSplit)split,context,CombineFileReader.class); } @Override protected boolean isSplitable(JobContext context, Path file) { return false; }}//驱动类package hgs.test;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import hgs.combinefileinputformat.test.CustCombineInputFormat;public class LetterCountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //conf.set("mapreduce.map.log.level", "INFO"); ///conf.set("mapreduce.reduce.log.level", "INFO"); Job job = Job.getInstance(conf, "LetterCount"); job.setJarByClass(hgs.test.LetterCountDriver.class); // TODO: specify a mapper job.setMapperClass(LetterCountMapper.class); // TODO: specify a reducer job.setReducerClass(LetterReducer.class); // TODO: specify output types job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); if(args[0].equals("1")) job.setInputFormatClass(CustCombineInputFormat.class); else{} // TODO: specify input and output DIRECTORIES (not files) FileInputFormat.setInputPaths(job, new Path("/words")); FileOutputFormat.setOutputPath(job, new Path("/result")); if (!job.waitForCompletion(true)) return; }} hdfs文件:
运行结果:不使用自定义的:CustCombineInputFormat
运行结果:在使用自定义的:CustCombineInputFormat
以上是"hadoop如何通过CombineFileInputFormat实现小文件合并减少map的个数"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
一行
数据
文件
位置
篇文章
个数
内容
结果
运行
最大
这样的话
不怎么
原因
大小
大部分
方法
更多
知识
类型
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
广西泽婷互联网科技有限公司
涉及网络安全的部门
南山企业网络安全产品
数据库加群速度
阿里的网络安全员多少钱
按文件名读hdfs数据库
云服务器怎么退订
网络安全涉及物理方面
武汉达梦数据库 中标
郧西县天气预报软件开发
sql建立一个数据库并导入数据
功能都用数据库进行实现可以吗
微梦创客网络技术有限公司
国家网络安全周资料
鹤壁启帆互联网科技
数据库非法字符有哪些
腾讯 数据库 tdb
网络安全属法律范畴
软件开发如何物联网创业
网络技术的自我评价
珠海发货软件开发
数据库model作用
在软件开发中提出最早
android服务器
电商用云服务器安全吗
有线电视网络安全传输预案
乡镇卫生院网络安全工作自查报告
淘宝号rust号进不了服务器
dota2进游戏连接不了服务器
如何利用电脑搭建游戏服务器