千家信息网

贴一段java读取hdfs 解压gz zip tar.gz保存到hdfs的代码

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,package main.java;import java.io.*;import java.util.LinkedList;import java.util.List;import java.uti
千家信息网最后更新 2025年01月23日贴一段java读取hdfs 解压gz zip tar.gz保存到hdfs的代码

package main.java;

import java.io.*;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.*;


import org.apache.commons.compress.archivers.ArchiveException;

import org.apache.commons.compress.archivers.ArchiveInputStream;

import org.apache.commons.compress.archivers.ArchiveStreamFactory;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;

import java.io.IOException;
import java.net.URI;

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;;


/**
 * Extracts {@code .tar.gz}, {@code .zip} and {@code .gz} files stored on HDFS and writes the
 * extracted content back to an HDFS directory.
 *
 * <p>The source file is opened through the file system resolved from the destination URI, so
 * the source and the destination are expected to live on the same file system.
 */
public class GZipHdfs {

    /** Size in bytes of the buffer used when streaming data to HDFS. */
    private static final int COPY_BUFFER_SIZE = 8192;

    /** File-name suffix stripped to derive the name of a gunzipped file. */
    private static final String GZ_SUFFIX = ".gz";

    /** Path of the compressed source file this instance extracts. */
    String zipfileName = null;

    /**
     * Creates an extractor for one compressed file.
     *
     * @param fileName path of the compressed source file
     */
    public GZipHdfs(String fileName) {
        this.zipfileName = fileName;
    }

    /**
     * Entry point for {@code .tar.gz} archives: extracts {@code rarFileName} into
     * {@code destDir}, creating the destination directory when it does not exist.
     *
     * @param rarFileName path of the archive to extract (must point at the file itself)
     * @param destDir     destination directory
     * @return the names of the archive entries that were processed
     * @throws IOException if the file system cannot be obtained, prepared or closed
     */
    public List unTargzFile(String rarFileName, String destDir) throws IOException {
        GZipHdfs extractor = new GZipHdfs(rarFileName);
        FileSystem fs = openFileSystem(destDir);
        try {
            ensureDirectory(fs, destDir);
            return extractor.defUnTargzFile(destDir, fs);
        } finally {
            // Close even when preparation or extraction fails.
            fs.close();
        }
    }

    /**
     * Extracts the {@code .tar.gz} archive named by {@link #zipfileName} into
     * {@code outputDirectory}.
     *
     * <p>Failures are not propagated: the stack trace is printed and the names collected
     * before the failure are returned.
     *
     * @param outputDirectory destination directory
     * @param fs              file system used for both reading the archive and writing entries
     * @return the names of the archive entries that were processed
     */
    public List defUnTargzFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        InputStream source = null;
        try {
            // 'source' always references the outermost stream opened so far, so the finally
            // block releases the HDFS stream even if a wrapping constructor throws.
            source = fs.open(new Path(zipfileName));
            source = new GZIPInputStream(new BufferedInputStream(source));
            ArchiveInputStream tar =
                    new ArchiveStreamFactory().createArchiveInputStream("tar", source);
            source = tar;

            TarArchiveEntry entry = (TarArchiveEntry) tar.getNextEntry();
            while (entry != null) {
                String name = entry.getName();
                writeEntry(fs, outputDirectory, name, entry.isDirectory(), tar);
                extracted.add(name);
                entry = (TarArchiveEntry) tar.getNextEntry();
            }
        } catch (ArchiveException e) {
            // Caught before IOException so the order stays valid should ArchiveException be
            // an IOException subtype in the commons-compress version in use.
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(source);
        }
        return extracted;
    }

    /**
     * Entry point for {@code .zip} archives: extracts {@code rarFileName} into
     * {@code destDir}, creating the destination directory when it does not exist.
     *
     * @param rarFileName path of the archive to extract (must point at the file itself)
     * @param destDir     destination directory
     * @return the names of the archive entries that were processed
     * @throws IOException if the file system cannot be obtained, prepared or closed
     */
    public List unZipFile(String rarFileName, String destDir) throws IOException {
        GZipHdfs extractor = new GZipHdfs(rarFileName);
        FileSystem fs = openFileSystem(destDir);
        try {
            ensureDirectory(fs, destDir);
            return extractor.defUnZipFile(destDir, fs);
        } finally {
            // Close even when preparation or extraction fails.
            fs.close();
        }
    }

    /**
     * Extracts the {@code .zip} archive named by {@link #zipfileName} into
     * {@code outputDirectory}.
     *
     * <p>Failures are not propagated: the stack trace is printed and the names collected
     * before the failure are returned.
     *
     * @param outputDirectory destination directory
     * @param fs              file system used for both reading the archive and writing entries
     * @return the names of the archive entries that were processed
     */
    public List defUnZipFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        InputStream source = null;
        try {
            source = fs.open(new Path(zipfileName));
            ZipInputStream zip = new ZipInputStream(new BufferedInputStream(source));
            source = zip;

            ZipEntry entry = zip.getNextEntry();
            while (entry != null) {
                String name = entry.getName();
                writeEntry(fs, outputDirectory, name, entry.isDirectory(), zip);
                extracted.add(name);
                entry = zip.getNextEntry();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(source);
        }
        return extracted;
    }

    /**
     * Entry point for plain {@code .gz} files: decompresses {@code rarFileName} into
     * {@code destDir}, creating the destination directory when it does not exist.
     *
     * @param rarFileName path of the file to decompress (must point at the file itself)
     * @param destDir     destination directory
     * @return a list holding the name of the decompressed file, or an empty list on failure
     * @throws IOException if the file system cannot be obtained, prepared or closed
     */
    public List unGZipFile(String rarFileName, String destDir) throws IOException {
        GZipHdfs extractor = new GZipHdfs(rarFileName);
        FileSystem fs = openFileSystem(destDir);
        try {
            ensureDirectory(fs, destDir);
            return extractor.defUnGZipFile(destDir, fs);
        } finally {
            // Close even when preparation or extraction fails.
            fs.close();
        }
    }

    /**
     * Decompresses the {@code .gz} file named by {@link #zipfileName} into
     * {@code outputDirectory}. The output file is named after the source file with a trailing
     * {@code .gz} removed.
     *
     * <p>Failures are not propagated: the stack trace is printed and an empty list is
     * returned.
     *
     * @param outputDirectory destination directory
     * @param fs              file system used for both reading the source and writing the result
     * @return a list holding the name of the decompressed file, or an empty list on failure
     */
    public List defUnGZipFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        InputStream source = null;
        try {
            source = fs.open(new Path(zipfileName));
            source = new GzipCompressorInputStream(new BufferedInputStream(source));

            String name = gunzippedName(zipfileName);
            copyToHdfs(source, fs, new Path(outputDirectory + "/" + name));
            extracted.add(name);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeQuietly(source);
        }
        return extracted;
    }

    /** Obtains the file system that serves {@code destDir}. */
    private static FileSystem openFileSystem(String destDir) throws IOException {
        // NOTE(review): FileSystem.get may hand back a cached, shared instance; the callers
        // close it as the original code did - confirm nothing else in the process relies on it.
        return FileSystem.get(URI.create(destDir), new Configuration());
    }

    /** Creates {@code destDir} on {@code fs} unless it already is a directory. */
    private static void ensureDirectory(FileSystem fs, String destDir) throws IOException {
        Path dir = new Path(destDir);
        if (!fs.isDirectory(dir)) {
            fs.mkdirs(dir);
        }
    }

    /**
     * Materializes one archive entry below {@code outputDirectory}: directory entries become
     * directories, every other entry is written as a file from {@code data}.
     */
    private static void writeEntry(FileSystem fs, String outputDirectory, String entryName,
            boolean directory, InputStream data) throws IOException {
        Path target = resolveEntryPath(outputDirectory, entryName);
        if (directory) {
            // Creating a file here would block every entry stored beneath this directory.
            fs.mkdirs(target);
            return;
        }
        copyToHdfs(data, fs, target);
    }

    /**
     * Maps an archive entry name onto a path below {@code outputDirectory}, refusing names
     * that would climb out of it.
     */
    private static Path resolveEntryPath(String outputDirectory, String entryName)
            throws IOException {
        StringBuilder target = new StringBuilder(outputDirectory);
        for (String part : entryName.split("/")) {
            if (part.length() == 0 || ".".equals(part)) {
                continue; // no-op path components
            }
            if ("..".equals(part)) {
                throw new IOException("Refusing to extract entry outside of "
                        + outputDirectory + ": " + entryName);
            }
            target.append('/').append(part);
        }
        return new Path(target.toString());
    }

    /** Returns the last segment of {@code sourcePath} with a trailing {@code .gz} removed. */
    private static String gunzippedName(String sourcePath) {
        String[] segments = sourcePath.split("/");
        String name = segments[segments.length - 1];
        if (name.endsWith(GZ_SUFFIX)) {
            name = name.substring(0, name.length() - GZ_SUFFIX.length());
        }
        return name;
    }

    /**
     * Streams {@code source} until it reports end of data into a newly created file at
     * {@code target}. The output stream is always closed; {@code source} is left open.
     */
    private static void copyToHdfs(InputStream source, FileSystem fs, Path target)
            throws IOException {
        OutputStream out = new BufferedOutputStream(fs.create(target));
        try {
            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int count;
            while ((count = source.read(buffer)) != -1) {
                out.write(buffer, 0, count);
            }
            out.flush();
        } finally {
            out.close();
        }
    }

    /** Closes {@code resource} if it is non-null, printing (not propagating) any failure. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

0