kubernetes中python api的二次封装
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,k8s python api二次封装pip install pprint kubernetesimport urllib3from pprint import pprintfrom kuberne
千家信息网最后更新 2025年02月01日kubernetes中python api的二次封装
k8s python api二次封装
pip install pprint kubernetes
import urllib3from pprint import pprintfrom kubernetes import clientfrom os import pathimport yamlclass K8sApi(object): def __init__(self): # self.config = config.kube_config.load_kube_config() urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self.configuration = client.Configuration() self.configuration.host = "https://192.168.100.111:6443" self.configuration.api_key[ 'authorization'] = 'Bearer token' self.configuration.verify_ssl = False self.k8s_apps_v1 = client.AppsV1Api(client.ApiClient(self.configuration)) self.Api_Instance = client.CoreV1Api(client.ApiClient(self.configuration)) self.Api_Instance_Extensions = client.ExtensionsV1beta1Api(client.ApiClient(self.configuration)) #################################################################################################################### def list_deployment(self, namespace="default"): api_response = self.k8s_apps_v1.list_namespaced_deployment(namespace) return api_response def read_deployment(self, name="nginx-deployment", namespace="default"): api_response = self.k8s_apps_v1.read_namespaced_deployment(name, namespace) return api_response def create_deployment(self, file="deploy-nginx.yaml"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) resp = self.k8s_apps_v1.create_namespaced_deployment( body=dep, namespace="default") return resp def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) resp = self.k8s_apps_v1.replace_namespaced_deployment(name, namespace, body=dep) return resp def delete_deployment(self, name="nginx-deployment", namespace="default"): api_response = self.k8s_apps_v1.delete_namespaced_deployment(name, namespace) return api_response #################################################################################################################### def list_namespace(self): api_response = self.Api_Instance.list_namespace() return api_response def read_namespace(self, name="default"): api_response = self.Api_Instance.read_namespace(name) return api_response def create_namespace(self, file="pod-nginx.yaml"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance.create_namespace(body=dep) return api_response def replace_namespace(self, file="pod-nginx.yaml", name="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance.replace_namespace(name, body=dep) return api_response def delete_namespace(self, name="default", namespace="default"): api_response = self.Api_Instance.delete_namespace(name) return api_response #################################################################################################################### def list_node(self): api_response = self.Api_Instance.list_node() data = {} for i in api_response.items: data[i.metadata.name] = {"name": i.metadata.name, "status": i.status.conditions[-1].type if i.status.conditions[ -1].status == "True" else "NotReady", "ip": i.status.addresses[0].address, "kubelet_version": i.status.node_info.kubelet_version, "os_image": i.status.node_info.os_image, } return data def list_pod(self): api_response = self.Api_Instance.list_pod_for_all_namespaces() data = {} for i in api_response.items: data[i.metadata.name] = {"ip": i.status.pod_ip, "namespace": i.metadata.namespace} return data def read_pod(self, name="nginx-pod", namespace="default"): api_response = self.Api_Instance.read_namespaced_pod(name, namespace) return api_response def create_pod(self, file="pod-nginx.yaml", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance.create_namespaced_pod(namespace, body=dep) return api_response def replace_pod(self, file="pod-nginx.yaml", name="nginx-pod", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance.replace_namespaced_pod(name, namespace, body=dep) return api_response def delete_pod(self, name="nginx-pod", namespace="default"): api_response = self.Api_Instance.delete_namespaced_pod(name, namespace) return api_response #################################################################################################################### def list_service(self): api_response = self.Api_Instance.list_service_for_all_namespaces() return api_response def read_service(self, name="", namespace="default"): api_response = self.Api_Instance.read_namespaced_service(name, namespace) return api_response def create_service(self, file="service-nginx.yaml", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance.create_namespaced_service(namespace, body=dep) return api_response def replace_service(self, file="pod-nginx.yaml", name="hequan", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance.replace_namespaced_service(name, namespace, body=dep) return api_response def delete_service(self, name="hequan", namespace="default"): api_response = self.Api_Instance.delete_namespaced_service(name, namespace) return api_response #################################################################################################################### def list_ingress(self): api_response = self.Api_Instance_Extensions.list_ingress_for_all_namespaces() return api_response def read_ingress(self, name="", namespace="default"): api_response = self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace) return api_response def create_ingress(self, file="ingress-nginx.yaml", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=dep) return api_response def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) api_response = self.Api_Instance_Extensions.replace_namespaced_ingress(name=name, namespace=namespace, body=dep) return api_response def delete_ingress(self, name="hequan", namespace="default"): api_response = self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace) return api_response ##################################################################################################################### def list_stateful(self): api_response = self.k8s_apps_v1.list_stateful_set_for_all_namespaces() return api_response def read_stateful(self, name="nginx-deployment", namespace="default"): api_response = self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace) return api_response def create_stateful(self, file="deploy-nginx.yaml"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) resp = self.k8s_apps_v1.create_namespaced_stateful_set( body=dep, namespace="default") return resp def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"): with open(path.join(path.dirname(__file__), file)) as f: dep = yaml.safe_load(f) resp = self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace, body=dep) return resp def delete_stateful(self, name="nginx-deployment", namespace="default"): api_response = self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace) return api_response ####################################################################################################################if __name__ == '__main__': def test(): obj = K8sApi() ret = obj.list_deployment() pprint(ret) test()
封装
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
招银网络技术面
excel服务器有免费版本吗
敏捷团队的首选软件开发工具
软件开发目前市场
数据库查询一列的最大值
佛山广发银行软件开发中心
网展科技-让互联网更真实
莱山区管理系统软件开发推荐
万方数据库医学论文
电子邮件算不算网络安全威胁
软件开发成本要如何计算
深度三六零网络安全
压缩包下载软件开发
服务器隔音板怎么安装
互联网人才科技联合体
网络安全3大公害
第三方软件开发哪家正规
教育产品软件开发招聘信息
南京商邻互联网科技靠谱么
计算机网络技术的应用的毕业设计
诛仙手游官方服务器
我会软件开发的英语
建行深分软件开发中心
网络技术网络词
苹果注册未找到指定服务器
达芬奇为数据库建立新的文件夹
滨州询比价采购软件开发公司
织梦链接数据库的页面在哪
服务器端渲染
常用数据库的比较