千家信息网

Python3 操作 HDFS

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,【第三方包】pyhdfs(pypi,github,支持HA);【功能】重命名 hdfs 文件或目录。# encoding: utf-8 # author: walker # date: 2018-03-17
千家信息网最后更新 2025年02月03日,Python3 操作 HDFS

【第三方包】

  • pyhdfs(pypi,github,支持HA)


【功能】

  • 重命名 hdfs 文件或目录

# encoding: utf-8
# author: walker
# date: 2018-03-17
# summary: rename an HDFS file or directory using pyhdfs

import os
import sys
import time

from pyhdfs import HdfsClient

SrcPath = '/test/xxx'
DstPath = '/test/yyy'
# Comma-separated list of NameNode HTTP endpoints handed to HdfsClient.
NameNode = 'nn1.example.com:50070,nn2.example.com:50070'


def Rename(SrcPath, DstPath):
    """Rename the HDFS path ``SrcPath`` to ``DstPath``.

    Connects to the NameNode(s) listed in the module-level ``NameNode``
    string. The process exits with status -1 when the source path does not
    exist or when HDFS reports that the rename did not take place.

    Args:
        SrcPath: existing HDFS file or directory.
        DstPath: new HDFS path for it.
    """
    fs = HdfsClient(hosts=NameNode)
    if not fs.exists(SrcPath):
        print('Error: not found %s' % SrcPath)
        sys.exit(-1)

    print('Rename ... \n%s\n -> \n%s \n' % (SrcPath, DstPath))
    # rename() reports failure through its boolean result rather than by
    # raising, so the result has to be checked explicitly.
    if not fs.rename(SrcPath, DstPath):
        print('Error: rename failed %s -> %s' % (SrcPath, DstPath))
        sys.exit(-1)


if __name__ == '__main__':
    Rename(SrcPath, DstPath)
  • 上传文件

# encoding: utf-8
# author: walker
# date: 2018-01-23
# summary: upload local files to an HDFS directory

import os
import sys
import time
from configparser import ConfigParser

from pyhdfs import HdfsClient

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))

StartTime = time.time()
FileSize = 0          # total number of bytes uploaded so far
LocalDir = ''
HdfsDir = ''
NameNode = ''
UserName = ''


def ReadConfig():
    """Load LocalDir, HdfsDir, NameNode and UserName from config.ini.

    config.ini is looked up next to this script and must contain a
    ``[config]`` section with those four keys. On a missing or unreadable
    config file, or a missing local directory, the message is shown via
    input() (which waits for Enter) and the process exits with status -1.
    """
    global LocalDir, HdfsDir, NameNode, UserName

    cfg = ConfigParser()
    cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')
    if not os.path.exists(cfgFile):
        input(cfgFile + ' not found')
        sys.exit(-1)

    # ConfigParser.read() returns the list of files it managed to parse.
    cfgLst = cfg.read(cfgFile)
    if len(cfgLst) < 1:
        input('Read config.ini failed...')
        sys.exit(-1)

    LocalDir = cfg.get('config', 'LocalDir').strip()
    if not os.path.exists(LocalDir):
        input(LocalDir + ' not found')
        sys.exit(-1)
    print('LocalDir:' + LocalDir)

    HdfsDir = cfg.get('config', 'HdfsDir').strip()
    print('HdfsDir:' + HdfsDir)

    NameNode = cfg.get('config', 'NameNode').strip()
    print('NameNode:' + NameNode)

    UserName = cfg.get('config', 'UserName').strip()
    print('UserName:' + UserName)

    print('Read config.ini successed!')


def ProcOne(client, srcFile, dstFile):
    """Upload one local file to HDFS and verify it by size.

    Args:
        client: connected HdfsClient.
        srcFile: local file path.
        dstFile: destination HDFS path.

    Returns:
        True when the destination already had the same size or the upload
        was verified; False when the sizes differ after the upload.
    """
    global FileSize
    print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))

    srcSize = os.path.getsize(srcFile)

    # Destination already exists with the same size: nothing to do.
    if client.exists(dstFile) and \
            srcSize == client.list_status(dstFile)[0].length:
        print('file exists: %s ' % dstFile)
        return True

    # Note: an existing destination file is overwritten.
    client.copy_from_local(srcFile, dstFile, overwrite=True)

    # Verify the upload by comparing file sizes.
    if srcSize == client.list_status(dstFile)[0].length:
        FileSize += srcSize
        return True

    return False


def ProcAll():
    """Upload every regular file directly inside LocalDir to HdfsDir.

    Sub-directories of LocalDir are skipped because a single-file copy
    cannot upload them. Progress and throughput are printed after each
    file, followed by a final summary. Exits with status -1 when HdfsDir
    does not exist.
    """
    client = HdfsClient(hosts=NameNode, user_name=UserName)
    if not client.exists(HdfsDir):
        print(HdfsDir + ' not found')
        sys.exit(-1)

    # List the directory once and keep only regular files.
    filenames = [name for name in os.listdir(LocalDir)
                 if os.path.isfile(os.path.join(LocalDir, name))]
    total = len(filenames)
    # Avoid a double slash when HdfsDir is configured with a trailing '/'.
    hdfsBase = HdfsDir.rstrip('/')

    processed = 0
    failedList = list()
    for filename in filenames:
        srcFile = os.path.join(LocalDir, filename)
        dstFile = hdfsBase + '/' + filename
        if not ProcOne(client, srcFile, dstFile):
            failedList.append(srcFile)
        processed += 1
        print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
        print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))

    if failedList:
        print('failedList: %s' % repr(failedList))
    else:
        print('Good! No Error!')

    print('%d B, %.2f MB, %.2f GB, %.2f MB/s' %
          (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))


if __name__ == '__main__':
    ReadConfig()
    ProcAll()
    print('Time total: %.2f s' % (time.time()-StartTime))
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
  • 下载 HDFS 文件到本地

# encoding: utf-8
# author: walker
# date: 2018-06-07
# summary: download an HDFS file (or directory) to the local disk

import os
import sys
import time
from configparser import ConfigParser

from pyhdfs import HdfsClient

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))

StartTime = time.time()
FileSize = 0        # total number of bytes downloaded so far
LocalDir = ''
HdfsDir = ''
NameNode = ''
UserName = ''


def ReadConfig():
    """Load LocalDir, HdfsDir, NameNode and UserName from config.ini.

    config.ini is looked up next to this script and must contain a
    ``[config]`` section with those four keys. Trailing slashes are
    stripped from HdfsDir. On a missing or unreadable config file, or a
    missing local directory, the message is shown via input() (which
    waits for Enter) and the process exits with status -1.
    """
    global LocalDir, HdfsDir, NameNode, UserName

    cfg = ConfigParser()
    cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')
    if not os.path.exists(cfgFile):
        input(cfgFile + ' not found')
        sys.exit(-1)

    # ConfigParser.read() returns the list of files it managed to parse.
    cfgLst = cfg.read(cfgFile)
    if len(cfgLst) < 1:
        input('Read config.ini failed...')
        sys.exit(-1)

    LocalDir = cfg.get('config', 'LocalDir').strip()
    if not os.path.exists(LocalDir):
        input(LocalDir + ' not found')
        sys.exit(-1)
    print('LocalDir:' + LocalDir)

    HdfsDir = cfg.get('config', 'HdfsDir').strip().rstrip('/')
    print('HdfsDir:' + HdfsDir)

    NameNode = cfg.get('config', 'NameNode').strip()
    print('NameNode:' + NameNode)

    UserName = cfg.get('config', 'UserName').strip()
    print('UserName:' + UserName)

    print('Read config.ini successed!')


def ProcOne(client, srcFile, dstFile):
    """Download one HDFS file to the local disk and verify it by size.

    Args:
        client: connected HdfsClient.
        srcFile: HDFS path of the file to fetch.
        dstFile: local destination path; its parent directory is created
            when missing.

    Returns:
        True when the local file already had the same size or the download
        was verified; False when the sizes differ after the download.
    """
    global FileSize
    print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))

    dstDir = os.path.dirname(dstFile)
    if dstDir:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(dstDir, exist_ok=True)

    # Local file already exists with the same size: nothing to do.
    if os.path.exists(dstFile) and \
            os.path.getsize(dstFile) == client.list_status(srcFile)[0].length:
        print('file exists: %s ' % dstFile)
        return True

    # Note: an existing local file is overwritten.
    client.copy_to_local(srcFile, dstFile, overwrite=True)

    # Verify the download by comparing file sizes.
    dstSize = os.path.getsize(dstFile)
    if dstSize != client.list_status(srcFile)[0].length:
        return False
    FileSize += dstSize
    return True


def ProcAll():
    """Download every file under HdfsDir into LocalDir, keeping the tree.

    The HDFS tree is walked once to collect all file paths, so the total is
    known up front. Progress and throughput are printed after each file,
    followed by a final summary. Exits with status -1 when HdfsDir does not
    exist.
    """
    client = HdfsClient(hosts=NameNode, user_name=UserName)
    if not client.exists(HdfsDir):
        print(HdfsDir + ' not found')
        sys.exit(-1)

    # Single traversal: gather every file path, which also gives the total.
    srcFiles = []
    for parent, dirnames, filenames in client.walk(HdfsDir):
        srcFiles.extend('%s/%s' % (parent, filename) for filename in filenames)
    total = len(srcFiles)

    processed = 0
    failedList = list()
    for srcFile in srcFiles:
        # Path relative to the HDFS root directory, mapped to the local
        # platform's separator (not only to Windows backslashes).
        relPath = srcFile[len(HdfsDir)+1:]
        dstFile = os.path.join(LocalDir, *relPath.split('/'))
        if not ProcOne(client, srcFile, dstFile):
            failedList.append(srcFile)
        processed += 1
        print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))
        print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))

    if failedList:
        print('failedList: %s' % repr(failedList))
    else:
        print('Good! No Error!')

    print('%d B, %.2f MB, %.2f GB, %.2f MB/s' %
          (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))


if __name__ == '__main__':
    ReadConfig()
    ProcAll()
    print('Time total: %.2f s' % (time.time()-StartTime))
    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))


*** walker ***




0