千家信息网

kubernetes中python api的二次封装

发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,k8s python api 二次封装:pip install pprint kubernetes;import urllib3;from pprint import pprint;from kuberne……
千家信息网最后更新 2025年02月01日kubernetes中python api的二次封装

k8s python api二次封装

pip install kubernetes   # pprint 属于 Python 标准库,无需安装;PyYAML 与 urllib3 会作为 kubernetes 的依赖一并安装
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml


class K8sApi(object):
    """Convenience wrapper around the official Kubernetes Python client.

    Groups the list/read/create/replace/delete calls for deployments,
    namespaces, nodes, pods, services, ingresses and stateful sets behind
    one object. Create/replace methods read their request body from a YAML
    manifest resolved relative to this module's directory.
    """

    def __init__(self, host="https://192.168.100.111:6443", token="token", verify_ssl=False):
        """Build the API clients used by the other methods.

        Credentials are set explicitly here rather than loaded from a
        kubeconfig file.

        Args:
            host: Base URL of the Kubernetes API server.
            token: Bearer token sent in the ``authorization`` header.
            verify_ssl: Whether to verify the API server certificate.
        """
        if not verify_ssl:
            # Unverified HTTPS makes urllib3 warn on every request; silence
            # that only when verification was deliberately turned off.
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        self.configuration = client.Configuration()
        self.configuration.host = host
        self.configuration.api_key['authorization'] = 'Bearer ' + token
        self.configuration.verify_ssl = verify_ssl

        # One ApiClient is enough: all API groups share the configuration.
        api_client = client.ApiClient(self.configuration)
        self.k8s_apps_v1 = client.AppsV1Api(api_client)
        self.Api_Instance = client.CoreV1Api(api_client)

        # ExtensionsV1beta1Api is missing from recent client releases
        # (extensions/v1beta1 Ingress was removed in Kubernetes 1.22).
        # Keep using it when present, otherwise fall back to NetworkingV1Api,
        # which exposes the same ingress method names.
        ingress_api_cls = getattr(client, "ExtensionsV1beta1Api", None)
        if ingress_api_cls is None:
            ingress_api_cls = client.NetworkingV1Api
        self.Api_Instance_Extensions = ingress_api_cls(api_client)

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _load_manifest(file):
        """Parse the YAML manifest ``file`` and return the resulting object.

        ``file`` is joined onto this module's directory; an absolute path is
        therefore used unchanged. The file is closed before returning.
        """
        manifest_path = path.join(path.dirname(__file__), file)
        with open(manifest_path, encoding="utf-8") as f:
            return yaml.safe_load(f)

    @staticmethod
    def _node_status(conditions):
        """Return ``"Ready"`` if the node's Ready condition is true, else ``"NotReady"``.

        The condition is looked up by type instead of by position, and a
        missing or empty condition list yields ``"NotReady"``.
        """
        for condition in conditions or []:
            if condition.type == "Ready":
                return "Ready" if condition.status == "True" else "NotReady"
        return "NotReady"

    # ------------------------------------------------------------------
    # deployments
    # ------------------------------------------------------------------
    def list_deployment(self, namespace="default"):
        """Return the deployment list of ``namespace``."""
        return self.k8s_apps_v1.list_namespaced_deployment(namespace)

    def read_deployment(self, name="nginx-deployment", namespace="default"):
        """Return the deployment ``name`` in ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_deployment(name, namespace)

    def create_deployment(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a deployment in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_deployment(
            body=body, namespace=namespace)

    def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace the deployment ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_deployment(name, namespace, body=body)

    def delete_deployment(self, name="nginx-deployment", namespace="default"):
        """Delete the deployment ``name`` in ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)

    # ------------------------------------------------------------------
    # namespaces
    # ------------------------------------------------------------------
    def list_namespace(self):
        """Return the list of namespaces."""
        return self.Api_Instance.list_namespace()

    def read_namespace(self, name="default"):
        """Return the namespace ``name``."""
        return self.Api_Instance.read_namespace(name)

    def create_namespace(self, file="pod-nginx.yaml"):
        """Create a namespace from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.create_namespace(body=body)

    def replace_namespace(self, file="pod-nginx.yaml", name="default"):
        """Replace the namespace ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.replace_namespace(name, body=body)

    def delete_namespace(self, name="default", namespace="default"):
        """Delete the namespace ``name``.

        ``namespace`` is unused; it is kept only so existing callers that
        pass it keep working.
        """
        return self.Api_Instance.delete_namespace(name)

    # ------------------------------------------------------------------
    # nodes and pods
    # ------------------------------------------------------------------
    def list_node(self):
        """Return a summary dict of all nodes, keyed by node name.

        Each value holds ``name``, ``status`` (``"Ready"``/``"NotReady"``),
        ``ip`` (the first reported address, or ``None`` when the node
        reports none), ``kubelet_version`` and ``os_image``.
        """
        api_response = self.Api_Instance.list_node()
        data = {}
        for node in api_response.items:
            addresses = node.status.addresses or []
            node_info = node.status.node_info
            data[node.metadata.name] = {
                "name": node.metadata.name,
                "status": self._node_status(node.status.conditions),
                "ip": addresses[0].address if addresses else None,
                "kubelet_version": node_info.kubelet_version,
                "os_image": node_info.os_image,
            }
        return data

    def list_pod(self):
        """Return ``{pod_name: {"ip": ..., "namespace": ...}}`` for all namespaces.

        NOTE: the key is the bare pod name, so pods with the same name in
        different namespaces overwrite each other and only the last one
        listed is kept. The key shape is left unchanged for compatibility.
        """
        api_response = self.Api_Instance.list_pod_for_all_namespaces()
        return {
            pod.metadata.name: {"ip": pod.status.pod_ip, "namespace": pod.metadata.namespace}
            for pod in api_response.items
        }

    def read_pod(self, name="nginx-pod", namespace="default"):
        """Return the pod ``name`` in ``namespace``."""
        return self.Api_Instance.read_namespaced_pod(name, namespace)

    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        """Create a pod in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_pod(namespace, body=body)

    def replace_pod(self, file="pod-nginx.yaml", name="nginx-pod", namespace="default"):
        """Replace the pod ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.replace_namespaced_pod(name, namespace, body=body)

    def delete_pod(self, name="nginx-pod", namespace="default"):
        """Delete the pod ``name`` in ``namespace``."""
        return self.Api_Instance.delete_namespaced_pod(name, namespace)

    # ------------------------------------------------------------------
    # services
    # ------------------------------------------------------------------
    def list_service(self):
        """Return the services of all namespaces."""
        return self.Api_Instance.list_service_for_all_namespaces()

    def read_service(self, name="", namespace="default"):
        """Return the service ``name`` in ``namespace``."""
        return self.Api_Instance.read_namespaced_service(name, namespace)

    def create_service(self, file="service-nginx.yaml", namespace="default"):
        """Create a service in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_service(namespace, body=body)

    def replace_service(self, file="pod-nginx.yaml", name="hequan", namespace="default"):
        """Replace the service ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance.replace_namespaced_service(name, namespace, body=body)

    def delete_service(self, name="hequan", namespace="default"):
        """Delete the service ``name`` in ``namespace``."""
        return self.Api_Instance.delete_namespaced_service(name, namespace)

    # ------------------------------------------------------------------
    # ingresses
    # ------------------------------------------------------------------
    def list_ingress(self):
        """Return the ingresses of all namespaces."""
        return self.Api_Instance_Extensions.list_ingress_for_all_namespaces()

    def read_ingress(self, name="", namespace="default"):
        """Return the ingress ``name`` in ``namespace``."""
        return self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)

    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        """Create an ingress in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=body)

    def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
        """Replace the ingress ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.Api_Instance_Extensions.replace_namespaced_ingress(
            name=name, namespace=namespace, body=body)

    def delete_ingress(self, name="hequan", namespace="default"):
        """Delete the ingress ``name`` in ``namespace``."""
        return self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)

    # ------------------------------------------------------------------
    # stateful sets
    # ------------------------------------------------------------------
    def list_stateful(self):
        """Return the stateful sets of all namespaces."""
        return self.k8s_apps_v1.list_stateful_set_for_all_namespaces()

    def read_stateful(self, name="nginx-deployment", namespace="default"):
        """Return the stateful set ``name`` in ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)

    def create_stateful(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a stateful set in ``namespace`` from the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_stateful_set(
            body=body, namespace=namespace)

    def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace the stateful set ``name`` with the manifest ``file``."""
        body = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace, body=body)

    def delete_stateful(self, name="nginx-deployment", namespace="default"):
        """Delete the stateful set ``name`` in ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)


if __name__ == '__main__':
    def test():
        """Smoke test: print the deployments of the default namespace."""
        obj = K8sApi()
        ret = obj.list_deployment()
        pprint(ret)

    test()
0