怎么用python+Element实现主机Host操作
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要讲解了"怎么用python+Element实现主机Host操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python+Eleme
千家信息网最后更新 2025年01月23日怎么用python+Element实现主机Host操作
这篇文章主要讲解了"怎么用python+Element实现主机Host操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python+Element实现主机Host操作"吧!
1.前端HTML和CSS
<%inherit file="/base.html"/>{{ scope.row.is_monitored ? "移除监控":"加入监控" }} 查看性能 查看状态
2.前端JS
3.Django代码
urls.py
文件内容
from django.conf.urls import patternsfrom home_application.temp import views as temp_viewfrom home_application.job import views as job_viewfrom home_application.host import views as host_viewfrom home_application.exam import views as exam_viewurlpatterns = patterns( 'home_application.views', (r'^$', job_view.job), (r'^dev-guide/$', 'dev_guide'), (r'^contactus/$', 'contactus'), (r'^api/test/$', 'test'), (r'^temp/$', 'temp'), (r'^host/$', host_view.host), (r'^status/$', host_view.status), (r'^host_view/$', host_view.HostView.as_view()), (r'^get_all_hosts/$', host_view.get_all_hosts), (r'^get_perform_data/$', host_view.get_perform_data), ...)
host\views.py
文件内容
import jsonfrom django.views.generic import Viewfrom django.views.decorators.csrf import csrf_exemptfrom django.utils.decorators import method_decoratorfrom django.http import JsonResponsefrom common.mymako import render_mako_contextfrom home_application.models import Host, LoadDatafrom home_application.utils.job_api import FastJobApiperform_script = """#!/bin/bashMEMORY=$(free -m | awk 'NR==2{printf "%.2f%%",$3*100/$2}')DISK=$(df -h| awk '$NF=="/"{printf "%s",$5}')CPU=$(top -bn1 | grep load | awk '{printf "%.2f%%",$(NF-2)}')DATE=$(date "+%Y-%m-%d %H:%M:%S")echo -e "$DATE|$MEMORY|$DISK|$CPU""""def host(request): return render_mako_context(request, '/home_application/host.html')def status(request): return render_mako_context(request, "/home_application/status.html")class CsrfExemptView(View): @method_decorator(csrf_exempt) def dispatch(self, request, *args, **kwargs): return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)def get_all_hosts(request): from home_application.utils.cc_by_request import cc_search_host res_data = cc_search_host().get("info") for host in res_data: bk_host_innerip = host.get("host", {}).get("bk_host_innerip") bk_host_id = host.get("host", {}).get("bk_host_id") bk_host_name = host.get("host", {}).get("bk_host_name") bk_os_name = host.get("host", {}).get("bk_os_name") bk_biz_id = host.get("biz", [])[0].get("bk_biz_id") bk_biz_name = host.get("biz", [])[0].get("bk_biz_name") cloud_name = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_name") cloud_id = host.get("host", {}).get("bk_cloud_id")[0].get("bk_inst_id") host_obj = { "ip": bk_host_innerip, "bk_biz_id": bk_biz_id, "os_name": bk_os_name, "host_name": bk_host_name, "bk_biz_name": bk_biz_name, "cloud_id": cloud_id, "cloud_name": cloud_name, } Host.objects.update_or_create(host_id=bk_host_id, defaults=host_obj) return JsonResponse({"result": res_data})@csrf_exemptdef get_perform_data(request): data = json.loads(request.body) host_id = data.get("host_id") try: obj = LoadData.objects.filter(host_id=host_id).last() if obj: res_data = { "cpu": obj.cpu, "mem": obj.mem, "disk": obj.disk, } return JsonResponse({"result": True, "data": res_data}) bk_biz_id = int(data.get("bk_biz_id")) ip = data.get("ip") cloud_id = data.get("cloud_id") # res_data = execute_script_and_return(request,ip_list, bk_biz_id, perform_script) obj = FastJobApi(bk_biz_id, cloud_id, ip, perform_script) job_res = obj.execute_script_and_return() if job_res: log_content = job_res[0].get("log_content") res_data = { "result": True, "cpu": log_content.split("|")[3], "mem": log_content.split("|")[1], "disk": log_content.split("|")[2], } return JsonResponse({"result": True, "data": res_data}) return JsonResponse({"result": False}) except Exception: return JsonResponse({"result": False})class HostView(CsrfExemptView): def get(self, request, *args, **kwargs): try: host_query = Host.objects.all() except Exception: return JsonResponse({"result": False}) search_biz_id = request.GET.get("search_biz_id") query_str = request.GET.get("query_str") if search_biz_id: host_query = host_query.filter(bk_biz_id=search_biz_id) if query_str: host_query = host_query.filter(ip__in=query_str.split(",")) host_query = host_query[:30] if host_query.count() > 10 else host_query res_data = [i.to_dict() for i in host_query] return JsonResponse({"result": True, "data": res_data}) def post(self, request, *args, **kwargs): try: data = json.loads(request.body) host_id = data.get("host_id") is_monitored = data.get("is_monitored") if is_monitored: Host.objects.filter(host_id=host_id).update(is_monitored=False) else: Host.objects.filter(host_id=host_id).update(is_monitored=True) return JsonResponse({"result": True}) except Exception: return JsonResponse({"result": False})
models.py
文件内容
from django.db import modelsfrom home_application.utils.parse_time import parse_datetime_to_timestrclass Host(models.Model): host_id = models.IntegerField(u"主机ID", primary_key=True, unique=True) ip = models.CharField(u"IP地址", max_length=32, blank=True, null=True) bk_biz_id = models.CharField(u"业务ID", max_length=16, blank=True, null=True) bk_biz_name = models.CharField(u"业务名称", max_length=512, blank=True, null=True) os_name = models.CharField(u"系统名", max_length=128, blank=True, null=True) host_name = models.CharField(u"主机名", max_length=128, blank=True, null=True) cloud_id = models.IntegerField(u"云区域ID", blank=True, null=True) cloud_name = models.CharField(u"云区域名称", max_length=32, blank=True, null=True) is_monitored = models.BooleanField(u"是否已监控", default=False) def to_dict(self): return { "host_id": self.host_id, "ip": self.ip, "pk": self.pk, "bk_biz_id": self.bk_biz_id, "bk_biz_name": self.bk_biz_name, "os_name": self.os_name, "host_name": self.host_name, "cloud_name": self.cloud_name, "cloud_id": self.cloud_id, "is_monitored": self.is_monitored, "mem_use": "-", "cpu_use": "-", "disk_use": "-", } def get_load_data(self): load_query = LoadData.objects.filter(host_id=self.pk).order_by("create_time") return [i.to_dict() for i in load_query]class LoadData(models.Model): host_id = models.IntegerField(u"主机ID", default=0) cpu = models.IntegerField(u"CPU使用率", default=0) mem = models.IntegerField(u"内存使用率", default=0) disk = models.IntegerField(u"硬盘使用率", default=0) create_time = models.DateTimeField(u"创建时间", auto_now_add=True) def to_dict(self): return { "host_id": self.host_id, "cpu": self.cpu, "mem": self.mem, "disk": self.disk, "create_time": parse_datetime_to_timestr(self.create_time) }
FastJobApi
部分代码
import base64import timefrom conf.default import APP_ID, APP_TOKENfrom blueking.component.shortcuts import get_client_by_user, get_client_by_requestcount = 0class FastJobApi(object): def __init__(self, bk_biz_id, bk_cloud_id, ip_list, script_content): self.client = get_client_by_user('admin') self.username = "admin" self.biz_id = bk_biz_id self.script_content = script_content self.ip_list = [{"bk_cloud_id": bk_cloud_id, "ip": i} for i in ip_list.split(",")] def _fast_execute_script(self, execute_account="root", param_content='', script_timeout=1000): """ 快速执行脚本 :param execute_account: 执行脚本的账户名 :param param_content: 执行脚本的参数 :param script_timeout: 脚本执行超时时间 :return: job_instance_id """ kwargs = { "bk_app_code": APP_ID, "bk_app_secret": APP_TOKEN, "bk_biz_id": self.biz_id, "bk_username": self.username, "script_content": base64.b64encode(self.script_content), "ip_list": self.ip_list, "script_type": 1, "account": execute_account, "script_param": base64.b64encode(param_content), "script_timeout": script_timeout } result = self.client.job.fast_execute_script(kwargs) if result["result"]: return result.get("data").get("job_instance_id") return False def _get_job_instance_status(self, task_id): """ 获取脚本执行状态 :param task_id: 执行脚本的 job_instance_id :return: """ global count count += 1 # 查询执行状态 resp = self.client.job.get_job_instance_status( bk_username=self.username, bk_biz_id=self.biz_id, job_instance_id=task_id ) if resp.get('data').get('is_finished'): count = 0 return True elif not resp.get('data').get('is_finished') and count <= 5: time.sleep(2) return self._get_job_instance_status(task_id) else: count = 0 return False def _get_job_instance_log(self, task_id): """ 查询作业日志 :param task_id: 执行脚本的 job_instance_id :return: """ if self._get_job_instance_status(task_id): resp = self.client.job.get_job_instance_log( job_instance_id=task_id, bk_biz_id=self.biz_id, bk_username='admin' ) return resp['data'][0]['step_results'][0]['ip_logs'] def execute_script_and_return(self): """ 执行脚本并获取脚本执行结果 :return: 脚本执行结果 """ job_instance_id = self._fast_execute_script() if job_instance_id: ip_logs = self._get_job_instance_log(job_instance_id) return ip_logs
实现效果
感谢各位的阅读,以上就是"怎么用python+Element实现主机Host操作"的内容了,经过本文的学习后,相信大家对怎么用python+Element实现主机Host操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
主机
脚本
监控
内容
查询
业务
状态
队列
使用率
情况
文件
系统
学习
成功
代码
前端
区域
名称
时间
模板
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
协企网络技术有限公司怎么样
北京哪家服务器托管好
中医常用的数据库
linux服务器性能分析
杨浦区海航软件开发供应商机构
重庆钣金服务器机柜虚拟主机
互联网陕西西安科技大学
日本网络安全感
国家网络安全宣传周活动海报
dtl 数据库
excel数据库中单独提出
格雷5 服务器
对政府网络安全部门的调查问卷
数据库被lock
金融分析师和软件开发哪个好
怎么找代理服务器
技校网络技术专业
格力校招软件开发怎么样
货运软件开发定制
旅游网站设计数据库代码
软著的软件开发工具
区块链是数据库技术
重庆嵌入式软件开发
多人数据库
大众互联网科技
富仓网络技术
linux管理运维服务器
数据库迁移工具最新中文版
数据库分库分表怎么迁移
成都摩宝网络技术公司