How to implement s3cmd data operations


This article explains how to implement s3cmd-style data operations against S3 by combining an API server with presigned multipart uploads. Many people run into trouble with this in practice, so the following walks through the principle, the trade-offs, and a working demo.

1. How it works

Workflow:
1. The client splits the file into parts and submits an upload request to the API server, which generates the corresponding presigned URLs (steps 1 and 2; see the sketch after this list. If you want to limit how many clients can upload, you can issue a fixed number of tokens at this stage).
2. The client uses the generated presigned URLs to build HTTP requests and uploads the data to the S3 service (step 3).
3. Once all parts are uploaded, the client submits a Complete request to the API server, which then sends the complete-multipart-upload request to the S3 service (steps 4 and 5).
4. The client receives the API server's response and the operation is finished (tokens could be reclaimed here).
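
For reference, steps 1 and 2 can also be expressed with the newer boto3 SDK instead of hand-built signatures. The snippet below is only a minimal sketch with a placeholder endpoint, bucket, key, and credentials; the actual server demo in section 3 signs the URLs manually with the legacy boto library.

# Minimal sketch: issue presigned part-upload URLs with boto3.
# Endpoint, bucket, key, and credentials below are placeholders.
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://s3.example.com',   # assumed RGW/S3 endpoint
    aws_access_key_id='ACCESS_KEY',
    aws_secret_access_key='SECRET_KEY',
)

# Steps 1-2: the API server initiates the multipart upload and signs one URL per part.
mpu = s3.create_multipart_upload(Bucket='multi-upload', Key='abc.json')
upload_id = mpu['UploadId']

part_urls = {
    part_number: s3.generate_presigned_url(
        'upload_part',
        Params={'Bucket': 'multi-upload', 'Key': 'abc.json',
                'UploadId': upload_id, 'PartNumber': part_number},
        ExpiresIn=300,
    )
    for part_number in (1, 2, 3)
}

# Step 3: any HTTP client can now PUT each part body against part_urls[n].
# Steps 4-5: after all parts succeed, the server calls complete_multipart_upload
# with the ETag returned by each part PUT.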

2. Pros and cons

Advantages:
1. The access key and secret key are never stored on the client, which avoids leaking them.
2. Each presigned URL maps to one key name, and within its validity period the client can freely upload to or overwrite that object, which is quite flexible.
3. The server side can plug into any auth system to authenticate and authorize clients, making it easy to integrate with existing business logic.
4. Upload and download are flexible on the client side: once the presigned URLs are obtained, any HTTP-capable client can perform the transfer.
5. It suits large files. Compared with the single presigned-URL approach introduced earlier, the data-upload phase supports concurrent part uploads, which greatly improves upload efficiency.

Drawbacks:
1. The upload requires several round trips, so the flow is slightly more complex.
2. Constrained by the S3 multipart upload specification, this method is not suitable for files smaller than 5 MB.

3. Implementation

Install the server-side dependencies

pip install boto
pip install flask-restful

The server-side demo code is as follows:

# -*- coding: utf-8 -*-
# Server-side demo (Python 2, legacy boto SDK)
import time
import hmac
from hashlib import sha1 as sha
import boto
import boto.s3.connection
import re

py3k = False
try:
    from urlparse import urlparse, unquote
    from base64 import encodestring
except:
    py3k = True
    from urllib.parse import urlparse, unquote
    from base64 import encodebytes as encodestring

from flask import Flask, request
from flask_restful import Api, Resource

app = Flask(__name__)
api = Api(app)

from boto.s3.multipart import MultiPartUpload


class MultiPartUpload_Presign(MultiPartUpload):
    def __init__(self, id, bucket, key_name):
        MultiPartUpload.__init__(self)
        self.id = id
        self.bucket = bucket
        self.key_name = key_name

    def complete_upload(self):
        xml = self.to_xml()
        return self.bucket.complete_multipart_upload(self.key_name,
                                                     self.id, xml)


class S3PreSign():
    def __init__(self, object_name, metadata=None, policy=None):
        self.service_url = 's3.ceph.work'   # S3 service endpoint
        self.access_key = ''                # access key
        self.secret_key = ''                # secret key
        self.bucket_name = 'multi-upload'   # bucket name
        self.object_name = str(object_name)
        # self.Expires = int(time.time()) + int(expires)
        conn = boto.connect_s3(
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            host=self.service_url,
            port=80,
            is_secure=False,  # set to True if the endpoint uses SSL
            # calling_format=boto.s3.connection.OrdinaryCallingFormat(),
            calling_format=boto.s3.connection.SubdomainCallingFormat(),
        )
        self.bucket = conn.get_bucket(self.bucket_name)
        self.upload_ID = self.Make_uploadID(self.object_name, metadata=metadata, policy=policy)

    def Make_uploadID(self, object_name, metadata=None, policy=None):
        mpu = self.bucket.initiate_multipart_upload(object_name, metadata=metadata, policy=policy)
        return mpu.id

    def complete_upload(self, upload_ID):
        mpu = MultiPartUpload_Presign(id=upload_ID, bucket=self.bucket, key_name=self.object_name)
        status_ = 200
        try:
            mpu.complete_upload()
        except:
            status_ = 422
        finally:
            return status_

    def get_signature_str(self, sign_str):
        if py3k:
            key = self.secret_key.encode('utf-8')
            msg = sign_str.encode('utf-8')
        else:
            key = self.secret_key
            msg = sign_str
        h = hmac.new(key, msg, digestmod=sha)
        return (encodestring(h.digest()).strip()).replace('+', '%2b')

    def build_url(self, expires, partNumber, Signature):
        url_ = "http://{bucket_name}.{service_url}/{object_name}?uploadId={uploadId}&partNumber={partNumber}&Expires={Expires}&AWSAccessKeyId={AWSAccessKeyId}&Signature={Signature}".format(
            bucket_name=self.bucket_name,
            service_url=self.service_url,
            object_name=self.object_name,
            uploadId=self.upload_ID,
            partNumber=partNumber,
            Expires=expires,
            AWSAccessKeyId=self.access_key,
            Signature=Signature
        )
        return url_

    def build_url_with_partid(self, expires, partNumber, partMd5):
        sign_str = "PUT\n{partMd5}\n\n{Expires}\n/{bucket_name}/{object_name}?partNumber={partNumber}&uploadId={uploadId}".format(
            partMd5=partMd5,
            Expires=expires,
            bucket_name=self.bucket_name,
            object_name=self.object_name,
            partNumber=partNumber,
            uploadId=self.upload_ID)
        Signature_ = self.get_signature_str(sign_str)
        return self.build_url(expires, partNumber, Signature_)


class MultiPart_List(Resource):
    def post(self):
        PartNumber_ = {}
        metadata = {}
        policy = None
        # print request.form['keyname']
        if 'keyname' in request.form:
            keyname = request.form['keyname']
        else:
            return "no key", 400
        if 'expires' in request.form:
            expires = request.form['expires']
        else:
            return "no expires", 400
        if 'contenttype' in request.form:
            metadata['Content-Type'] = str(request.form['contenttype'])
        if 'x-amz-acl' in request.form:
            policy = str(request.form['x-amz-acl'])
        for part_ in request.form:
            # numeric form keys are part numbers, their values are part MD5s
            if re.match(r'^\d{1,}$', part_):
                PartNumber_[part_] = request.form[part_]
            meatadata_rule = 'x-amz-meta-'
            if re.match(meatadata_rule, part_):
                metadata[part_.split(meatadata_rule)[1]] = str(request.form[part_])
        print metadata, policy, keyname, expires
        s3client = S3PreSign(keyname, metadata=metadata, policy=policy)
        result = {}
        result['UploadID'] = s3client.upload_ID
        expires = int(time.time()) + int(expires)
        for p_ in PartNumber_:
            result[p_] = s3client.build_url_with_partid(expires, p_, PartNumber_[p_])
        return result, 201


class Complete_MultiPart(Resource):
    def post(self):
        if 'keyname' in request.form:
            keyname = request.form['keyname']
        else:
            return "no key", 400
        if 'uploadid' in request.form:
            uploadid = request.form['uploadid']
        else:
            return "no UploadID", 400
        s3client = S3PreSign(keyname)
        result = s3client.complete_upload(uploadid)
        return {"status_code": result}, result


api.add_resource(MultiPart_List, '/presign')
api.add_resource(Complete_MultiPart, '/complete')

if __name__ == '__main__':
    app.run(debug=True)
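
Before writing the full client, the two endpoints can be smoke-tested directly. The snippet below is only a sketch: the part MD5 values are placeholders, and it assumes the demo server above is running locally on port 5000.

# Minimal smoke test for the demo API (values below are placeholders).
import requests

api = 'http://localhost:5000'

# Ask for presigned URLs for two parts; the numeric form keys are the part
# numbers and their values are the base64 Content-MD5 of each part.
resp = requests.post(api + '/presign', data={
    'keyname': 'abc.json',
    'expires': 300,
    'contenttype': 'application/json',
    '1': '<base64 md5 of part 1>',
    '2': '<base64 md5 of part 2>',
})
urls = resp.json()
upload_id = urls.pop('UploadID')
print(urls)   # e.g. {'1': 'http://multi-upload.s3.ceph.work/abc.json?uploadId=...', ...}

# After the parts have been PUT against those URLs, finish the upload.
done = requests.post(api + '/complete',
                     data={'keyname': 'abc.json', 'uploadid': upload_id})
print(done.status_code)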

Install the client-side dependencies

pip install requests

The client-side demo code is as follows:

# -*- coding: utf-8 -*-
# Client-side demo (Python 2)
import requests
from base64 import encodestring
from hashlib import md5
import os
import json
from multiprocessing import Pool


def multipart_upload_with_part(url_, part_file_path, partMD5):
    headers = {}
    headers["Content-MD5"] = partMD5
    with open(part_file_path, 'r') as fh:
        response = requests.put(url_, headers=headers, data=fh.read())
        if response.status_code == 200:
            print "{} upload successful!".format(part_file_path)


class S3client():
    def __init__(self, key_name, expires, part_num, uploadfile_path, policy=None,
                 contenttype=None, metadata=None, processes_num=2):
        self.multipart_data = {}
        if key_name:
            self.multipart_data['keyname'] = key_name
        if expires:
            self.multipart_data['expires'] = expires
        if policy:
            self.multipart_data['x-amz-acl'] = policy
        if contenttype:
            self.multipart_data['contenttype'] = contenttype
        if metadata:
            for k in metadata:
                self.multipart_data[k] = metadata[k]
        self.part_num = part_num
        self.processes_num = processes_num
        self.uploadfile_path = uploadfile_path
        self.server = 'http://localhost:5000/'  # your API server address
        self.upload_file_list_ = {}

    def split_file(self):
        filelist = []
        statinfo = os.stat(self.uploadfile_path)
        chunksize = statinfo.st_size / self.part_num
        print "File size: %d(MB)" % (statinfo.st_size / (1024 * 1024))
        print self.uploadfile_path, chunksize
        with open(self.uploadfile_path, "rb") as f:
            index = 1
            while True:
                chunk = f.read(chunksize)
                if (chunk):
                    fn = "%s.part.%d" % (self.uploadfile_path, index)
                    # print "creating", fn
                    with open(fn, "wb") as fw:
                        fw.write(chunk)
                    partMD5 = self.compute_hash(fn)
                    tmp_ = {}
                    tmp_[fn] = str(partMD5)
                    filelist.append(tmp_)
                    index = index + 1
                else:
                    break
        return filelist

    def compute_hash(self, filepath, buf_size=8192, size=None, hash_algorithm=md5):
        hash_obj = hash_algorithm()
        with open(filepath) as fp:
            spos = fp.tell()
            if size and size < buf_size:
                s = fp.read(size)
            else:
                s = fp.read(buf_size)
            while s:
                if not isinstance(s, bytes):
                    s = s.encode('utf-8')
                hash_obj.update(s)
                if size:
                    size -= len(s)
                    if size <= 0:
                        break
                if size and size < buf_size:
                    s = fp.read(size)
                else:
                    s = fp.read(buf_size)
            base64_digest = encodestring(hash_obj.digest()).decode('utf-8')
            if base64_digest[-1] == '\n':
                base64_digest = base64_digest[0:-1]
            return base64_digest

    def make_upload_list(self):
        upload_file_list = self.split_file()
        for f in upload_file_list:
            part_path = f.keys()[0]
            partMD5 = f.values()[0]
            # partnum_ = f.keys()[0].split(".")[-1]
            yield {part_path: partMD5}

    def get_multipart_presignurl(self):
        upload_file_list = self.make_upload_list()
        for i in upload_file_list:
            self.multipart_data[i.keys()[0].split(".")[-1]] = i.values()[0]
            self.upload_file_list_[i.keys()[0].split(".")[-1]] = {i.keys()[0]: i.values()[0]}
        url_ = self.server + "presign"
        r = requests.post(url_, data=self.multipart_data)
        allurl_ = json.loads(r.text)
        UploadID = allurl_.pop('UploadID')
        return UploadID, allurl_

    def complete(self, UploadID, key_name):
        data = {"uploadid": UploadID, 'keyname': key_name}
        url_ = self.server + "complete"
        r = requests.post(url_, data=data)
        if r.status_code == 200:
            print "Multipart upload finished!"
        else:
            print "Multipart upload failed!"

    def upload_mulprocess(self, allurl_):
        p = Pool(processes=self.processes_num)
        for url in allurl_:
            partNUm = url
            tmp_file = self.upload_file_list_[partNUm]
            filepath = tmp_file.keys()[0]
            partMD5 = tmp_file.values()[0]
            put_url = allurl_[url]
            p.apply_async(multipart_upload_with_part, (put_url, filepath, partMD5,))
        print 'Waiting for all subprocesses done...'
        p.close()
        p.join()


if __name__ == "__main__":
    key_name = 'abc.json'                     # object name to upload
    part_num = 6                              # number of file parts
    expires = 300                             # signature validity period (seconds)
    file_path = '/tmp/abc.json'               # local path of the file to upload
    processes_num = 2                         # number of concurrent upload processes
    contenttype = 'application/json'          # Content-Type of the file
    policy = 'public-read'                    # ACL policy for the object
    metadata = {'x-amz-meta-abc': 'abcd'}     # object metadata
    # Step 1: initialize parameters
    s3client = S3client(key_name, expires, part_num, file_path, policy, contenttype, metadata, 2)
    # Step 2: generate the presigned URLs
    UploadID, upload_file_list = s3client.get_multipart_presignurl()
    # Step 3: upload the data with the generated presigned URLs
    s3client.upload_mulprocess(upload_file_list)
    # Step 4: submit the complete request to merge the uploaded parts
    s3client.complete(UploadID, key_name)
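
Once the complete request returns 200, the parts have been merged into a single object. Since the demo requests a public-read ACL and the server uses subdomain-style bucket addressing, one simple spot-check is a plain HEAD request against the object URL; the hostname below just combines the bucket and endpoint assumed in the server demo, so adjust it to your environment.

# Verify the merged object (public-read ACL assumed, as in the demo above).
import requests

r = requests.head('http://multi-upload.s3.ceph.work/abc.json')
print(r.status_code)                      # expect 200 once the upload has completed
print(r.headers.get('Content-Length'))    # should equal the original file size
print(r.headers.get('x-amz-meta-abc'))    # custom metadata set during presign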

"s3cmd数据操作怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
