怎么用python + Element实现作业任务Job操作
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章主要讲解了"怎么用python + Element实现作业任务Job操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python +
千家信息网最后更新 2025年02月05日怎么用python + Element实现作业任务Job操作
这篇文章主要讲解了"怎么用python + Element实现作业任务Job操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python + Element实现作业任务Job操作"吧!
1.前端HTML和CSS
修改 删除 立即 周期-----每隔 分钟执行一次 立即创建 取消 立即 周期-----每隔 分钟执行一次 立即创建 取消
2.前端JS
3.Django代码
urls.py
文件内容
from django.conf.urls import patternsfrom home_application.temp import views as temp_viewfrom home_application.job import views as job_viewurlpatterns = patterns( 'home_application.views', (r'^$', job_view.job), (r'^get_biz_list/$', job_view.get_biz_list), (r'^get_bk_hosts/$', job_view.get_bk_hosts), (r'^temp_view/$', temp_view.TemplateView.as_view()), (r'^job_view/$', job_view.JobView.as_view()), ...)
job\views.py
文件内容
import jsonfrom django.views.generic import Viewfrom django.views.decorators.csrf import csrf_exemptfrom django.utils.decorators import method_decoratorfrom django.http import JsonResponsefrom common.mymako import render_mako_contextfrom home_application.models import Jobdef cc_search_host(): """ 根据条件查询主机 :return: """ url = "{0}/api/c/compapi/v2/cc/search_host/".format(BK_PAAS_HOST) kwargs = { "bk_app_code": APP_ID, "bk_app_secret": APP_TOKEN, "bk_username": "admin", "condition": [ { "bk_obj_id": "host", "fields": ["bk_host_name", "bk_host_innerip", "bk_host_id", "bk_cloud_id", ], }, { "bk_obj_id": "biz", "fields": ["bk_biz_id", "bk_biz_name"], } ], # "page": {"start": 0, "limit": 10} } response = requests.post(url=url, data=json.dumps(kwargs), verify=False) if response.status_code == 200: return json.loads(response.content).get("data")# 获取所有业务,测试OKdef cc_search_business(fields=[]): """ 获取业务列表 :param request: :return: """ client = get_client_by_user("admin") params = {'fields': fields} res = client.cc.search_business(**params) if res.get('code') == 0: return res.get('data', {}).get('info', [])def job(request): return render_mako_context(request, '/home_application/job_center.html')def get_biz_list(request): fields = ["bk_biz_id", "bk_biz_name"] return JsonResponse({"result": True, "data": data})def get_bk_hosts(request): from home_application.utils.cc_by_request import cc_search_host res_data = cc_search_host() return JsonResponse({"result": True, "data": res_data})class CsrfExemptView(View): @method_decorator(csrf_exempt) def dispatch(self, request, *args, **kwargs): return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)class JobView(CsrfExemptView): def get(self, request, *args, **kwargs): search_type = request.GET.get("search_type") query_str = request.GET.get("query_str") try: job_query = Job.objects.all() except Exception: return JsonResponse({"result": False}) if search_type: job_query = job_query.filter(type=search_type) if query_str: job_query = job_query.filter(job_name__icontains=query_str) res_data = [i.to_dict() for i in job_query] return JsonResponse({"result": True, "data": res_data}) def post(self, request, *args, **kwargs): data = json.loads(request.body) job_obj = { "bk_biz_id": data.get("add_biz").split(":")[0], "bk_biz_name": data.get("add_biz").split(":")[1], "temp_id": data.get("add_temp").split(":")[0], "temp_name": data.get("add_temp").split(":")[1], "job_name": data.get("add_job_name"), "type": data.get("add_type"), "interval": data.get("add_interval"), "host_ip": data.get("add_host_ip"), } try: Job.objects.create(**job_obj) return JsonResponse({"result": True}) except Exception as e: print(e) return JsonResponse({"result": False}) def put(self, request, *args, **kwargs): data = json.loads(request.body) pk = data.get("pk") bk_biz_id = data.get("edit_bk_biz").split(":")[0] temp_id = data.get("edit_temp_name").split(":")[0] temp_name = data.get("edit_temp_name").split(":")[1] bk_biz_name = data.get("edit_bk_biz").split(":")[1] job_name = data.get("edit_job_name") type = data.get("edit_type") interval = data.get("edit_interval") host_ip = data.get("edit_host_ip") job_obj = { "bk_biz_id": bk_biz_id, "bk_biz_name": bk_biz_name, "temp_id": temp_id, "temp_name": temp_name, "job_name": job_name, "type": type, "interval": interval, "host_ip": host_ip, } try: Job.objects.filter(pk=pk).update(**job_obj) return JsonResponse({"result": True}) except Exception as e: return JsonResponse({"result": False}) def delete(self, request, *args, **kwargs): data = json.loads(request.body) pk = data.get("id") try: Job.objects.filter(pk=pk).delete() return JsonResponse({"result": True}) except Exception: return JsonResponse({"result": False})
models.py
文件内容
class Job(models.Model): bk_biz_id = models.CharField(u"业务ID", max_length=8, blank=True, null=True) bk_biz_name = models.CharField(u"业务名称", max_length=32, blank=True, null=True) temp_id = models.CharField(u"模板ID", max_length=8, blank=True, null=True) temp_name = models.CharField(u"模板名称", max_length=32, blank=True, null=True) job_name = models.CharField(u"作业名称", max_length=16, blank=True, null=True) type = models.CharField(u"类型", max_length=8, blank=True, null=True) interval = models.CharField(u"间隔时间", max_length=4, blank=True, null=True) host_ip = models.CharField(u"主机IP", max_length=32, blank=True, null=True) create_time = models.DateTimeField(u"创建时间", auto_now_add=True) def to_dict(self): return { "pk": self.pk, "bk_biz_id": self.bk_biz_id, "bk_biz_name": self.bk_biz_name, "temp_id": self.temp_id, "temp_name": self.temp_name, "job_name": self.job_name, "type": self.type, "interval": self.interval, "host_ip": self.host_ip, "create_time": parse_datetime_to_timestr(self.create_time), }
实现效果
感谢各位的阅读,以上就是"怎么用python + Element实现作业任务Job操作"的内容了,经过本文的学习后,相信大家对怎么用python + Element实现作业任务Job操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
任务
作业
模板
业务
内容
名称
周期
成功
文件
学习
主机
前端
时间
类型
更新
代码
就是
思路
情况
效果
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发工程师入职介绍
ECS架构 软件开发
网络设备管理服务器
java 两个数据库
软件数据库设计应该谁写
湖北盛天网络技术加速器
服务器上传文件夹
杭州赛腾网络技术
国铁集团网络安全工作三同步
请论述数据库完整性概念
r星服务器要更新多久
服务器桌面不显示网络
今日头条服务器安装
c 添加窗口数据库
绍兴市网络安全教育平台官网
香港云服务器自助管理制度
百度nba没有球员数据库
网络技术名词
恒山系列鲲鹏服务器订购
遇到过网络安全问题怎么解决
软件开发方针如何写
阿里云轻量应用服务器搭建虚拟机
网络安全却连不上网怎么回事
绵阳 国企 软件开发
电脑数据库密码忘了怎么办
有关网络安全的建议的英文句子
学校网络安全应急预案范文
王牌战争怎样创造服务器教程
苏州python软件开发有用吗
c连接数据库教程