千家信息网

python如何实现查询bucket已用量脚本

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!目前仅支持c
千家信息网最后更新 2025年01月31日python如何实现查询bucket已用量脚本

小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

目前仅支持ceph的s3方案,具体配置看说明

# -*- coding: utf-8 -*-import requestsimport jsonfrom email.utils import formatdateimport hmacpy3k = Falsefrom hashlib import sha1 as shatry:    from urlparse import urlparse    from base64 import encodestringexcept:    py3k = True    from urllib.parse import urlparse    from base64 import encodebytes as encodestringclass AuthBase(object):    """Base class that all auth implementations derive from"""    def __call__(self, r):        raise NotImplementedError('Auth hooks must be callable.')class S3Auth(AuthBase):    """Attaches AWS Authentication to the given Request object."""    service_base_url = 's3.amazonaws.com'    # List of Query String Arguments of Interest    special_params = [        'acl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment',        'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads',        'uploadId', 'response-content-type', 'response-content-language',        'response-expires', 'response-cache-control', 'delete', 'lifecycle',        'response-content-disposition', 'response-content-encoding'    ]    def __init__(self, access_key, secret_key, service_url=None):        if service_url:            self.service_base_url = service_url        self.access_key = str(access_key)        self.secret_key = str(secret_key)        self.au =""    def __call__(self, r):        # Create date header if it is not created yet.        if not 'date' in r.headers and not 'x-amz-date' in r.headers:            r.headers['date'] = formatdate(                timeval=None,                localtime=False,                usegmt=True)        signature = self.get_signature(r)        if py3k:            signature = signature.decode('utf-8')        r.headers['Authorization'] = 'AWS %s:%s' % (self.access_key, signature)        self.au = r.headers        # print self.au        return r    def get_signature(self, r):        canonical_string = self.get_canonical_string(            r.url, r.headers, r.method)        if py3k:            key = self.secret_key.encode('utf-8')            msg = canonical_string.encode('utf-8')        else:            key = self.secret_key            msg = canonical_string        h = hmac.new(key, msg, digestmod=sha)        return encodestring(h.digest()).strip()    def get_canonical_string(self, url, headers, method):        parsedurl = urlparse(url)        objectkey = parsedurl.path[1:]        query_args = sorted(parsedurl.query.split('&'))        bucket = parsedurl.netloc[:-len(self.service_base_url)]        if len(bucket) > 1:            # remove last dot            bucket = bucket[:-1]        interesting_headers = {            'content-md5': '',            'content-type': '',            'date': ''}        for key in headers:            lk = key.lower()            try:                lk = lk.decode('utf-8')            except:                pass            if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')):                interesting_headers[lk] = headers[key].strip()        # If x-amz-date is used it supersedes the date header.        if not py3k:            if 'x-amz-date' in interesting_headers:                interesting_headers['date'] = ''        else:            if 'x-amz-date' in interesting_headers:                interesting_headers['date'] = ''        buf = '%s\n' % method        for key in sorted(interesting_headers.keys()):            val = interesting_headers[key]            if key.startswith('x-amz-'):                buf += '%s:%s\n' % (key, val)            else:                buf += '%s\n' % val        # append the bucket if it exists        if bucket != '':            buf += '/%s' % bucket        # add the objectkey. even if it doesn't exist, add the slash        buf += '/%s' % objectkey        params_found = False        # handle special query string arguments        for q in query_args:            k = q.split('=')[0]            if k in self.special_params:                if params_found:                    buf += '&%s' % q                else:                    buf += '?%s' % q                params_found = True        return bufclass S3Admin():    def __init__(self):        self.access_key = '' #填access_key        self.secret_key = '' #填secret_key        self.endpoint = 's3.ceph.work'  #填endpoint    def get_bucket_usage(self,bucketname):        #url = 'http://%s/%s/' % (self.endpoint, bucketname) #path style        url = 'http://%s.%s/' % (bucketname,self.endpoint) #virtual hosted styple        r = requests.head(url, auth=S3Auth(self.access_key, self.secret_key, self.endpoint))        print r.headers        return r.headerss3client = S3Admin()bucket_name= 'xxx'  #替换成相应的bucket名称result = s3client.get_bucket_usage(bucket_name)print 'objects_num= %s , total_Bytes_Used= %s ' % (result['X-RGW-Object-Count'],result['X-RGW-Bytes-Used'])#注意 objects_num 为当前bucket内的object数量,total_Bytes_Used为当前bucket内的已用容量(单位为Byte)

以上是"python如何实现查询bucket已用量脚本"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0