千家信息网

怎么用python + Element实现作业任务Job操作

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章主要讲解了"怎么用python + Element实现作业任务Job操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python +
千家信息网最后更新 2025年02月05日怎么用python + Element实现作业任务Job操作

这篇文章主要讲解了"怎么用python + Element实现作业任务Job操作",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么用python + Element实现作业任务Job操作"吧!

1.前端HTML和CSS

搜索 添加
立即 周期-----每隔 分钟执行一次 立即创建 取消 立即 周期-----每隔 分钟执行一次 立即创建 取消

2.前端JS

3.Django代码

urls.py文件内容

from django.conf.urls import patternsfrom home_application.temp import views as temp_viewfrom home_application.job import views as job_viewurlpatterns = patterns(    'home_application.views',    (r'^$', job_view.job),    (r'^get_biz_list/$', job_view.get_biz_list),    (r'^get_bk_hosts/$', job_view.get_bk_hosts),    (r'^temp_view/$', temp_view.TemplateView.as_view()),    (r'^job_view/$', job_view.JobView.as_view()),    ...)

job\views.py文件内容

import jsonfrom django.views.generic import Viewfrom django.views.decorators.csrf import csrf_exemptfrom django.utils.decorators import method_decoratorfrom django.http import JsonResponsefrom common.mymako import render_mako_contextfrom home_application.models import Jobdef cc_search_host():    """    根据条件查询主机    :return:    """    url = "{0}/api/c/compapi/v2/cc/search_host/".format(BK_PAAS_HOST)    kwargs = {        "bk_app_code": APP_ID,        "bk_app_secret": APP_TOKEN,        "bk_username": "admin",        "condition": [            {                "bk_obj_id": "host",                "fields": ["bk_host_name", "bk_host_innerip", "bk_host_id", "bk_cloud_id", ],            },            {                "bk_obj_id": "biz",                "fields": ["bk_biz_id", "bk_biz_name"],            }        ],        # "page": {"start": 0, "limit": 10}    }    response = requests.post(url=url, data=json.dumps(kwargs), verify=False)    if response.status_code == 200:        return json.loads(response.content).get("data")# 获取所有业务,测试OKdef cc_search_business(fields=[]):    """    获取业务列表    :param request:    :return:    """    client = get_client_by_user("admin")    params = {'fields': fields}    res = client.cc.search_business(**params)    if res.get('code') == 0:        return res.get('data', {}).get('info', [])def job(request):    return render_mako_context(request, '/home_application/job_center.html')def get_biz_list(request):    fields = ["bk_biz_id", "bk_biz_name"]    return JsonResponse({"result": True, "data": data})def get_bk_hosts(request):    from home_application.utils.cc_by_request import cc_search_host    res_data = cc_search_host()    return JsonResponse({"result": True, "data": res_data})class CsrfExemptView(View):    @method_decorator(csrf_exempt)    def dispatch(self, request, *args, **kwargs):        return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)class JobView(CsrfExemptView):    def get(self, request, *args, **kwargs):        search_type = request.GET.get("search_type")        query_str = request.GET.get("query_str")        try:            job_query = Job.objects.all()        except Exception:            return JsonResponse({"result": False})        if search_type:            job_query = job_query.filter(type=search_type)        if query_str:            job_query = job_query.filter(job_name__icontains=query_str)        res_data = [i.to_dict() for i in job_query]        return JsonResponse({"result": True, "data": res_data})    def post(self, request, *args, **kwargs):        data = json.loads(request.body)        job_obj = {            "bk_biz_id": data.get("add_biz").split(":")[0],            "bk_biz_name": data.get("add_biz").split(":")[1],            "temp_id": data.get("add_temp").split(":")[0],            "temp_name": data.get("add_temp").split(":")[1],            "job_name": data.get("add_job_name"),            "type": data.get("add_type"),            "interval": data.get("add_interval"),            "host_ip": data.get("add_host_ip"),        }        try:            Job.objects.create(**job_obj)            return JsonResponse({"result": True})        except Exception as e:            print(e)            return JsonResponse({"result": False})    def put(self, request, *args, **kwargs):        data = json.loads(request.body)        pk = data.get("pk")        bk_biz_id = data.get("edit_bk_biz").split(":")[0]        temp_id = data.get("edit_temp_name").split(":")[0]        temp_name = data.get("edit_temp_name").split(":")[1]        bk_biz_name = data.get("edit_bk_biz").split(":")[1]        job_name = data.get("edit_job_name")        type = data.get("edit_type")        interval = data.get("edit_interval")        host_ip = data.get("edit_host_ip")        job_obj = {            "bk_biz_id": bk_biz_id,            "bk_biz_name": bk_biz_name,            "temp_id": temp_id,            "temp_name": temp_name,            "job_name": job_name,            "type": type,            "interval": interval,            "host_ip": host_ip,        }        try:            Job.objects.filter(pk=pk).update(**job_obj)            return JsonResponse({"result": True})        except Exception as e:            return JsonResponse({"result": False})    def delete(self, request, *args, **kwargs):        data = json.loads(request.body)        pk = data.get("id")        try:            Job.objects.filter(pk=pk).delete()            return JsonResponse({"result": True})        except Exception:            return JsonResponse({"result": False})

models.py文件内容

class Job(models.Model):    bk_biz_id = models.CharField(u"业务ID", max_length=8, blank=True, null=True)    bk_biz_name = models.CharField(u"业务名称", max_length=32, blank=True, null=True)    temp_id = models.CharField(u"模板ID", max_length=8, blank=True, null=True)    temp_name = models.CharField(u"模板名称", max_length=32, blank=True, null=True)    job_name = models.CharField(u"作业名称", max_length=16, blank=True, null=True)    type = models.CharField(u"类型", max_length=8, blank=True, null=True)    interval = models.CharField(u"间隔时间", max_length=4, blank=True, null=True)    host_ip = models.CharField(u"主机IP", max_length=32, blank=True, null=True)    create_time = models.DateTimeField(u"创建时间", auto_now_add=True)    def to_dict(self):        return {            "pk": self.pk,            "bk_biz_id": self.bk_biz_id,            "bk_biz_name": self.bk_biz_name,            "temp_id": self.temp_id,            "temp_name": self.temp_name,            "job_name": self.job_name,            "type": self.type,            "interval": self.interval,            "host_ip": self.host_ip,            "create_time": parse_datetime_to_timestr(self.create_time),        }

实现效果

感谢各位的阅读,以上就是"怎么用python + Element实现作业任务Job操作"的内容了,经过本文的学习后,相信大家对怎么用python + Element实现作业任务Job操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0