千家信息网

hdfs常用API和putMerge功能实现

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,所需jar包一、URL API操作方式import java.io.InputStream;import java.net.URL;import org.apache.hadoop.fs.FsUrlS
千家信息网最后更新 2025年02月01日hdfs常用API和putMerge功能实现

所需jar包

一、URL API操作方式

import java.io.InputStream;import java.net.URL;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;import org.apache.hadoop.io.IOUtils;import org.junit.Test;public class HDFSUrlTest {    /**     * HDFS URL API操作方式     * 不需要读取core-site.xml和hdfs-site.xml配置文件     */    // 让JAVA程序识别HDFS的URL    static {        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());    }    // 查看文件内容    @Test    public void testRead() throws Exception {        InputStream in = null;        // 文件路径        String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data";        try {            // 获取文件输入流            in = new URL(fileUrl).openStream();            // 将文件内容读取出来,打印控制台            IOUtils.copyBytes(in, System.out, 4096, false);        } finally {            IOUtils.closeStream(in);        }    }}


二、通过FileSystem API操作HDFS


HDFS工具类


import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;public class HDFSUtils {/** *     HDFS工具类 */        public static FileSystem  getFileSystem() {        //声明FileSystem        FileSystem hdfs=null;        try {                        //获取文件配置信息            Configuration conf =new Configuration();            //获取文件系统            hdfs=FileSystem.get(conf);        } catch (IOException e) {                        e.printStackTrace();        }        return hdfs;            }    }



常用操作实现类


import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hdfs.DistributedFileSystem;import org.apache.hadoop.hdfs.protocol.DatanodeInfo;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.mapred.gethistory_jsp;import org.junit.Test;public class HDFSFsTest {    /**     *      * 通过FileSystem API操作HDFS     */    // 读取文件内容    @Test    public void testRead() throws Exception {        // 获取文件系统        FileSystem hdfs = HDFSUtils.getFileSystem();        // 文件名称        Path path = new Path("/opt/data/test/touch.data");        // 打开文件输入流        FSDataInputStream inStream = hdfs.open(path);        // 读取文件到控制台显示        IOUtils.copyBytes(inStream, System.out, 4096, false);        // 关闭流        IOUtils.closeStream(inStream);    }    // 查看目录    @Test    public void testList() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        // 文件名称        Path path = new Path("/opt/data");        FileStatus[] fileStatus = hdfs.listStatus(path);        for (FileStatus file : fileStatus) {            Path p = file.getPath();            String info = file.isDir() ? "目录" : "文件";            System.out.println(info + ":" + p);        }    }    // 创建目录    @Test    public void testDirectory() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        // 要创建的目录        Path path = new Path("/opt/data/dir");        boolean isSuccessful = hdfs.mkdirs(path);// 相当于 linux下 mkdir -p                                                    // /opt/data/dir        String info = isSuccessful ? "成功" : "失败";        System.out.println("创建目录【" + path + "】" + info);    }    // 上传文件-- put copyFromLocal    @Test    public void testPut() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        // 本地文件(目录+文件名称)        Path srcPath = new Path("c:/0125.log");        // hdfs文件上传路径        Path dstPath = new Path("/opt/data/dir/");        hdfs.copyFromLocalFile(srcPath, dstPath);    }    // 创建hdfs文件并写入内容    @Test    public void testCreate() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        Path path = new Path("/opt/data/dir/touch.data");        // 创建文件并获取输出流        FSDataOutputStream fSDataOutputStream = hdfs.create(path);        // 通过输出流写入数据        fSDataOutputStream.write("你好".getBytes());        fSDataOutputStream.writeUTF("hello hadoop!");        IOUtils.closeStream(fSDataOutputStream);    }    // 文件重命名    @Test    public void testRename() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        Path oldPath = new Path("/opt/data/dir/touch.data");        Path newPath = new Path("/opt/data/dir/rename.data");        boolean flag = hdfs.rename(oldPath, newPath);        System.out.println(flag);    }    // 删除文件    public void testDelete() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        Path path = new Path("/opt/data/dir/touch.data");        boolean flag = hdfs.deleteOnExit(path);        System.out.println(flag);    }    // 删除目录    public void testDeleteDir() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        Path path = new Path("/opt/data/dir");        boolean flag = hdfs.delete(path, true);// 如果是目录第二个参数必须为true        System.out.println(flag);    }    // 查找某个文件在hdfs集群的位置    public void testLocation() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        Path path = new Path("/opt/data/test.file");        FileStatus fileStatus = hdfs.getFileStatus(path);        BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus,                0, fileStatus.getLen());        for (BlockLocation blockLocation : blockLocations) {            String[] hosts = blockLocation.getHosts();            for (String host : hosts) {                System.out.print(host + " ");            }            System.out.println();        }    }    // 获取hdfs集群上所有节点名称信息    public void testCluster() throws Exception {        FileSystem hdfs = HDFSUtils.getFileSystem();        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs;        DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats();        for (DatanodeInfo datanodeInfo : datanodeInfos) {            String hostName = datanodeInfo.getHostName();            System.out.println(hostName);        }    }}



三、上传合并小文件到hdfs

实现思想:循环遍历本地文件输入流

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;/** *  * 向hdfs上传复制文件的过程中,进行合并文件 *  */public class PutMerge {    /**     *      * @param localDir     *            本地要上传的文件目录     * @param hdfsFile     *            HDFS上的文件名称,包括路径     */    public static void put(String localDir, String hdfsFile) throws Exception {        // 获取配置信息        Configuration conf = new Configuration();        Path localPath = new Path(localDir);        Path hdfsPath = new Path(hdfsFile);        // 获取本地文件系统        FileSystem localFs = FileSystem.getLocal(conf);        // 获取HDFS        FileSystem hdfs = FileSystem.get(conf);        // 本地文件系统指定目录中的所有文件        FileStatus[] status = localFs.listStatus(localPath);        // 打开hdfs上文件的输出流        FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath);        // 循环遍历本地文件        for (FileStatus fileStatus : status) {            // 获取文件            Path path = fileStatus.getPath();            System.out.println("文件为:" + path.getName());            // 打开文件输入流            FSDataInputStream fSDataInputStream = localFs.open(path);            // 进行流的读写操作            byte[] buff = new byte[1024];            int len = 0;            while ((len = fSDataInputStream.read(buff)) > 0) {                fSDataOutputStream.write(buff, 0, len);            }            fSDataInputStream.close();        }        fSDataOutputStream.close();    }        public static void main(String[] args) {        String localDir="D:/logs";        String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data";        try {            put(localDir,hdfsFile);        } catch (Exception e) {                        e.printStackTrace();        }}        }


0