Flask_Apscheduler定时器入门

前言

本文介绍的内容均是以程序中只启动了一个调度器为前提下。多个调度器在某些配置项上要多个参数指定调度器。

一、Flask_Apscheduler与Apscheduler

Apscheduler: 全名:Advanced Python Scheduler,是Python的一个定时任务框架,能按指定规则时间执行任务(python的函数),并能持久化任务至数据库,实现对定时任务的动态增、删、改、查操作。具备了一个合格定时器该有的所有功能。

Flask_Apscheduler: 是Flask框架的一个扩展库,增加了Flask对apScheduler的支持,即基于Flask框架使用习惯,稍微调整了一个Apscheduler的用法(配置导入,定时器启动,上下文管理),基本原理的概念与Apscheduler一致。

总结: 想要了解定时器框架的原理和用法,百度直接搜索Apscheduler即可。Flask_Apscheduler只是用于flask框架项目中时,部分方法有稍微变动而已,影响不大。

二、Apscheduler介绍

  • 要使用Apscheduler,当然是要先安装这个模块了,执行 pip install apscheduler

    2.1 四个基础组件

    2.1.1 调度器(schedulers)

    配置任务存储器和执行器都可以在调度器中完成,并对任务进行增、删、改、查操作,调度器通常是只有一个。Apscheduler提供的调度器有7种,我只用过 BlockingScheduler BackgroundScheduler

    BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
  • BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。 AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。 GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。 TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。 TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。 QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。
    主进程调度器
    from apscheduler.schedulers.blocking import BlockingScheduler
    def function():
        print("定时任务跑起来了")
    # 定义主进程调度器
    scheduler = BlockingScheduler()
    # 添加任务,间隔5s执行一次func()函数。  看不懂没关系后面会介绍
    scheduler.add_job(func=function, trigger="interval", seconds=5)
    # 启动定时器
    scheduler.start()
    
    后台运行调度器
    from apscheduler.schedulers.background import BackgroundScheduler
    import time
    def function():
        print("定时任务跑起来了")
    # 定义后台执行调度器
    scheduler = BackgroundScheduler()
    # 添加任务,间隔5s执行一次func()函数。  看不懂没关系后面会介绍
    scheduler.add_job(func=function, trigger="interval", seconds=5)
    # 启动定时器
    scheduler.start()
    # 使主程序不关闭
    while True:
        print(time.sleep(9999))
    
    2.1.2 任务存储器(job stores)
  • 任务存储器是可以存储任务的地方,默认情况下任务保存在内存,服务重启后任务会消失,也可将任务保存在各种数据库中(mysql,redis,mongodb)。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。
  • # 数据持久化至Mysql
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    # 存储到sql文件
    jobstores = {
        'default': SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
    # 存储到现有的数据库
    jobstores = {
        'default': SQLAlchemyJobStore(url="mysql://root:[email protected]:3306/sql'")
    # 数据持久化至MongoDB
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    jobstores = {
        'default': MongoDBJobStore(host="127.0.0.1",port=27017, database="apscheduler", collection="jobs")
    # 数据持久化至redis
    from apscheduler.jobstores.redis import RedisJobStore 
    jobstores = {
        'default': RedisJobStore(host="127.0.0.1",port=6379, db=0, password="123456")
    # 引入配置的方法,在创建类时加入配置项
    scheduler = BackgroundScheduler(jobstores=jobstores)
    
    2.1.3 触发器(executors)

    1.date触发器:是最基本的一种调度,任务只会在指定日期时间执行一次。
    参数说明:
    run_date:任务的运行日期或时间 (datetime 或 str)
    timezone:指定时区(datetime.tzinfo 或 str)

    # 在2020年5月22日执行一次
    scheduler.add_job(func=func, trigger="date", run_date=date(2020, 5, 22), timezone="Asia/Shanghai")
    # 在2020年8月13日 14:00:01执行一次
    scheduler.add_job(func=func, trigger="date", run_date='2020-08-13 14:00:01')
    

    2.interval触发器:根据设置的时间间隔,每间隔执行一次。间隔开始计算时间,为任务创建时间。
    参数说明:
    weeks:间隔几周。int
    days:间隔天数。int。
    hours:间隔小时数。int。
    minutes:间隔分钟数。int。
    seconds:间隔秒数。int。
    start_date:间隔触发的起始时间。(datetime 或 str)
    end_date:间隔触发的结束时间。(datetime 或 str)
    timezone:指定时区。(datetime 或 str)

    # 每5秒执行一次func()函数
    scheduler.add_job(func=func, trigger="interval", seconds=5)
    # 在8月13~20日之间,每天执行一次
    scheduler.add_job(func=func, trigger="interval", days=1,start_date='2020-08-13 14:00:01', end_date='2020-08-20 14:00:01')
    

    3. crontab触发器:在特定时间周期性地触发,如每天,周循环等场景,它是功能最强大的触发器。

    参数范围只适用于int类型,str类型有无限可能,自己踩坑去吧
    year: 年,4位数字。int 或 str
    month: 月 (范围1-12)。int 或 str
    day: 日 (范围1-31)。int 或 str
    week:周 (范围1-53)。int 或 str
    day_of_week: 周内第几天或者星期几 (范围0-6,0是周一,6是周天 或者 mon,tue,wed,thu,fri,sat,sun)。int 或 str
    hour: 时 (范围0-23)。(int 或 str)
    minute: 分 (范围0-59)。(int 或 str)
    second: 秒 (范围0-59)。(int 或 str)
    start_date: 最早开始日期(包含)。(datetime 或 str)
    end_date: 最晚结束时间(包含)。(datetime 或 str)
    timezone: 指定时区。(datetime 或 str)

    # 每天23点定时执行
    scheduler.add_job(func=func, trigger="cron", day_of_week="0-6", hour=23 )
    # 在每年 1、2、3、7、8、9 月份中的每月第4天和星期日中的 00:00, 01:00, 02:00 和 03:00 执行 func 任务
    scheduler.add_job(func=func,trigger="cron",month="1-3,7-9",day="4th sun", hour="0-3")
    # 每周一早晨9点30分执行func任务
    scheduler.add_job(func=func, trigger="cron", day_of_week=0,  hour=9, minute=30)
    # 间隔5分钟执行一次,与interval触发器使用功能相同
    scheduler.add_job(func=func, trigger="cron", minute="*/5" )
    

    crontab触发器所支持的表达式

    4.执行器(schedulers):
    通常是设置线程池,由多个线程来执行不同的任务,我们的任务涉及到一些 CPU密集计算的操作,那就根据业务情况自行选择。我只用过ThreadPoolExecutor。

    ThreadPoolExecutor: 线程池执行器。 ProcessPoolExecutor: 进程池执行器。 GeventExecutor: Gevent程序执行器。 TornadoExecutor: Tornado程序执行器。 TwistedExecutor: Twisted程序执行器。 AsyncIOExecutor: asyncio程序执行器。
    # 配置线程池,支持最多20个线程同时执行
    from apscheduler.executors.pool import ThreadPoolExecutor
    executors = {
        'default': ThreadPoolExecutor(20) 
    # 配置进程池,支持最多3个进程同时执行
    from apscheduler.executors.pool import ProcessPoolExecutor
    executors = {
        'default': ProcessPoolExecutor(3) 
    #创建类,导入配置
    scheduler = BackgroundScheduler(executors=executors)
    

    5.任务相关配置:
    coalesce:是否合并执行任务默认为True。(bool值:True / False )。由于某种原因(系统挂了),定时任务未执行,比如任务A,每秒执行一次,系统挂了5s,积累了5次任务未执行。当系统重启后。
    设置为True,则只执行任务A一次。
    设置为False,则会将积累的任务都执行一次(5次)。

    max_instances:同一个任务同一时间最多只能有几个实例运行,默认值为1(int)。比如任务A,每3秒执行一次,但是执行一次任务A需要10s。
    设置值为1时,在A任务开始执行到第3s时,任务A仍然处于执行中,便不会再次触发任务A执行。
    设置值为2时,在任务A执行第3s时,会再次触发任务A执行,此时就有2个任务A处于执行中。即2个同任务实例一起执行。

    misfire_grace_time:添加允许容错的时间,单位为秒,默认值为1(int)。由于当前调度任务较多,线程池都被占满,则未被执行的任务,不会被调度器进行排队。只有当线程池被释放,才会再去检测未被执行的任务触发时间。此时就会判断,任务触发时间 - 当前时间 = 差值,是否小于所配置值。小于则任务仍然会被执行,否则任务将被跳过不执行。

    # 任务相关配置
    job_defaults = {
        # 不合并执行
        'coalesce': False,
        # 同一时间同个任务最大执行次数为3
        'max_instances': 3,
        # 任务错过当前时间60s内,仍然可以触发任务
        'misfire_grace_time':60
    # 创建类,导入配置
    scheduler = BackgroundScheduler(job_defaults=job_defaults)
    

    2.2动态配置任务

    2.2.1 新增任务

    方法:add_job()
    主要参数:
    func:定时任务执行的函数名称,注意是名称,不带括号的。
    args:任务执行函数的位置参数。(元组类型)例如args=(3,)
    id:任务id,唯一标识,修改,删除均以任务id作为标识
    trigger:触发器类型,参数可选:date、interval、cron
    replace_existing:将任务持久化至数据库中时,此参数必须添加,值为True。并且id值必须有。不然当程序重新启动时,任务会被重复添加。

    其他参数根据trigger设置的类型不同,可以设置响应的时间参数,具体参考2.1.3章节,以及一些不常用的自行百度即可。

    # 添加任务,间隔5s执行一次sch_func(a,b)函数。 
    scheduler.add_job(func=sch_func, args=("1","2"),id="1234", trigger="interval", seconds=5, replace_existing=True)
    
    2.2.2 编辑任务

    方法:modify_job()
    id:任务id,唯一标识,为需要修改的任务id
    func:定时任务执行的函数名称,注意是名称,不带括号的。
    args:任务执行函数的位置参数。(元组类型)例如args=(3,)
    trigger:触发器类型,参数可选:date、interval、cron

    跟添加任务的参数差不多,只是id为已存在的任务的id,如果用add_job()也能修改任务,只要id传的和已有任务id一致

    # 修改任务id为1234的任务,执行时间变为每20s执行一次
    scheduler.modify_job(id="1234",func=sch_func, args=("1","2"),trigger="interval", seconds=20 )
    # 用add_job()方式修改任务,将任务执行时间变为30s
    scheduler.add_job(func=sch_func, args=("1","2"),id="1234", trigger="interval", seconds=30, replace_existing=True)
    
    2.2.3 删除任务
    删除单个任务,参数为id
    remove_job(id)
    删除所有任务,参数可不填写
    remove_all_jobs()

    2.2.4 暂停任务

    方法:pause_job()
    参数id:为需要暂停的任务id

    # 任务id为1234的,暂停任务不执行
    scheduler.pause_job(id="1234")
    
    2.2.4 恢复任务

    方法:resume_job()
    参数id:为需要恢复的任务id

    # 任务id为1234的,恢复任务执行
    scheduler.resume_job(id="1234")
    
    2.2.5 查询任务

    方法:get_job(id="1234")
    参数id:为需要查询的任务id
    任务存在,则返回:60763f523fa26ffa0498a1ca (trigger: date[2021-04-14 10:55:32 CST], next run at: 2021-04-14 10:55:32 CST)
    类型为: <class 'apscheduler.job.Job'>
    任务不存在,则返回:None

    三、Flask_Apscheduler简单项目构建

    3.1安装Flask_Apscheduler模块

    pip install flask_apscheduler

    3.2 flask目录结构

    my_flask
    ├── app.py
    ├── utils
          └──schedules.py

    3.3 简单启动flask和定时器(定时器任务要自己在写代码添加)

    utils/schedules.py

    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor
    from flask_apscheduler import APScheduler as _BaseAPScheduler
    # 重写APScheduler,实现上下文管理机制,小优化功能也可以不要。对于任务函数涉及数据库操作有用
    class APScheduler(_BaseAPScheduler):
        def run_job(self, id, jobstore=None):
            with self.app.app_context():
                super().run_job(id=id, jobstore=jobstore)
    # 定时器配置项
    class SchedulerConfig(object):
        # 持久化配置,数据持久化至MongoDB
        SCHEDULER_JOBSTORES = {'default': MongoDBJobStore(host="127.0.0.1", port=27017,database="apscheduler",collection="jobs")}
        # 线程池配置,最大20个线程
        SCHEDULER_EXECUTORS = {'default': ThreadPoolExecutor(20)}
        # 调度开关开启
        SCHEDULER_API_ENABLED = True
        # 设置容错时间为 1小时
        SCHEDULER_JOB_DEFAULTS = {'misfire_grace_time':3600}
        # 配置时区
        SCHEDULER_TIMEZONE = 'Asia/Shanghai'
    scheduler = APScheduler()
    

    app.py

    from flask import Flask
    from utils.schedules import SchedulerConfig, scheduler
    app = Flask(__name__)
    # 导入定时器配置
    app.config.from_object(SchedulerConfig())
    # 初始化定时器
    scheduler.init_app(app)
    # 启动定时器,默认后台启动了
    scheduler.start()
    # 启动flask
    app.run()
    
    3.4 小贴士
  • 1.如果不重写APScheduler类,则定时任务函数涉及上下文操作时,app必须与定时任务函数在一个模块内,并且写成如下格式:
  • app.py

    from flask import Flask
    app = Flask(__name__)
    # 定时任务函数
    def fun():
        with app.app_context():