hdfs常用API和putMerge功能实现
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,所需jar包一、URL API操作方式import java.io.InputStream;import java.net.URL;import org.apache.hadoop.fs.FsUrlS
千家信息网最后更新 2025年02月01日hdfs常用API和putMerge功能实现
所需jar包
一、URL API操作方式
import java.io.InputStream;import java.net.URL;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;import org.apache.hadoop.io.IOUtils;import org.junit.Test;public class HDFSUrlTest { /** * HDFS URL API操作方式 * 不需要读取core-site.xml和hdfs-site.xml配置文件 */ // 让JAVA程序识别HDFS的URL static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } // 查看文件内容 @Test public void testRead() throws Exception { InputStream in = null; // 文件路径 String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data"; try { // 获取文件输入流 in = new URL(fileUrl).openStream(); // 将文件内容读取出来,打印控制台 IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } }}
二、通过FileSystem API操作HDFS
HDFS工具类
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;public class HDFSUtils {/** * HDFS工具类 */ public static FileSystem getFileSystem() { //声明FileSystem FileSystem hdfs=null; try { //获取文件配置信息 Configuration conf =new Configuration(); //获取文件系统 hdfs=FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); } return hdfs; } }
常用操作实现类
import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hdfs.DistributedFileSystem;import org.apache.hadoop.hdfs.protocol.DatanodeInfo;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.mapred.gethistory_jsp;import org.junit.Test;public class HDFSFsTest { /** * * 通过FileSystem API操作HDFS */ // 读取文件内容 @Test public void testRead() throws Exception { // 获取文件系统 FileSystem hdfs = HDFSUtils.getFileSystem(); // 文件名称 Path path = new Path("/opt/data/test/touch.data"); // 打开文件输入流 FSDataInputStream inStream = hdfs.open(path); // 读取文件到控制台显示 IOUtils.copyBytes(inStream, System.out, 4096, false); // 关闭流 IOUtils.closeStream(inStream); } // 查看目录 @Test public void testList() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 文件名称 Path path = new Path("/opt/data"); FileStatus[] fileStatus = hdfs.listStatus(path); for (FileStatus file : fileStatus) { Path p = file.getPath(); String info = file.isDir() ? "目录" : "文件"; System.out.println(info + ":" + p); } } // 创建目录 @Test public void testDirectory() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 要创建的目录 Path path = new Path("/opt/data/dir"); boolean isSuccessful = hdfs.mkdirs(path);// 相当于 linux下 mkdir -p // /opt/data/dir String info = isSuccessful ? "成功" : "失败"; System.out.println("创建目录【" + path + "】" + info); } // 上传文件-- put copyFromLocal @Test public void testPut() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 本地文件(目录+文件名称) Path srcPath = new Path("c:/0125.log"); // hdfs文件上传路径 Path dstPath = new Path("/opt/data/dir/"); hdfs.copyFromLocalFile(srcPath, dstPath); } // 创建hdfs文件并写入内容 @Test public void testCreate() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir/touch.data"); // 创建文件并获取输出流 FSDataOutputStream fSDataOutputStream = hdfs.create(path); // 通过输出流写入数据 fSDataOutputStream.write("你好".getBytes()); fSDataOutputStream.writeUTF("hello hadoop!"); IOUtils.closeStream(fSDataOutputStream); } // 文件重命名 @Test public void testRename() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path oldPath = new Path("/opt/data/dir/touch.data"); Path newPath = new Path("/opt/data/dir/rename.data"); boolean flag = hdfs.rename(oldPath, newPath); System.out.println(flag); } // 删除文件 public void testDelete() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir/touch.data"); boolean flag = hdfs.deleteOnExit(path); System.out.println(flag); } // 删除目录 public void testDeleteDir() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir"); boolean flag = hdfs.delete(path, true);// 如果是目录第二个参数必须为true System.out.println(flag); } // 查找某个文件在hdfs集群的位置 public void testLocation() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/test.file"); FileStatus fileStatus = hdfs.getFileStatus(path); BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); for (BlockLocation blockLocation : blockLocations) { String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.print(host + " "); } System.out.println(); } } // 获取hdfs集群上所有节点名称信息 public void testCluster() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs; DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats(); for (DatanodeInfo datanodeInfo : datanodeInfos) { String hostName = datanodeInfo.getHostName(); System.out.println(hostName); } }}
三、上传合并小文件到hdfs
实现思想:循环遍历本地文件输入流
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;/** * * 向hdfs上传复制文件的过程中,进行合并文件 * */public class PutMerge { /** * * @param localDir * 本地要上传的文件目录 * @param hdfsFile * HDFS上的文件名称,包括路径 */ public static void put(String localDir, String hdfsFile) throws Exception { // 获取配置信息 Configuration conf = new Configuration(); Path localPath = new Path(localDir); Path hdfsPath = new Path(hdfsFile); // 获取本地文件系统 FileSystem localFs = FileSystem.getLocal(conf); // 获取HDFS FileSystem hdfs = FileSystem.get(conf); // 本地文件系统指定目录中的所有文件 FileStatus[] status = localFs.listStatus(localPath); // 打开hdfs上文件的输出流 FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath); // 循环遍历本地文件 for (FileStatus fileStatus : status) { // 获取文件 Path path = fileStatus.getPath(); System.out.println("文件为:" + path.getName()); // 打开文件输入流 FSDataInputStream fSDataInputStream = localFs.open(path); // 进行流的读写操作 byte[] buff = new byte[1024]; int len = 0; while ((len = fSDataInputStream.read(buff)) > 0) { fSDataOutputStream.write(buff, 0, len); } fSDataInputStream.close(); } fSDataOutputStream.close(); } public static void main(String[] args) { String localDir="D:/logs"; String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data"; try { put(localDir,hdfsFile); } catch (Exception e) { e.printStackTrace(); }} }
文件
目录
名称
内容
系统
输入
信息
路径
输出
配置
工具
控制台
方式
集群
循环
控制
常用
成功
位置
你好
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
php数据库null
软件开发的方法有关论文
vs根据数据库生成属性
网络安全靶场系统搭建
开票服务器管理系统后台
网络安全 研究方法
亚洲服务器英文版
湛江手机软件开发市场价
魔豆网络技术 李晨
视图会影响数据库吗
网络安全品牌全球
杨浦区专业软件开发服务销售公司
福建蓝牙软件开发价格
互联网科技网站哪个好
网络技术初步 教案
数据库sql语句sum函数
水星wifi服务器无法连接
软件开发过程中有趣的故事
平板电脑安装软件开发
web服务器程序下载
网络安全宣传小知识
太原市网络安全等保制度
下列关于数据库模型
青浦区信息软件开发服务收费
麦块怎么开服务器
计算机三级网络技术宝典
网络安全提权
南昌市app软件开发公司
手机软件开发需要哪些
青少年涉及网络安全事件