python如何实现查询bucket已用量脚本
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!目前仅支持c
千家信息网最后更新 2025年01月31日python如何实现查询bucket已用量脚本
小编给大家分享一下python如何实现查询bucket已用量脚本,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
目前仅支持ceph的s3方案,具体配置看说明
# -*- coding: utf-8 -*-import requestsimport jsonfrom email.utils import formatdateimport hmacpy3k = Falsefrom hashlib import sha1 as shatry: from urlparse import urlparse from base64 import encodestringexcept: py3k = True from urllib.parse import urlparse from base64 import encodebytes as encodestringclass AuthBase(object): """Base class that all auth implementations derive from""" def __call__(self, r): raise NotImplementedError('Auth hooks must be callable.')class S3Auth(AuthBase): """Attaches AWS Authentication to the given Request object.""" service_base_url = 's3.amazonaws.com' # List of Query String Arguments of Interest special_params = [ 'acl', 'location', 'logging', 'partNumber', 'policy', 'requestPayment', 'torrent', 'versioning', 'versionId', 'versions', 'website', 'uploads', 'uploadId', 'response-content-type', 'response-content-language', 'response-expires', 'response-cache-control', 'delete', 'lifecycle', 'response-content-disposition', 'response-content-encoding' ] def __init__(self, access_key, secret_key, service_url=None): if service_url: self.service_base_url = service_url self.access_key = str(access_key) self.secret_key = str(secret_key) self.au ="" def __call__(self, r): # Create date header if it is not created yet. if not 'date' in r.headers and not 'x-amz-date' in r.headers: r.headers['date'] = formatdate( timeval=None, localtime=False, usegmt=True) signature = self.get_signature(r) if py3k: signature = signature.decode('utf-8') r.headers['Authorization'] = 'AWS %s:%s' % (self.access_key, signature) self.au = r.headers # print self.au return r def get_signature(self, r): canonical_string = self.get_canonical_string( r.url, r.headers, r.method) if py3k: key = self.secret_key.encode('utf-8') msg = canonical_string.encode('utf-8') else: key = self.secret_key msg = canonical_string h = hmac.new(key, msg, digestmod=sha) return encodestring(h.digest()).strip() def get_canonical_string(self, url, headers, method): parsedurl = urlparse(url) objectkey = parsedurl.path[1:] query_args = sorted(parsedurl.query.split('&')) bucket = parsedurl.netloc[:-len(self.service_base_url)] if len(bucket) > 1: # remove last dot bucket = bucket[:-1] interesting_headers = { 'content-md5': '', 'content-type': '', 'date': ''} for key in headers: lk = key.lower() try: lk = lk.decode('utf-8') except: pass if headers[key] and (lk in interesting_headers.keys() or lk.startswith('x-amz-')): interesting_headers[lk] = headers[key].strip() # If x-amz-date is used it supersedes the date header. if not py3k: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' else: if 'x-amz-date' in interesting_headers: interesting_headers['date'] = '' buf = '%s\n' % method for key in sorted(interesting_headers.keys()): val = interesting_headers[key] if key.startswith('x-amz-'): buf += '%s:%s\n' % (key, val) else: buf += '%s\n' % val # append the bucket if it exists if bucket != '': buf += '/%s' % bucket # add the objectkey. even if it doesn't exist, add the slash buf += '/%s' % objectkey params_found = False # handle special query string arguments for q in query_args: k = q.split('=')[0] if k in self.special_params: if params_found: buf += '&%s' % q else: buf += '?%s' % q params_found = True return bufclass S3Admin(): def __init__(self): self.access_key = '' #填access_key self.secret_key = '' #填secret_key self.endpoint = 's3.ceph.work' #填endpoint def get_bucket_usage(self,bucketname): #url = 'http://%s/%s/' % (self.endpoint, bucketname) #path style url = 'http://%s.%s/' % (bucketname,self.endpoint) #virtual hosted styple r = requests.head(url, auth=S3Auth(self.access_key, self.secret_key, self.endpoint)) print r.headers return r.headerss3client = S3Admin()bucket_name= 'xxx' #替换成相应的bucket名称result = s3client.get_bucket_usage(bucket_name)print 'objects_num= %s , total_Bytes_Used= %s ' % (result['X-RGW-Object-Count'],result['X-RGW-Bytes-Used'])#注意 objects_num 为当前bucket内的object数量,total_Bytes_Used为当前bucket内的已用容量(单位为Byte)
以上是"python如何实现查询bucket已用量脚本"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
用量
脚本
查询
内容
不怎么
单位
名称
大部分
容量
数量
方案
更多
知识
行业
资讯
资讯频道
频道
utf-8
参考
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
三公开船软件开发
unturned开服务器
搭建手机游戏服务器
信息工程公司官网网络技术服务
r软件开发合同模板
服务器基板管理系统
labview 与数据库
工业自动化软件开发
天象网络技术有限公司规划
软件开发面试笔试题不及格
在网络安全管理中加强内防
用友输出外部数据的数据库
数据库审计系统市场
网络安全客户客服中心表
如何进行软件开发项目管理
常州塔式服务器价格
西安大数据库安全
小学生网络安全简单的内容
云端服务器数据安全
开平市中心医院网络安全等级保护
互联网支付科技公司
曙光天阔服务器的盖子怎么开
2018软考数据库好难
浦发银行软件开发黄帅
南京互联网科技
软件开发 pd 几个小时
诛仙3哪个服务器好
邮储银行软件开发面试
互联网科技平台同城 电话
计算机网络技术怎么学才学得好