APScheduler
即可以单独使用,也可以用于常见的
Web
框架。
高级
Python
调度程序(
APScheduler
)一个
Python
库,可让您安排稍后执行的
Python
代码,可以是一次,也可以是定期执行。您可以根据需要随时添加新作业或删除旧作业。如果将作业存储在数据库中,它们也将在调度程序重新启动后继续运行并保持其状态。重新启动调度程序后,它将运行它应该在脱机时运行的所有作业。
APScheduler
django-apscheduler
scheduler, job store(s), executor(s) and trigger(s)
apscheduler.events
apscheduler.executors.base
apscheduler.executors.debug
apscheduler.executors.pool
- 线程池执行器、进程池执行器
apscheduler.executors.asyncio
- asyncio 程序执行器
apscheduler.executors.gevent
- Gevent 程序执行器
apscheduler.executors.twisted
- Twisted 程序执行器
apscheduler.schedulers.base
apscheduler.schedulers.blocking
- 适用于调度程序是进程中唯一运行的进程,调用 start 函数会阻塞当前线程,不能立即返回
apscheduler.schedulers.background
- 适用于调度程序在应用程序的后台运行,调用 start 后主线程不会阻塞
apscheduler.schedulers.asyncio
- 适用于使用了 Asyncio 模块的应用程序
apscheduler.schedulers.gevent
- 适用于使用 Gevent 模块的应用程序
apscheduler.schedulers.tornado
- 适用于构建 Tornado 的应用程序
apscheduler.schedulers.twisted
- 适用于构建 Twisted 的应用程序
APScheduler
提供了许多不同的方法来配置任务调度程序(scheduler
),这样就可以在任何环境中获得最大的灵活性。你可以使用字典配置;你也可以将选项作为关键字参数传递;你还可以先实例化调度程序,然后添加作业并配置调度程序。
以下命令将为你创建一个名为 scheduler
的后台执行的任务调度程序,默认将任务存储在 MemoryJobStore
内存中,使用的是 ThreadPoolExecutor
线程池且默认最大线程数为 10
个。
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
- 下面是三种添加任务调度程序(
scheduler
)配置的三种方式,我推荐使用第一种,因为其更加的直观且易于后续代码的修改的调整,所以其余两种我就不展示代码了。
- a MongoDBJobStore named “mongo”
- an SQLAlchemyJobStore named “default” (using SQLite)
- a ThreadPoolExecutor named “default”, with a worker count of 20
- a ProcessPoolExecutor named “processpool”, with a worker count of 5
- UTC as the scheduler’s timezone
- coalescing turned off for new jobs by default
- a default maximum instance limit of 3 for new jobs
from pytz import utc
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
job_defaults = {
'coalesce': False,
'max_instances': 3
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.configure(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=utc
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3)
try:
print('Press Ctrl+C to exit ...')
scheduler.start()
except (KeyboardInterrupt, SystemExit):