千家信息网

怎么用python+Element实现主机Host操作

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要讲解了"怎么用python+Element实现主机Host操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python+Eleme
千家信息网最后更新 2025年01月23日怎么用python+Element实现主机Host操作

这篇文章主要讲解了"怎么用python+Element实现主机Host操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python+Element实现主机Host操作"吧!

1.前端HTML和CSS

<%inherit file="/base.html"/>
搜索

2.前端JS

3.Django代码

urls.py文件内容

from django.conf.urls import patternsfrom home_application.temp import views as temp_viewfrom home_application.job import views as job_viewfrom home_application.host import views as host_viewfrom home_application.exam import views as exam_viewurlpatterns = patterns(    'home_application.views',    (r'^$', job_view.job),    (r'^dev-guide/$', 'dev_guide'),    (r'^contactus/$', 'contactus'),    (r'^api/test/$', 'test'),    (r'^temp/$', 'temp'),    (r'^host/$', host_view.host),    (r'^status/$', host_view.status),    (r'^host_view/$', host_view.HostView.as_view()),    (r'^get_all_hosts/$', host_view.get_all_hosts),    (r'^get_perform_data/$', host_view.get_perform_data),    ...)

host\views.py文件内容

import jsonfrom django.views.generic import Viewfrom django.views.decorators.csrf import csrf_exemptfrom django.utils.decorators import method_decoratorfrom django.http import JsonResponsefrom common.mymako import render_mako_contextfrom home_application.models import Host, LoadDatafrom home_application.utils.job_api import FastJobApiperform_script = """#!/bin/bashMEMORY=$(free -m | awk 'NR==2{printf "%.2f%%",$3*100/$2}')DISK=$(df -h| awk '$NF=="/"{printf "%s",$5}')CPU=$(top -bn1 | grep load | awk '{printf "%.2f%%",$(NF-2)}')DATE=$(date "+%Y-%m-%d %H:%M:%S")echo -e "$DATE|$MEMORY|$DISK|$CPU""""def host(request):    return render_mako_context(request, '/home_application/host.html')def status(request):    return render_mako_context(request, "/home_application/status.html")class CsrfExemptView(View):    @method_decorator(csrf_exempt)    def dispatch(self, request, *args, **kwargs):        return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)def get_all_hosts(request):    from home_application.utils.cc_by_request import cc_search_host    res_data = cc_search_host().get("info")    for host in res_data:        bk_host_innerip = host.get("host", {}).get("bk_host_innerip")        bk_host_id = host.get("host", {}).get("bk_host_id")        bk_host_name = host.get("host", {}).get("bk_host_name")        bk_os_name = host.get("host", {}).get("bk_os_name")        bk_biz_id = host.get("biz", [])[0].get("bk_biz_id")        bk_biz_name = host.get("biz", [])[0].get("bk_biz_name")        cloud_name = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_name")        cloud_id = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_id")        host_obj = {            "ip": bk_host_innerip,            "bk_biz_id": bk_biz_id,            "os_name": bk_os_name,            "host_name": bk_host_name,            "bk_biz_name": bk_biz_name,            "cloud_id": cloud_id,            "cloud_name": cloud_name,        }        Host.objects.update_or_create(host_id=bk_host_id, defaults=host_obj)    return JsonResponse({"result": res_data})@csrf_exemptdef get_perform_data(request):    data = json.loads(request.body)    host_id = data.get("host_id")    try:        obj = LoadData.objects.filter(host_id=host_id).last()        if obj:            res_data = {                "cpu": obj.cpu,                "mem": obj.mem,                "disk": obj.disk,            }            return JsonResponse({"result": True, "data": res_data})        bk_biz_id = int(data.get("bk_biz_id"))        ip = data.get("ip")        cloud_id = data.get("cloud_id")        # res_data = execute_script_and_return(request,ip_list, bk_biz_id, perform_script)        obj = FastJobApi(bk_biz_id, cloud_id, ip, perform_script)        job_res = obj.execute_script_and_return()        if job_res:            log_content = job_res[0].get("log_content")            res_data = {                "result": True,                "cpu": log_content.split("|")[3],                "mem": log_content.split("|")[1],                "disk": log_content.split("|")[2],            }            return JsonResponse({"result": True, "data": res_data})        return JsonResponse({"result": False})    except Exception:        return JsonResponse({"result": False})class HostView(CsrfExemptView):    def get(self, request, *args, **kwargs):        try:            host_query = Host.objects.all()        except Exception:            return JsonResponse({"result": False})        search_biz_id = request.GET.get("search_biz_id")        query_str = request.GET.get("query_str")        if search_biz_id:            host_query = host_query.filter(bk_biz_id=search_biz_id)        if query_str:            host_query = host_query.filter(ip__in=query_str.split(","))        host_query = host_query[:30] if host_query.count() > 10 else host_query        res_data = [i.to_dict() for i in host_query]        return JsonResponse({"result": True, "data": res_data})    def post(self, request, *args, **kwargs):        try:            data = json.loads(request.body)            host_id = data.get("host_id")            is_monitored = data.get("is_monitored")            if is_monitored:                Host.objects.filter(host_id=host_id).update(is_monitored=False)            else:                Host.objects.filter(host_id=host_id).update(is_monitored=True)            return JsonResponse({"result": True})        except Exception:            return JsonResponse({"result": False})

models.py文件内容

from django.db import modelsfrom home_application.utils.parse_time import parse_datetime_to_timestrclass Host(models.Model):    host_id = models.IntegerField(u"主机ID", primary_key=True, unique=True)    ip = models.CharField(u"IP地址", max_length=32, blank=True, null=True)    bk_biz_id = models.CharField(u"业务ID", max_length=16, blank=True, null=True)    bk_biz_name = models.CharField(u"业务名称", max_length=512, blank=True, null=True)    os_name = models.CharField(u"系统名", max_length=128, blank=True, null=True)    host_name = models.CharField(u"主机名", max_length=128, blank=True, null=True)    cloud_id = models.IntegerField(u"云区域ID", blank=True, null=True)    cloud_name = models.CharField(u"云区域名称", max_length=32, blank=True, null=True)    is_monitored = models.BooleanField(u"是否已监控", default=False)    def to_dict(self):        return {            "host_id": self.host_id,            "ip": self.ip,            "pk": self.pk,            "bk_biz_id": self.bk_biz_id,            "bk_biz_name": self.bk_biz_name,            "os_name": self.os_name,            "host_name": self.host_name,            "cloud_name": self.cloud_name,            "cloud_id": self.cloud_id,            "is_monitored": self.is_monitored,            "mem_use": "-",            "cpu_use": "-",            "disk_use": "-",        }    def get_load_data(self):        load_query = LoadData.objects.filter(host_id=self.pk).order_by("create_time")        return [i.to_dict() for i in load_query]class LoadData(models.Model):    host_id = models.IntegerField(u"主机ID", default=0)    cpu = models.IntegerField(u"CPU使用率", default=0)    mem = models.IntegerField(u"内存使用率", default=0)    disk = models.IntegerField(u"硬盘使用率", default=0)    create_time = models.DateTimeField(u"创建时间", auto_now_add=True)    def to_dict(self):        return {            "host_id": self.host_id,            "cpu": self.cpu,            "mem": self.mem,            "disk": self.disk,            "create_time": parse_datetime_to_timestr(self.create_time)        }

FastJobApi部分代码

import base64import timefrom conf.default import APP_ID, APP_TOKENfrom blueking.component.shortcuts import get_client_by_user, get_client_by_requestcount = 0class FastJobApi(object):    def __init__(self, bk_biz_id, bk_cloud_id, ip_list, script_content):        self.client = get_client_by_user('admin')        self.username = "admin"        self.biz_id = bk_biz_id        self.script_content = script_content        self.ip_list = [{"bk_cloud_id": bk_cloud_id, "ip": i} for i in ip_list.split(",")]    def _fast_execute_script(self, execute_account="root", param_content='', script_timeout=1000):        """        快速执行脚本        :param execute_account: 执行脚本的账户名        :param param_content: 执行脚本的参数        :param script_timeout: 脚本执行超时时间        :return: job_instance_id        """        kwargs = {            "bk_app_code": APP_ID,            "bk_app_secret": APP_TOKEN,            "bk_biz_id": self.biz_id,            "bk_username": self.username,            "script_content": base64.b64encode(self.script_content),            "ip_list": self.ip_list,            "script_type": 1,            "account": execute_account,            "script_param": base64.b64encode(param_content),            "script_timeout": script_timeout        }        result = self.client.job.fast_execute_script(kwargs)        if result["result"]:            return result.get("data").get("job_instance_id")        return False    def _get_job_instance_status(self, task_id):        """        获取脚本执行状态        :param task_id: 执行脚本的 job_instance_id        :return:        """        global count        count += 1        # 查询执行状态        resp = self.client.job.get_job_instance_status(            bk_username=self.username,            bk_biz_id=self.biz_id,            job_instance_id=task_id        )        if resp.get('data').get('is_finished'):            count = 0            return True        elif not resp.get('data').get('is_finished') and count <= 5:            time.sleep(2)            return self._get_job_instance_status(task_id)        else:            count = 0            return False    def _get_job_instance_log(self, task_id):        """        查询作业日志        :param task_id: 执行脚本的 job_instance_id        :return:        """        if self._get_job_instance_status(task_id):            resp = self.client.job.get_job_instance_log(                job_instance_id=task_id,                bk_biz_id=self.biz_id,                bk_username='admin'            )            return resp['data'][0]['step_results'][0]['ip_logs']    def execute_script_and_return(self):        """        执行脚本并获取脚本执行结果        :return: 脚本执行结果        """        job_instance_id = self._fast_execute_script()        if job_instance_id:            ip_logs = self._get_job_instance_log(job_instance_id)            return ip_logs

实现效果

感谢各位的阅读,以上就是"怎么用python+Element实现主机Host操作"的内容了,经过本文的学习后,相信大家对怎么用python+Element实现主机Host操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0