前言
本文介绍的内容均是以程序中只启动了一个调度器为前提下。多个调度器在某些配置项上要多个参数指定调度器。
一、Flask_Apscheduler与Apscheduler
Apscheduler: 全名:Advanced Python Scheduler,是Python的一个定时任务框架,能按指定规则时间执行任务(python的函数),并能持久化任务至数据库,实现对定时任务的动态增、删、改、查操作。具备了一个合格定时器该有的所有功能。
Flask_Apscheduler: 是Flask框架的一个扩展库,增加了Flask对apScheduler的支持,即基于Flask框架使用习惯,稍微调整了一个Apscheduler的用法(配置导入,定时器启动,上下文管理),基本原理的概念与Apscheduler一致。
总结: 想要了解定时器框架的原理和用法,百度直接搜索Apscheduler即可。Flask_Apscheduler只是用于flask框架项目中时,部分方法有稍微变动而已,影响不大。
二、Apscheduler介绍
pip install apscheduler
2.1 四个基础组件
2.1.1 调度器(schedulers)
配置任务存储器和执行器都可以在调度器中完成,并对任务进行增、删、改、查操作,调度器通常是只有一个。Apscheduler提供的调度器有7种,我只用过 BlockingScheduler 和 BackgroundScheduler 。
BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。主进程调度器
from apscheduler.schedulers.blocking import BlockingScheduler
def function():
print("定时任务跑起来了")
# 定义主进程调度器
scheduler = BlockingScheduler()
# 添加任务,间隔5s执行一次func()函数。 看不懂没关系后面会介绍
scheduler.add_job(func=function, trigger="interval", seconds=5)
# 启动定时器
scheduler.start()
后台运行调度器
from apscheduler.schedulers.background import BackgroundScheduler
import time
def function():
print("定时任务跑起来了")
# 定义后台执行调度器
scheduler = BackgroundScheduler()
# 添加任务,间隔5s执行一次func()函数。 看不懂没关系后面会介绍
scheduler.add_job(func=function, trigger="interval", seconds=5)
# 启动定时器
scheduler.start()
# 使主程序不关闭
while True:
print(time.sleep(9999))
2.1.2 任务存储器(job stores)
任务存储器是可以存储任务的地方,默认情况下任务保存在内存,服务重启后任务会消失,也可将任务保存在各种数据库中(mysql,redis,mongodb)。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。
# 数据持久化至Mysql
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
# 存储到sql文件
jobstores = {
'default': SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
# 存储到现有的数据库
jobstores = {
'default': SQLAlchemyJobStore(url="mysql://root:[email protected]:3306/sql'")
# 数据持久化至MongoDB
from apscheduler.jobstores.mongodb import MongoDBJobStore
jobstores = {
'default': MongoDBJobStore(host="127.0.0.1",port=27017, database="apscheduler", collection="jobs")
# 数据持久化至redis
from apscheduler.jobstores.redis import RedisJobStore
jobstores = {
'default': RedisJobStore(host="127.0.0.1",port=6379, db=0, password="123456")
# 引入配置的方法,在创建类时加入配置项
scheduler = BackgroundScheduler(jobstores=jobstores)
2.1.3 触发器(executors)
1.date触发器:是最基本的一种调度,任务只会在指定日期时间执行一次。
参数说明:
run_date
:任务的运行日期或时间 (datetime 或 str)
timezone
:指定时区(datetime.tzinfo 或 str)
# 在2020年5月22日执行一次
scheduler.add_job(func=func, trigger="date", run_date=date(2020, 5, 22), timezone="Asia/Shanghai")
# 在2020年8月13日 14:00:01执行一次
scheduler.add_job(func=func, trigger="date", run_date='2020-08-13 14:00:01')
2.interval触发器:根据设置的时间间隔,每间隔执行一次。间隔开始计算时间,为任务创建时间。
参数说明:
weeks
:间隔几周。int
days
:间隔天数。int。
hours
:间隔小时数。int。
minutes
:间隔分钟数。int。
seconds
:间隔秒数。int。
start_date
:间隔触发的起始时间。(datetime 或 str)
end_date
:间隔触发的结束时间。(datetime 或 str)
timezone
:指定时区。(datetime 或 str)
# 每5秒执行一次func()函数
scheduler.add_job(func=func, trigger="interval", seconds=5)
# 在8月13~20日之间,每天执行一次
scheduler.add_job(func=func, trigger="interval", days=1,start_date='2020-08-13 14:00:01', end_date='2020-08-20 14:00:01')
3. crontab触发器:在特定时间周期性地触发,如每天,周循环等场景,它是功能最强大的触发器。
参数范围只适用于int类型,str类型有无限可能,自己踩坑去吧
year
: 年,4位数字。int 或 str
month
: 月 (范围1-12)。int 或 str
day
: 日 (范围1-31)。int 或 str
week
:周 (范围1-53)。int 或 str
day_of_week
: 周内第几天或者星期几 (范围0-6,0是周一,6是周天 或者 mon,tue,wed,thu,fri,sat,sun)。int 或 str
hour
: 时 (范围0-23)。(int 或 str)
minute
: 分 (范围0-59)。(int 或 str)
second
: 秒 (范围0-59)。(int 或 str)
start_date
: 最早开始日期(包含)。(datetime 或 str)
end_date
: 最晚结束时间(包含)。(datetime 或 str)
timezone
: 指定时区。(datetime 或 str)
# 每天23点定时执行
scheduler.add_job(func=func, trigger="cron", day_of_week="0-6", hour=23 )
# 在每年 1、2、3、7、8、9 月份中的每月第4天和星期日中的 00:00, 01:00, 02:00 和 03:00 执行 func 任务
scheduler.add_job(func=func,trigger="cron",month="1-3,7-9",day="4th sun", hour="0-3")
# 每周一早晨9点30分执行func任务
scheduler.add_job(func=func, trigger="cron", day_of_week=0, hour=9, minute=30)
# 间隔5分钟执行一次,与interval触发器使用功能相同
scheduler.add_job(func=func, trigger="cron", minute="*/5" )
crontab触发器所支持的表达式
4.执行器(schedulers):
通常是设置线程池,由多个线程来执行不同的任务,我们的任务涉及到一些 CPU密集计算的操作,那就根据业务情况自行选择。我只用过ThreadPoolExecutor。
ThreadPoolExecutor: 线程池执行器。
ProcessPoolExecutor: 进程池执行器。
GeventExecutor: Gevent程序执行器。
TornadoExecutor: Tornado程序执行器。
TwistedExecutor: Twisted程序执行器。
AsyncIOExecutor: asyncio程序执行器。
# 配置线程池,支持最多20个线程同时执行
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
'default': ThreadPoolExecutor(20)
# 配置进程池,支持最多3个进程同时执行
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
'default': ProcessPoolExecutor(3)
#创建类,导入配置
scheduler = BackgroundScheduler(executors=executors)
5.任务相关配置:
coalesce
:是否合并执行任务默认为True。(bool值:True / False )。由于某种原因(系统挂了),定时任务未执行,比如任务A,每秒执行一次,系统挂了5s,积累了5次任务未执行。当系统重启后。
设置为True,则只执行任务A一次。
设置为False,则会将积累的任务都执行一次(5次)。
max_instances
:同一个任务同一时间最多只能有几个实例运行,默认值为1(int)。比如任务A,每3秒执行一次,但是执行一次任务A需要10s。
设置值为1时,在A任务开始执行到第3s时,任务A仍然处于执行中,便不会再次触发任务A执行。
设置值为2时,在任务A执行第3s时,会再次触发任务A执行,此时就有2个任务A处于执行中。即2个同任务实例一起执行。
misfire_grace_time
:添加允许容错的时间,单位为秒,默认值为1(int)。由于当前调度任务较多,线程池都被占满,则未被执行的任务,不会被调度器进行排队。只有当线程池被释放,才会再去检测未被执行的任务触发时间。此时就会判断,任务触发时间 - 当前时间 = 差值,是否小于所配置值。小于则任务仍然会被执行,否则任务将被跳过不执行。
# 任务相关配置
job_defaults = {
# 不合并执行
'coalesce': False,
# 同一时间同个任务最大执行次数为3
'max_instances': 3,
# 任务错过当前时间60s内,仍然可以触发任务
'misfire_grace_time':60
# 创建类,导入配置
scheduler = BackgroundScheduler(job_defaults=job_defaults)
2.2动态配置任务
2.2.1 新增任务
方法:add_job()
主要参数:
func
:定时任务执行的函数名称,注意是名称,不带括号的。
args
:任务执行函数的位置参数。(元组类型)例如args=(3,)
id
:任务id,唯一标识,修改,删除均以任务id作为标识
trigger
:触发器类型,参数可选:date、interval、cron
replace_existing
:将任务持久化至数据库中时,此参数必须添加,值为True。并且id值必须有。不然当程序重新启动时,任务会被重复添加。
其他参数根据trigger设置的类型不同,可以设置响应的时间参数,具体参考2.1.3章节,以及一些不常用的自行百度即可。
# 添加任务,间隔5s执行一次sch_func(a,b)函数。
scheduler.add_job(func=sch_func, args=("1","2"),id="1234", trigger="interval", seconds=5, replace_existing=True)
2.2.2 编辑任务
方法:modify_job()
id
:任务id,唯一标识,为需要修改的任务id
func
:定时任务执行的函数名称,注意是名称,不带括号的。
args
:任务执行函数的位置参数。(元组类型)例如args=(3,)
trigger
:触发器类型,参数可选:date、interval、cron
跟添加任务的参数差不多,只是id为已存在的任务的id,如果用add_job()也能修改任务,只要id传的和已有任务id一致
# 修改任务id为1234的任务,执行时间变为每20s执行一次
scheduler.modify_job(id="1234",func=sch_func, args=("1","2"),trigger="interval", seconds=20 )
# 用add_job()方式修改任务,将任务执行时间变为30s
scheduler.add_job(func=sch_func, args=("1","2"),id="1234", trigger="interval", seconds=30, replace_existing=True)
2.2.3 删除任务
删除单个任务,参数为id
remove_job(id)
删除所有任务,参数可不填写
remove_all_jobs()
2.2.4 暂停任务
方法:pause_job()
参数id
:为需要暂停的任务id
# 任务id为1234的,暂停任务不执行
scheduler.pause_job(id="1234")
2.2.4 恢复任务
方法:resume_job()
参数id
:为需要恢复的任务id
# 任务id为1234的,恢复任务执行
scheduler.resume_job(id="1234")
2.2.5 查询任务
方法:get_job(id="1234")
参数id
:为需要查询的任务id
任务存在,则返回:60763f523fa26ffa0498a1ca (trigger: date[2021-04-14 10:55:32 CST], next run at: 2021-04-14 10:55:32 CST)
类型为: <class 'apscheduler.job.Job'>
任务不存在,则返回:None
三、Flask_Apscheduler简单项目构建
3.1安装Flask_Apscheduler模块
pip install flask_apscheduler
3.2 flask目录结构
my_flask
├── app.py
├── utils
└──schedules.py
3.3 简单启动flask和定时器(定时器任务要自己在写代码添加)
utils/schedules.py
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from flask_apscheduler import APScheduler as _BaseAPScheduler
# 重写APScheduler,实现上下文管理机制,小优化功能也可以不要。对于任务函数涉及数据库操作有用
class APScheduler(_BaseAPScheduler):
def run_job(self, id, jobstore=None):
with self.app.app_context():
super().run_job(id=id, jobstore=jobstore)
# 定时器配置项
class SchedulerConfig(object):
# 持久化配置,数据持久化至MongoDB
SCHEDULER_JOBSTORES = {'default': MongoDBJobStore(host="127.0.0.1", port=27017,database="apscheduler",collection="jobs")}
# 线程池配置,最大20个线程
SCHEDULER_EXECUTORS = {'default': ThreadPoolExecutor(20)}
# 调度开关开启
SCHEDULER_API_ENABLED = True
# 设置容错时间为 1小时
SCHEDULER_JOB_DEFAULTS = {'misfire_grace_time':3600}
# 配置时区
SCHEDULER_TIMEZONE = 'Asia/Shanghai'
scheduler = APScheduler()
app.py
from flask import Flask
from utils.schedules import SchedulerConfig, scheduler
app = Flask(__name__)
# 导入定时器配置
app.config.from_object(SchedulerConfig())
# 初始化定时器
scheduler.init_app(app)
# 启动定时器,默认后台启动了
scheduler.start()
# 启动flask
app.run()
3.4 小贴士
1.如果不重写APScheduler类,则定时任务函数涉及上下文操作时,app必须与定时任务函数在一个模块内,并且写成如下格式:
app.py
from flask import Flask
app = Flask(__name__)
# 定时任务函数
def fun():
with app.app_context():