千家信息网

Hadoop的整文件读取方法

发表于:2024-11-30 作者:千家信息网编辑
千家信息网最后更新 2024年11月30日,这篇文章主要讲解了"Hadoop的整文件读取方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Hadoop的整文件读取方法"吧!写Hadoop程序时,
千家信息网最后更新 2024年11月30日Hadoop的整文件读取方法

这篇文章主要讲解了"Hadoop的整文件读取方法",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Hadoop的整文件读取方法"吧!

写Hadoop程序时,有时候需要读取整个文件,而不是分片读取,但默认的为分片读取,所以,只有编写自己的整文件读取类。

需要编写的有:

WholeInputFormat类,继承自FileInputFormat类

WholeRecordReader类,继承自RecordReader类

其中,用于读取的类是WholeRecordReader类。以下代码以Text为key值类型,BytesWritable为value的类型,因为大多数格式在hadoop中都没有相应的类型支持,比如jpg,sdf,png等等,在hadoop中都没有相应的类,但是都可以转换为byte[]字节流,然后在转化为BytesWritable类型,最后在Map或者Reduce再转换成java中的相应类型。

代码如下,解释见 :

import java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class WholeInputFormat extends FileInputFormat{    @Override    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)      throws IOException,InterruptedException      {        return new WholeRecordReader();     }    @Override    //判断是否分片,false表示不分片,true表示分片。     //其实这个不写也可以,因为在WholeRecordReader中一次性全部读完     protected boolean isSplitable(JobContext context,Path file)     {         return false;     }}

下面是WholeRecordReader类:

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class WholeRecordReader extends RecordReader{     //Hadoop中处理文件的类     private FileSplit fileSplit;     private FSDataInputStream in = null;      private BytesWritable value = null;     private Text key = null;          //用于判断文件是否读取完成     //也就是因为这个,所以WholeInputFormat中的isSplitable方法可以不用写     private boolean processed = false;      @Override     public void close() throws IOException      {        //do nothing     }     @Override     public Text getCurrentKey() throws IOException, InterruptedException      {          return this.key;     }      @Override     public BytesWritable getCurrentValue() throws IOException,InterruptedException      {          return this.value;     }      @Override     public float getProgress() throws IOException, InterruptedException      {          return processed ? fileSplit.getLength() : 0;     }       @Override     public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException      {          //打开一个文件输入流          fileSplit = (FileSplit)split;          Configuration job = context.getConfiguration();          Path file = fileSplit.getPath();          FileSystem temp = file.getFileSystem(job);          in = temp.open(file);     }     @Override     public boolean nextKeyValue() throws IOException, InterruptedException     {          if(key == null)          {              key = new Text();          }            if(value == null)          {              value = new BytesWritable();          }            if(!processed)          {              //申请一个字节数组保存将从文件中读取的内容              byte[] content = new byte[(int)fileSplit.getLength()];              Path file = fileSplit.getPath();              //以文件的名字作为传递给Map函数的key值,可以自行设置              key.set(file.getName());                      try{               //读取文件中的内容               IOUtils.readFully(in,content,0,content.length);               //将value的值设置为byte[]中的值               value.set(new BytesWritable(content));              }catch(IOException e)              {                   e.printStackTrace();              }finally{               //关闭输入流               IOUtils.closeStream(in);              }                      //将processed设置成true,表示读取文件完成,以后不再读取              processed = true;              return true;          }                  return false;     }}

当把这些写好后,在main()函数或者run()函数里面将job的输入格式设置成WholeInputFormat,如下:

job.setInputFormatClass(WholeInputFormat.class);

现在,可以整个文件读取了。其中,key,value的类型可以换成大家需要的类型。不过,当在Hadoop中找不到对应类型的时候建议用BytesWritable类型,然后用byte[]作为中间类型转化为java可以处理的类型。

感谢各位的阅读,以上就是"Hadoop的整文件读取方法"的内容了,经过本文的学习后,相信大家对Hadoop的整文件读取方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0