Python3 操作 HDFS
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,【第三方包】pyhdfs(pypi,github,支持HA)【功能】重命名 hdfs 文件或目录# encoding: utf-8# author: walker# date: 2018-03-17
千家信息网最后更新 2025年02月03日Python3 操作 HDFS
【第三方包】
pyhdfs(pypi,github,支持HA)
【功能】
重命名 hdfs 文件或目录
# encoding: utf-8# author: walker# date: 2018-03-17 # summary: 利用 pyhdfs 重命名 hdfs 文件或目录import os, sys, timefrom pyhdfs import HdfsClientSrcPath = '/test/xxx'DstPath = '/test/yyy'NameNode = 'nn1.example.com:50070,nn2.example.com:50070'# 将 SrcPath 改名为 DstPathdef Rename(SrcPath, DstPath): fs = HdfsClient(hosts=NameNode) if not fs.exists(SrcPath): print('Error: not found %s' % SrcPath) sys.exit(-1) print('Reanme ... \n%s\n -> \n%s \n' % (SrcPath, DstPath)) fs.rename(SrcPath, DstPath) if __name__ == '__main__': Rename(SrcPath, DstPath)
上传文件
# encoding: utf-8# author: walker# date: 2018-01-23# summary: 上传本地文件到 hdfs 目录import os, sys, timefrom pyhdfs import HdfsClientfrom configparser import ConfigParsercur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))StartTime = time.time()FileSize = 0 #文件总大小LocalDir = ''HdfsDir = ''NameNode = ''UserName = ''#读取配置文件 def ReadConfig(): global LocalDir, HdfsDir, NameNode, UserName cfg = ConfigParser() cfgFile = os.path.join(cur_dir_fullpath, 'config.ini') if not os.path.exists(cfgFile): input(cfgFile + ' not found') sys.exit(-1) cfgLst = cfg.read(cfgFile) if len(cfgLst) < 1: input('Read config.ini failed...') sys.exit(-1) LocalDir = cfg.get('config', 'LocalDir').strip() if not os.path.exists(LocalDir): input(LocalDir + ' not found') sys.exit(-1) print('LocalDir:' + LocalDir) HdfsDir = cfg.get('config', 'HdfsDir').strip() print('HdfsDir:' + HdfsDir) NameNode = cfg.get('config', 'NameNode').strip() print('NameNode:' + NameNode) UserName = cfg.get('config', 'UserName').strip() print('UserName:' + UserName) print('Read config.ini successed!') #处理一个def ProcOne(client, srcFile, dstFile): global FileSize print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile)) #目标文件已经存在且大小相同 if client.exists(dstFile) and \ (os.path.getsize(srcFile) == client.list_status(dstFile)[0].length): print('file exists: %s ' % dstFile) return True #注意,如果已存在会被覆盖 client.copy_from_local(srcFile, dstFile, overwrite=True) #校验文件大小 if os.path.getsize(srcFile) == client.list_status(dstFile)[0].length: FileSize += os.path.getsize(srcFile) return True return False #处理所有def ProcAll(): client = HdfsClient(hosts=NameNode, user_name=UserName) if not client.exists(HdfsDir): print(HdfsDir + ' not found') sys.exit(-1) total = len(os.listdir(LocalDir)) processed = 0 failedList = list() for filename in os.listdir(LocalDir): srcFile = os.path.join(LocalDir, filename) dstFile = HdfsDir + '/' + filename if not ProcOne(client, srcFile, dstFile): failedList.append(srcFile) processed += 1 print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime)) print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime))) if failedList: print('failedList: %s' % repr(failedList)) else: print('Good! No Error!') print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \ (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime))) if __name__ == '__main__': ReadConfig() ProcAll() print('Time total: %.2f s' % (time.time()-StartTime)) print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
下载 HDFS 文件到本地
# encoding: utf-8# author: walker# date: 2018-06-07# summary: 下载 HDFS 文件(或目录)到本地import os, sys, timefrom pyhdfs import HdfsClientfrom configparser import ConfigParsercur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))StartTime = time.time()FileSize = 0 #文件总大小LocalDir = ''HdfsDir = ''NameNode = ''UserName = ''#读取配置文件 def ReadConfig(): global LocalDir, HdfsDir, NameNode, UserName cfg = ConfigParser() cfgFile = os.path.join(cur_dir_fullpath, 'config.ini') if not os.path.exists(cfgFile): input(cfgFile + ' not found') sys.exit(-1) cfgLst = cfg.read(cfgFile) if len(cfgLst) < 1: input('Read config.ini failed...') sys.exit(-1) LocalDir = cfg.get('config', 'LocalDir').strip() if not os.path.exists(LocalDir): input(LocalDir + ' not found') sys.exit(-1) print('LocalDir:' + LocalDir) HdfsDir = cfg.get('config', 'HdfsDir').strip().rstrip('/') print('HdfsDir:' + HdfsDir) NameNode = cfg.get('config', 'NameNode').strip() print('NameNode:' + NameNode) UserName = cfg.get('config', 'UserName').strip() print('UserName:' + UserName) print('Read config.ini successed!') #处理一个def ProcOne(client, srcFile, dstFile): global FileSize print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile)) dstDir = os.path.dirname(dstFile) if not os.path.exists(dstDir): os.makedirs(dstDir) # 目标文件已经存在且大小相同 if os.path.exists(dstFile) and \ (os.path.getsize(dstFile) == client.list_status(srcFile)[0].length): print('file exists: %s ' % dstFile) return True # 注意,如果已存在会被覆盖 client.copy_to_local(srcFile, dstFile, overwrite=True) if os.path.getsize(dstFile) != client.list_status(srcFile)[0].length: #校验文件大小 return False FileSize += os.path.getsize(dstFile) return True #处理所有def ProcAll(): client = HdfsClient(hosts=NameNode, user_name=UserName) if not client.exists(HdfsDir): print(HdfsDir + ' not found') sys.exit(-1) total = 0 # 先遍历一遍,得到总文件个数 for parent, dirnames, filenames in client.walk(HdfsDir): for filename in filenames: total += 1 processed = 0 failedList = list() for parent, dirnames, filenames in client.walk(HdfsDir): for filename in filenames: srcFile = '%s/%s' % (parent, filename) relPath = srcFile[len(HdfsDir)+1:].replace('/', '\\') # 相对于根目录的路径 dstFile = os.path.join(LocalDir, relPath) if not ProcOne(client, srcFile, dstFile): failedList.append(srcFile) processed += 1 print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime)) print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime))) if failedList: print('failedList: %s' % repr(failedList)) else: print('Good! No Error!') print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \(FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime))) if __name__ == '__main__': ReadConfig() ProcAll() print('Time total: %.2f s' % (time.time()-StartTime)) print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
*** walker ***
文件
大小
目录
相同
目标
处理
配置
个数
功能
根目录
第三方
路径
支持
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库可以当作办公软件
做数据库用什么软件
信息安全技术应用软件开发
网络安全五个方面
网络安全常见攻防技术
sql查数据库数据总大小
崩坏3怎么不能选择服务器
软件开发设计中的思想
杭州衡泰软件开发
区长 网络安全 讲话
高校网络安全保障部署会
新乡市软件开发
数据库如何检查大批量数据更新
东莞聚聘互联网科技
无线传感器网络技术6
青岛优乐网络技术
数据库添加约束独特
罗斯文数据库工作界面基础功能
存储服务器能不能给电脑用
网络安全伴我同行黑板报
战舰世界亚服选哪个服务器
郑州点创科网络技术
网络安全小技巧
网络安全对经济发展重要性
中国铁建网络安全会议
乳山青青软件开发服务中心
梦码软件开发公司
朗尼科视频服务器管理软件
会计信息网络技术的发展趋势
软件开发公司总帐会计科目