千家信息网

Python定时库Apscheduler怎么用

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,小编给大家分享一下Python定时库Apscheduler怎么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!在Python中需要执行定时任务,可以使用Apscheduler。Aps
千家信息网最后更新 2025年02月03日Python定时库Apscheduler怎么用

小编给大家分享一下Python定时库Apscheduler怎么用,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

在Python中需要执行定时任务,可以使用Apscheduler。

Apscheduler是基于Quartz的Python定时任务框架,功能上跟Quartz一致,使用上跟Quartz也几乎一致。

核心的四个部分:

①触发器(trigger)、②作业存储(job store)、③执行器(executor)、④调度器(scheduler)

安装依赖:

pip install apscheduler

间隔时间调度:

from apscheduler.schedulers.blocking import BlockingSchedulerfrom datetime import datetime sched = BlockingScheduler()def test_job():    print(f'{datetime.now():%H:%M:%S} Test job') if __name__ == '__main__':     sched.add_job(test_job, 'interval', id='test', seconds=5)    sched.start()

也可以使用装饰器:

from apscheduler.schedulers.blocking import BlockingSchedulerfrom datetime import datetime  sched = BlockingScheduler()  def test_job():    print(f'{datetime.now():%H:%M:%S} Test job')  @sched.scheduled_job('interval', seconds=5)def test_decorator_job():    print(f'{datetime.now():%H:%M:%S} Test decorator job')  if __name__ == '__main__':     sched.add_job(test_job, 'interval', id='test', seconds=5)    sched.start()

运行结果:

BlockingScheduler()是调度器中的一种调度器

sched.add_job()是添加作业

sched.start()是开始任务

定时调度:

定时调度使用cron表达式进行,这里也带参数执行:

from apscheduler.schedulers.blocking import BlockingSchedulerfrom datetime import datetime scheduler = BlockingScheduler()  def test_args(x):    print (f'{datetime.now():%H:%M:%S} Test cron job', x)  if __name__ == '__main__':    scheduler.add_job(test_args, 'cron', args=('cron test',), second='*/5')    scheduler.start()

时间参数设置如下:

year (int|str) - 年,4位数字 month (int|str) - 月 (范围1-12) day (int|str) - 日 (范围1-31) week (int|str) - 周 (范围1-53) day_of_week (int|str) - 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) hour (int|str) - 时 (范围0-23) minute (int|str) - 分 (范围0-59) second (int|str) - 秒 (范围0-59) start_date (datetime|str) - 最早开始日期(包含) end_date (datetime|str) - 最晚结束时间(包含) timezone (datetime.tzinfo|str) - 指定时区

接下来说一下其中的调度器:

BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。AsyncIOScheduler:适用于使用了asyncio模块的应用程序。GeventScheduler:适用于使用gevent模块的应用程序。TwistedScheduler:适用于构建Twisted的应用程序。QtScheduler:适用于构建Qt的应用程序。

其中使用得比较多的是前三种调度器。

删除任务:

scheduler.remove_job('task_id')

停止任务:

scheduler.pause_job('task_id')

恢复任务:

scheduler.resume_job('task_id')

立即执行任务next_run_time:

scheduler.add_job(            test_job,            'interval',            minutes=5,            next_run_time=datetime.datetime.now()        )

看完了这篇文章,相信你对"Python定时库Apscheduler怎么用"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

0