您现在的位置是: 网站首页 >Python Python

【cron、APScheduler】Python动态定时任务创建工具

admin2020年2月3日 11:59 Django | Linux | Python 2758人已围观

# 定时任务 - `celery`:是经过生产级考量,但遇到问题,排查不简单,它的优势重在异步队列,不过Celery也同时提供了定时任务功能。一般不单独搭建Celery作为定时任务。Django中可以使用`django-celery-beat`插件实现动态添加任务的功能,该[插件](https://django-celery-beat.readthedocs.io/en/latest/)依赖celery异步,所以先要配置好异步任务,才能使用定时任务。 - `airflow`:[链接](https://github.com/apache/airflow),[文档](https://airflow.apache.org/docs/stable/tutorial.html),文档有点难懂,apache 基金会的孵化项目。以编程方式编写、计划和监视工作流的平台。 - `apscheduler`:[链接](https://github.com/agronholm/apscheduler/),[文档](https://apscheduler.readthedocs.io/en/latest/),专注于定时任务,功能全面,不仅可以让我们在程序中动态添加和删除我们的定时任务,还支持持久化,且其持久化方案支持很多形式,包括(Memory, MongoDB, SQLAlchemy, Redis, RethinkDB, ZooKeeper)。 - `rq-scheduler`:[链接](https://github.com/rq/rq-scheduler),向 RQ (Redis Queue)Redis队列添加作业调度功能的轻量级库。将作业放入计划程序,运行计划程序,在时间到时将计划作业移到队列中。 - `schedule`:[链接](https://github.com/dbader/schedule),[文档](https://schedule.readthedocs.io/en/latest/),类似于linux的cron,轻量级的定时任务方案,简单好用,不需要做什么配置,但无法动态添加删除任务,也无法将任务持久化。 - `django-q`:[链接](https://github.com/Koed00/django-q),[文档](https://django-q.readthedocs.io/en/latest/),Django Q 是使用 Python 多处理分布式的本机 Django 任务队列、计划程序和辅助应用程序。 # APScheduler实战 ![BLOG_20200403_120032_59](/media/blog/images/2020/04/BLOG_20200403_120032_59.png "博客图集BLOG_20200403_120032_59.png") ![BLOG_20200403_120041_66](/media/blog/images/2020/04/BLOG_20200403_120041_66.png "博客图集BLOG_20200403_120041_66.png") ## Python代码实现Ver0.1 ```python #! /usr/bin/env python # -*- coding: utf-8 -*- """ @Version : Ver0.1 @Author : StarMeow @License : (C) Copyright 2018-2020, blog.starmeow.cn @Contact : starmeow@qq.com @Software: PyCharm @File : scheduled_tasks.py @Time : 2020/2/17 14:59 @Desc : """ from pytz import utc, timezone import datetime import time import os import sys import random import json import logging from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.base import JobLookupError from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED # 调度器时间类型 # 三种类型的触发器 from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger """ 使用SQLAlchemyJobStore,需要pip install sqlalchemy,否则会提示ImportError: SQLAlchemyJobStore requires SQLAlchemy installed """ # 通过logging模块,可以添加apscheduler日志至DEBUG级别,这样就能捕获异常信息,format参数为格式化输出,datefmt为日志日期时间格式 logging.basicConfig(filename='scheduled_run.log', level=logging.DEBUG, format='%(asctime)s|%(levelname)s|%(message)s', datefmt='%Y-%m-%d %H:%M:%S') jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///schedule.sqlite') # 会自动在当前目录创建该sqlite文件 } executors = { 'default': ThreadPoolExecutor(max_workers=20) # 派生线程的最大数目 } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone('Asia/Shanghai')) # scheduler = BlockingScheduler() scheduler.start() # 调度器允许添加事件侦听器。作业出错或运行完成时通知 def job_run_listener(event): print(event.job_id, event.scheduled_run_time) job_id = event.job_id scheduled_run_time = event.scheduled_run_time.strftime("%Y-%m-%d %H:%M:%S") if event.exception: print('作业ID:{} 在 {} 执行失败 :( 错误原因:{}'.format(job_id, scheduled_run_time, event.exception.args[0])) else: print('作业ID:{} 在 {} 执行成功 :)'.format(job_id, scheduled_run_time)) # 当任务执行完或任务出错时,调用job_run_listener scheduler.add_listener(job_run_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) def job_schedule_reminder(msg): """ 定时通知 :param msg: 消息内容 :return: """ with open('schedule.log', 'a', encoding='utf-8') as f: f.write('{} 执行任务:{}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), msg)) f.close() print('系统通知:', msg) class ScheduleManage(object): def __init__(self, sch): self.sch = sch # 暂停作业 def pause_job(self, job_id, jobstore=None): self.sch.pause_job(job_id, jobstore=jobstore) msg = '作业ID:{} 暂停成功'.format(job_id) print(msg) # 恢复作业 def resume_job(self, job_id, jobstore=None): self.sch.resume_job(job_id, jobstore=jobstore) msg = '作业ID:{} 恢复成功'.format(job_id) print(msg) # 删除作业 def remove_job(self, job_id=None, jobstore=None): if job_id is None: if input('确认删除所有作业?') == 'y': self.sch.remove_all_jobs(jobstore=jobstore) msg = '所有作业删除成功' else: try: self.sch.remove_job(job_id, jobstore=jobstore) msg = '作业ID:{} 删除成功'.format(job_id) except JobLookupError as e: msg = '作业ID不存在:{}'.format(e) print(msg) # 获取作业的触发器和配置的时间字符串,以及作业下次运行时间等信息 def get_job(self, job_id, jobstore=None): """ 获取作业的所有信息 :param job_id: 作业ID :param jobstore: :return: id、name、func、func_args、func_kwargs、trigger、trigger_time、state、next_run_time """ job = self.sch.get_job(job_id, jobstore) job_info = dict() job_info['id'] = job.id job_info['name'] = job.name job_info['func'] = job.func.__name__ job_info['func_args'] = job.args job_info['func_kwargs'] = job.kwargs if isinstance(job.trigger, DateTrigger): job_info['trigger'] = 'date' job_info['trigger_time'] = job.trigger.run_date.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(job.trigger, IntervalTrigger): job_info['trigger'] = 'interval' # print(job.trigger.interval.days) w, d = divmod(job.trigger.interval.days, 7) # 天转换为周、天 # print(job.trigger.interval.seconds) m, s = divmod(job.trigger.interval.seconds, 60) # 秒转换为时、分、秒 h, m = divmod(m, 60) job_info['trigger_time'] = '{} {} {} {} {}'.format(s, m, h, d, w) elif isinstance(job.trigger, CronTrigger): job_info['trigger'] = 'cron' job_info['trigger_time'] = '{} {} {} {} {} {} {}'.format(job.trigger.fields[7], job.trigger.fields[6], job.trigger.fields[5], job.trigger.fields[4], job.trigger.fields[3], job.trigger.fields[2], job.trigger.fields[1]) else: job_info['trigger'] = job_info['trigger_time'] = None next_run_time = job.next_run_time if next_run_time: # 作业运行中 job_info['state'] = '运行中' job_info['next_run_time'] = next_run_time.strftime("%Y-%m-%d %H:%M:%S") else: # 作业暂停中,next_run_time为None,进行获取 job_info['state'] = '暂停中' job_info['next_run_time'] = '{}(恢复运行后)'.format(job.trigger.get_next_fire_time(None, datetime.datetime.now(timezone('Asia/Shanghai'))).strftime("%Y-%m-%d %H:%M:%S")) print(job_info) return job_info # 获取所有作业 def get_jobs(self): all_jobs = self.sch.get_jobs() job_infos = [] for job in all_jobs: job_infos.append(self.get_job(job.id)) return json.dumps(job_infos) # 处理add、modify传入的kwargs @staticmethod def run_trigger_time(trigger, trigger_time, start_end_date): kwargs = {} if trigger == 'date': kwargs['run_date'] = trigger_time elif trigger == 'interval': """ :param int weeks: number of weeks to wait :param int days: number of days to wait :param int hours: number of hours to wait :param int minutes: number of minutes to wait :param int seconds: number of seconds to wait """ kwargs['seconds'], kwargs['minutes'], kwargs['hours'], kwargs['days'], kwargs['weeks'] = map(int, trigger_time.split(' ')) kwargs.update(start_end_date) # 添加起止日期 elif trigger == 'cron': """ :param int|str year: 4-digit year :param int|str month: month (1-12) :param int|str day: day of the (1-31) :param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) :param int|str hour: hour (0-23) :param int|str minute: minute (0-59) :param int|str second: second (0-59) """ kwargs['second'], kwargs['minute'], kwargs['hour'], kwargs['day'], kwargs['month'], kwargs['day_of_week'], kwargs['year'] = trigger_time.split(' ') kwargs.update(start_end_date) # 添加起止日期 else: pass return kwargs # 添加作业 def add_job(self, func, trigger, trigger_time, func_args=None, func_kwargs=None, jobstore='default', **kwargs): """ date: .add_job(job_function, 'date', args=['msg'], run_date='2020-02-20 12:12:00') interval: .add_job(job_function, 'interval', hours=2, start_date='2020-02-20 12:12:00', end_date='2020-02-22 12:12:00') cron: .add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') :param func: 执行的函数名 :param trigger: 触发器类型:date、interval、cron :param trigger_time: 执行时间信息: date->'2020-02-20 12:12:00' interval->'秒 分 时 日 周':每2s(2 0 0 0 0),每1天12小时(0 0 12 1 0),每1周(0 0 0 0 1) cron->'秒 分 时 日 月 周 年':每日xx:xx:00(0 x x * * * *)、每周x(0 x x * * x *)、每月x日(0 x x x * * *)、每年x月x日(0 x x x x * *) :param func_args: 要调用func的位置参数list :param func_kwargs: 调用func的关键字参数dict :param jobstore: 存储器,默认 :return: job_id """ random_num = random.randint(100, 999) job_id = '{}_{}_{}'.format(trigger, int(time.time()), random_num) # 时间戳+随机数为id job_name = '{}_{}'.format(func.__name__, random_num) start_end_date = {'start_date': kwargs.get('start_date', None), 'end_date': kwargs.get('end_date', None)} # interval和cron可能传入的起止日期 add_kwargs = {'func': func, 'trigger': trigger, 'args': func_args, 'kwargs': func_kwargs, 'id': job_id, 'name': job_name} # 指定添加作业的参数 add_kwargs.update(self.run_trigger_time(trigger=trigger, trigger_time=trigger_time, start_end_date=start_end_date)) # 将时间参数合并 # print(add_kwargs) if trigger in ('date', 'interval', 'cron'): try: job = self.sch.add_job(**add_kwargs) print('当前新建任务:', job) # 获取当前创建作业的下次运行时间 # next_run_time = scheduler.get_job(job_id).next_run_time next_run_time = job.next_run_time # <class 'datetime.datetime'>offset-aware类型,包含时区的 next_run_time = next_run_time.replace(tzinfo=None) # offset-aware类型的datetime转换为offset-naive类型的datetime,即去掉时间戳 now_datetime = datetime.datetime.now() # 获取当前时间 if next_run_time <= now_datetime: print('任务时间需大于当前时间,任务已被系统自动删除,创建失败!') else: week = { '0': '日', '1': '一', '2': '二', '3': '三', '4': '四', '5': '五', '6': '六', } print('定时任务创建成功,job_id:{},下次运行时间:{}(周{})'.format(job_id, next_run_time, week[next_run_time.strftime('%w')])) except ValueError as e: print('异常:', e) return job_id else: print('创建失败,指定触发器错误') return None sch = ScheduleManage(scheduler) sch.get_jobs() # sch.pause_job('interval_1582123807_794') # job_id = sch.add_job(func=job_schedule_reminder, trigger='date', trigger_time='2020-2-20 21:00:00', func_args=['date:今天晚上8:00部门开会']) # job_id = sch.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='2 0 0 0 0', func_args=['interval:每隔2s发个通知']) # 每隔xx时间 # job_id = sch.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='0 0 12 1 0', func_args=['interval:每隔1天12小时发个通知']) # 每隔xx时间 # job_id = sch.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='1 0 12 0 1', func_args=['interval:每隔1周12小时1秒发个通知']) # 每隔xx时间 # job_id = sch.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='1 0 1 1 1', func_args=['interval:每隔1周1天1小时1秒发个通知']) # 每隔xx时间 # # job_id = sch.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 55 20 * * * *', func_args=['cron:每天20:55定时消息']) # job_id = sch.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 0 12 * * 0,2,4 *', func_args=['cron:每周日、三、五 12:00定时消息']) # job_id = sch.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 0 18 1 * * *', func_args=['cron:每月1号 18:00定时消息']) # job_id = sch.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 30 9 10 12 * *', func_args=['cron:每年12月10日 9:30定时消息']) time.sleep(10) sch.remove_job(job_id) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: while True: time.sleep(20) print('延迟2s') except (KeyboardInterrupt, SystemExit): scheduler.shutdown() # wait=False参数可选,代表立即停止,不用等待。 print('调度器启动失败') ``` ## Python代码实现Ver0.2 ### apscheduler_core.py ```python #! /usr/bin/env python # -*- coding: utf-8 -*- """ @Version : Ver0.2 @Author : StarMeow @License : (C) Copyright 2018-2020, blog.starmeow.cn @Contact : starmeow@qq.com @Software: PyCharm @File : apscheduler_core.py @Time : 2020年2月21日 00:02:08 @Desc : 动态定时任务工具,自动创建apscheduler_run.log、apscheduler_job.log、apscheduler_db_sqlite三个文件 """ import random import json import datetime import time import logging from pytz import utc, timezone from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.jobstores.base import JobLookupError from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_SCHEDULER_SHUTDOWN # 调度器事件类型 # 三种类型的触发器 from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.cron import CronTrigger def run_scheduler(): """ 使用SQLAlchemyJobStore,需要pip install sqlalchemy,否则会提示ImportError: SQLAlchemyJobStore requires SQLAlchemy installed """ # 通过logging模块,可以添加apscheduler日志至DEBUG级别,这样就能捕获异常信息,format参数为格式化输出,datefmt为日志日期时间格式 logging.basicConfig(filename='apscheduler_run.log', level=logging.DEBUG, format='%(asctime)s|%(levelname)s|%(message)s', datefmt='%Y-%m-%d %H:%M:%S') # 配置作业存储器 # redis_store = RedisJobStore(host='192.168.1.100', port='6379', db=0) # mysql_store = SQLAlchemyJobStore(url='mysql+pymysql://root:password@192.168.1.100:3306/apscheduler_db?charset=utf8') # postgres_store = SQLAlchemyJobStore(url='postgresql://postgres:postgres@192.168.1.100:5432') jobstores = { 'default': SQLAlchemyJobStore(url='sqlite:///apscheduler_db.sqlite') # 会自动在当前目录创建该sqlite文件 } # 配置执行器,并设置线程数 executors = { 'default': ThreadPoolExecutor(max_workers=20) # 派生线程的最大数目 } job_defaults = { 'coalesce': False, # 累计的作业是否执行。True不执行,False,执行。比如进场挂了,导致任务多次没有调用,则前几次的累计任务的任务是否执行的策略。 'max_instances': 3 # 同一个任务在线程池中最多跑的实例数 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone('Asia/Shanghai')) # scheduler = BlockingScheduler() scheduler.start() # 调度器允许添加事件侦听器。作业出错或运行完成时通知 def job_run_listener(event): if event.code == 2 ** 1: print('监听-> 调度器关闭') else: # print(event.job_id, event.scheduled_run_time) job_id = event.job_id scheduled_run_time = datetime2str(event.scheduled_run_time) if event.exception: print('【core】监听-> 作业ID:{} 在 {} 执行失败 :( 错误原因:{}'.format(job_id, scheduled_run_time, event.exception.args[0])) else: print('【core】监听-> 作业ID:{} 在 {} 执行成功 :)'.format(job_id, scheduled_run_time)) # 当任务执行完或任务出错时,调用job_run_listener scheduler.add_listener(job_run_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_SCHEDULER_SHUTDOWN) return scheduler def datetime2str(dt): """ 当前时间转换为时间字符串 :param dt: :return: """ if dt: # 如果时间补位None return dt.strftime("%Y-%m-%d %H:%M:%S") else: return '' class SchedulerManage(object): def __init__(self, sch, jobstore='default'): self.sch = sch self.jobstore = jobstore # 使用default存储器 # 暂停作业 def pause_job(self, job_id): try: self.sch.pause_job(job_id, jobstore=self.jobstore) msg = '【core】暂停-> 作业ID:{} 暂停成功,暂停时间:{}'.format(job_id, datetime2str(datetime.datetime.now())) except JobLookupError as e: msg = '【core】暂停-> 作业ID:{}不存在:{}'.format(job_id, e) print(msg) # 恢复作业,或在作业完成后(下次运行时间为None)删除作业 def resume_job(self, job_id): try: self.sch.resume_job(job_id, jobstore=self.jobstore) msg = '【core】恢复-> 作业ID:{} 恢复成功,恢复时间:{},恢复后将在 {} 执行'.format(job_id, datetime2str(datetime.datetime.now()), self.get_job(job_id).get('next_run_time')) except JobLookupError as e: msg = '【core】恢复-> 作业ID:{}不存在:{}'.format(job_id, e) print(msg) # 删除作业 def remove_job(self, job_id=None): if job_id is None: if input('【core】删除-> 确认删除所有作业?') == 'y': self.sch.remove_all_jobs(jobstore=self.jobstore) msg = '【core】删除-> 所有作业删除成功' else: try: self.sch.remove_job(job_id, jobstore=self.jobstore) msg = '【core】删除-> 作业ID:{} 删除成功,删除时间:{}'.format(job_id, datetime2str(datetime.datetime.now())) except JobLookupError as e: msg = '【core】删除-> 作业ID:{}不存在:{}'.format(job_id, e) print(msg) # 获取作业的触发器和配置的时间字符串,以及作业下次运行时间等信息 def get_job(self, job_id): """ 获取作业的所有信息 :param job_id: 作业ID :return: id、name、func、func_args、func_kwargs、trigger、trigger_time、state、next_run_time """ job = self.sch.get_job(job_id, self.jobstore) if job is None: return {'msg': '作业ID:{} 不存在'.format(job_id)} job_info = dict() job_info['id'] = job.id job_info['name'] = job.name job_info['func'] = job.func.__name__ job_info['func_args'] = job.args job_info['func_kwargs'] = job.kwargs if isinstance(job.trigger, DateTrigger): job_info['trigger'] = 'date' job_info['trigger_time'] = datetime2str(job.trigger.run_date) elif isinstance(job.trigger, IntervalTrigger): job_info['trigger'] = 'interval' # print(job.trigger.interval.days) w, d = divmod(job.trigger.interval.days, 7) # 天转换为周、天 # print(job.trigger.interval.seconds) m, s = divmod(job.trigger.interval.seconds, 60) # 秒转换为时、分、秒 h, m = divmod(m, 60) job_info['trigger_time'] = '{} {} {} {} {}'.format(s, m, h, d, w) job_info['start_date'] = datetime2str(job.trigger.start_date) job_info['end_date'] = datetime2str(job.trigger.end_date) elif isinstance(job.trigger, CronTrigger): job_info['trigger'] = 'cron' job_info['trigger_time'] = '{} {} {} {} {} {} {}'.format(job.trigger.fields[7], job.trigger.fields[6], job.trigger.fields[5], job.trigger.fields[4], job.trigger.fields[3], job.trigger.fields[2], job.trigger.fields[1]) job_info['start_date'] = datetime2str(job.trigger.start_date) job_info['end_date'] = datetime2str(job.trigger.end_date) else: job_info['trigger'] = job_info['trigger_time'] = None next_run_time = job.next_run_time if next_run_time: # 作业运行中 job_info['state'] = '运行中' job_info['next_run_time'] = datetime2str(next_run_time) else: # 作业暂停中,next_run_time为None,进行获取 job_info['state'] = '暂停中' job_info['next_run_time'] = '{}(恢复运行后)'.format(datetime2str(job.trigger.get_next_fire_time(None, datetime.datetime.now(timezone('Asia/Shanghai'))))) # print(job_info) return job_info # 获取所有作业 def get_jobs(self): all_jobs = self.sch.get_jobs() job_infos = [] for job in all_jobs: job_infos.append(self.get_job(job.id)) return json.dumps(job_infos) # 处理add、modify传入的kwargs:触发器时间字符串->触发器时间参数 @staticmethod def trigger_time2trigger_args(trigger, trigger_time, start_date=None, end_date=None): """ 根据触发器别名str,将触发时间字符串转触发器时间参数 :param trigger: 触发器别名str :param trigger_time: 触发时间str :param start_date: (str)interval和cron的开始时间 :param end_date: (str)interval和cron的结束时间 :return: 触发时间的dict """ kwargs = {} if trigger == 'date': kwargs['run_date'] = trigger_time elif trigger == 'interval': """ :param int weeks: number of weeks to wait :param int days: number of days to wait :param int hours: number of hours to wait :param int minutes: number of minutes to wait :param int seconds: number of seconds to wait """ kwargs['seconds'], kwargs['minutes'], kwargs['hours'], kwargs['days'], kwargs['weeks'] = map(int, trigger_time.split(' ')) elif trigger == 'cron': """ :param int|str year: 4-digit year :param int|str month: month (1-12) :param int|str day: day of the (1-31) :param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) :param int|str hour: hour (0-23) :param int|str minute: minute (0-59) :param int|str second: second (0-59) """ kwargs['second'], kwargs['minute'], kwargs['hour'], kwargs['day'], kwargs['month'], kwargs['day_of_week'], kwargs['year'] = trigger_time.split(' ') else: pass # 添加起止日期 if trigger != 'date': if start_date: kwargs['start_date'] = start_date if end_date: kwargs['end_date'] = end_date return kwargs # 添加作业 def add_job(self, func, trigger, trigger_time, func_args=None, func_kwargs=None, start_date=None, end_date=None): """ date: .add_job(job_function, 'date', args=['msg'], run_date='2020-02-20 12:12:00') interval: .add_job(job_function, 'interval', hours=2, start_date='2020-02-20 12:12:00', end_date='2020-02-22 12:12:00') cron: .add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') :param func: 执行的函数名 :param trigger: (str)触发器类型:date、interval、cron :param trigger_time: (str)执行时间信息: date->'2020-02-20 12:12:00' interval->'秒 分 时 日 周':每2s(2 0 0 0 0),每1天12小时(0 0 12 1 0),每1周(0 0 0 0 1) cron->'秒 分 时 日 月 周 年':每日xx:xx:00(0 x x * * * *)、每周x(0 x x * * x *)、每月x日(0 x x x * * *)、每年x月x日(0 x x x x * *) :param func_args: (tuple|list)要调用func的位置参数list :param func_kwargs: (dict)调用func的关键字参数dict :param start_date: (str)interval和cron的开始时间,格式为:xxxx-xx-xx xx:xx:xx :param end_date: (str)interval和cron的结束时间 :return: job_id """ random_num = random.randint(100, 999) job_id = '{}_{}_{}'.format(trigger, int(time.time()), random_num) # 时间戳+随机数为id job_name = '{}_{}'.format(func.__name__, random_num) add_kwargs = {'func': func, 'trigger': trigger, 'args': func_args, 'kwargs': func_kwargs, 'id': job_id, 'name': job_name} # 指定添加作业的参数 add_kwargs.update(self.trigger_time2trigger_args(trigger=trigger, trigger_time=trigger_time, start_date=start_date, end_date=end_date)) # 将触发器时间参数合并 # print(add_kwargs) if trigger in ('date', 'interval', 'cron'): try: """ add_job() 'func': '函数名', 'trigger': '触发器别名', 'args': '函数参数', 'func_kwargs': '函数参数', 'id': '作业ID', 'name': '作业名称', '**trigger_args': 触发器时间参数,trigger_time字符串转换来, jobstore='default':指定存储器名称 executor='default':指定执行器名称 另外还有3个参数: misfire_grace_time=undefined:如果一个job本来14:00有一次执行,但是由于某种原因没有被调度上,现在14:01了,这个14:00的运行实例被提交时, 会检查它预订运行的时间和当下时间的差值(这里是1分钟),大于我们设置的30秒限制,那么这个运行实例不会被执行。 coalesce=undefined:最常见的情形是scheduler被shutdown后重启,某个任务会积攒了好几次没执行, 如5次,下次这个job被submit给executor时,执行5次。将coalesce=True后,只会执行一次 replace_existing=False:如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID, 并且使用replace_existing=True,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。 """ job = self.sch.add_job(**add_kwargs) print('【core】创建-> 当前新建任务:', job) # 获取当前创建作业的下次运行时间 # next_run_time = scheduler.get_job(job_id).next_run_time next_run_time = job.next_run_time # <class 'datetime.datetime'>offset-aware类型,包含时区的 next_run_time = next_run_time.replace(tzinfo=None) # offset-aware类型的datetime转换为offset-naive类型的datetime,即去掉时间戳 now_datetime = datetime.datetime.now() # 获取当前时间 if next_run_time <= now_datetime: print('【core】创建-> 任务时间需大于当前时间,任务已被系统自动删除,创建失败!') else: week = { '0': '日', '1': '一', '2': '二', '3': '三', '4': '四', '5': '五', '6': '六', } print('【core】创建-> 定时任务创建成功,job_id:{},下次运行时间:{}(周{})'.format(job_id, next_run_time, week[next_run_time.strftime('%w')])) except ValueError as e: print('【core】创建-> 异常:', e) return job_id else: print('【core】创建-> 创建失败,指定触发器错误') return None # 修改作业 def modify_job(self, job_id, trigger=None, trigger_time=None, name=None, func=None, func_args=None, func_kwargs=None, start_date=None, end_date=None, reschedule=True): """ 可以修改任务当中除了id的任何属性 :param job_id: :param trigger: :param trigger_time: :param name: (str) - 此作业的描述 :param func: 可执行的调用函数 :param func_args: (tuple|list) - 可调用位置参数 :param func_kwargs: (dict) - 可调用的关键字参数 :param start_date: (str)interval和cron的开始时间 :param end_date: (str)interval和cron的结束时间 :param reschedule: 是否重新计算时间 :return: """ changes = {'job_id': job_id, 'jobstore': self.jobstore} if trigger and trigger_time: # if trigger == 'date': # trigger_obj = DateTrigger(**self.trigger_time2trigger_args(trigger, trigger_time)) # elif trigger == 'interval': # trigger_obj = IntervalTrigger(**self.trigger_time2trigger_args(trigger, trigger_time)) # elif trigger == 'cron': # trigger_obj = CronTrigger(**self.trigger_time2trigger_args(trigger, trigger_time)) # else: # trigger_obj = None # print(trigger_obj) trigger_obj_dict = { # 构建触发器别名和对象字典 'date': DateTrigger, 'interval': IntervalTrigger, 'cron': CronTrigger } # 传入触发器时间参数,返回dict,在进行触发器实例化得到新触发器对象 trigger_obj = trigger_obj_dict[trigger](**self.trigger_time2trigger_args(trigger, trigger_time, start_date, end_date)) print('【core】修改-> 原计划,下次运行时间:{}'.format(self.get_job(job_id).get('next_run_time'))) changes['trigger'] = trigger_obj if name: changes['name'] = name if func: changes['func'] = func if func_args: changes['args'] = func_args if func_kwargs: changes['kwargs'] = func_kwargs # print(changes) # 执行修改 self.sch.modify_job(**changes) if reschedule and trigger and trigger_time: # 修改触发器时间参数后,重新计算下次运行时间,即按照新的触发时间执行下一次作业 self.sch.reschedule_job(job_id=job_id, jobstore=self.jobstore, trigger=changes['trigger']) print('【core】修改-> 修改触发器时间后,下次运行时间:{}'.format(self.get_job(job_id).get('next_run_time'))) return job_id # 退出 def shutdown(self): self.sch.shutdown() # 实例化 scheduler = SchedulerManage(run_scheduler()) ``` ### apscheduler_main.py #### 作业管理基本方法 ```python # -*- coding: utf-8 -*- """ @Version : Ver0.2 @Author : StarMeow @License : (C) Copyright 2018-2020, blog.starmeow.cn @Contact : starmeow@qq.com @Software: PyCharm @File : apscheduler_main.py @Time : 2020年2月21日 00:02:12 @Desc : 操作作业 """ import os import time import datetime from apscheduler_core import datetime2str, scheduler def job_schedule_reminder(msg): """ 定时通知 :param msg: 消息内容 :return: """ with open('apscheduler_job.log', 'a', encoding='utf-8') as f: f.write('{} 执行任务:{}\n'.format(datetime2str(datetime.datetime.now()), msg)) f.close() print('系统通知:', msg) """ 创建作业:date、interval、cron trigger_time格式为: date->'2020-02-20 12:12:00' interval->'秒 分 时 日 周':每2s(2 0 0 0 0),每1天12小时(0 0 12 1 0),每1周(0 0 0 0 1) cron->'秒 分 时 日 月 周 年':每日xx:xx:00(0 x x * * * *)、每周x(0 x x * * x *)、每月x日(0 x x x * * *)、每年x月x日(0 x x x x * *) """ # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='date', trigger_time='2020-2-20 21:00:00', func_args=['date:今天晚上8:00部门开会']) # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='0 0 12 1 0', func_args=['interval:每隔1天12小时发个通知']) # 每隔xx时间 # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='1 0 12 0 1', func_args=['interval:每隔1周12小时1秒发个通知']) # 每隔xx时间 # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='1 0 1 1 1', func_args=['interval:每隔1周1天1小时1秒发个通知']) # 每隔xx时间 # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 55 20 * * * *', func_args=['cron:每天20:55定时消息']) # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 0 12 * * 0,2,4 *', func_args=['cron:每周日、三、五 12:00定时消息']) # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 0 18 1 * * *', func_args=['cron:每月1号 18:00定时消息']) # job_id = scheduler.add_job(func=job_schedule_reminder, trigger='cron', trigger_time='0 30 9 10 12 * *', func_args=['cron:每年12月10日 9:30定时消息']) job_id = scheduler.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='5 0 0 0 0', func_args=['interval:每隔5s发个通知'], start_date='2020-02-23 09:53:43') # 每隔xx时间 """ 修改作业: 支持修改:trigger&trigger_time、name、func、func_args、func_kwargs reschedule默认为True,表示修改触发器时间后更新下次执行的时间,否则按照原触发器执行一次再按新触发器时间执行 """ scheduler.modify_job(job_id, name='修改了名称和触发器时间', trigger='interval', trigger_time='20 0 0 0 0', start_date='2021-02-23 09:53:43', reschedule=True) """ 查看作业: 指定作业的id,获取作业相关属性dict """ print(scheduler.get_job(job_id)) """ 暂停作业: 指定作业的id,暂停 """ scheduler.pause_job(job_id) """ 恢复作业: 指定作业的id,恢复 """ scheduler.resume_job(job_id) """ 删除作业: 指定作业的id,删除该作业 """ scheduler.remove_job(job_id) ``` #### 模拟下单未付款删除订单 ```python import os import time import datetime from apscheduler_core import datetime2str, scheduler def del_user_order(order_id): print('用户未付款,删除订单:{},该订单所有商品补充库存。'.format(order_id)) print('<购物系统> 商品抢购,抢购后请10s内付款!如果付款完成,则删除自动任务') order_id = '20200220xxxx' print('<购物系统> 用户已下单,10s内未付款自动删除订单:{}'.format(order_id)) now_time = datetime.datetime.now() del_time = now_time + datetime.timedelta(seconds=10) job_id = scheduler.add_job(func=del_user_order, trigger='date', trigger_time=datetime2str(del_time), func_args=[order_id]) if input('<购物系统> 判断已付款?(输入y表示用户已付款):') == 'y': scheduler.remove_job(job_id) """ —————————————————————————————————已付款运行结果————————————————————————————— <购物系统> 商品抢购,抢购后请10s内付款!如果付款完成,则删除自动任务 <购物系统> 用户已下单,10s内未付款自动删除订单:20200220xxxx 当前新建任务: del_user_order_652 (trigger: date[2020-02-20 12:44:35 CST], next run at: 2020-02-20 12:44:35 CST) 定时任务创建成功,job_id:date_1582173865_652,下次运行时间:2020-02-20 12:44:35(周四) <购物系统> 判断已付款?(输入y表示用户已付款):y 作业ID:date_1582173865_652 删除成功,删除时间:2020-02-20 12:44:29 —————————————————————————————————未付款运行结果————————————————————————————— <购物系统> 商品抢购,抢购后请10s内付款!如果付款完成,则删除自动任务 <购物系统> 用户已下单,10s内未付款自动删除订单:20200220xxxx 当前新建任务: del_user_order_411 (trigger: date[2020-02-20 12:44:49 CST], next run at: 2020-02-20 12:44:49 CST) 定时任务创建成功,job_id:date_1582173879_411,下次运行时间:2020-02-20 12:44:49(周四) <购物系统> 判断已付款?(输入y表示用户已付款):用户未付款,删除订单:20200220xxxx,该订单所有商品补充库存。 监听-> 作业ID:date_1582173879_411 在 2020-02-20 12:44:49 执行成功 :) """ print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: while True: time.sleep(600) print('延迟2s') import sys sys.exit(0) except (KeyboardInterrupt, SystemExit): scheduler.shutdown() # wait=False参数可选,代表立即停止,不用等待。 print('调度器关闭') ``` #### 模拟事件处理中暂停推迟 暂停后预期处理时间应该增加。 ```python import os import time import datetime from apscheduler_core import datetime2str, scheduler def fault_handling(fault_id): print('\n\n未在规定时间处理好故障单:{},推送通知!\n'.format(fault_id)) mock_require_time = 60 # 拟订处理要求时间 mock_used_time = 30 # 模拟已处理时间 mock_pause_time = 20 # 模拟暂停时间 print(f'<限时处理> 事件系统派单,要求{mock_require_time}s内处理完成') fault_id = 'fault_xxxx' print(f'<限时处理> 你有新的事件:{fault_id}需处理,请在{mock_require_time}s内处理完成') start_time = datetime.datetime.now() # 处理开始时间 end_time = start_time + datetime.timedelta(seconds=mock_require_time) # 预期处理结束时间 print('<限时处理> 开始处理于:{},需要在:{} 前处理完成。'.format(datetime2str(start_time), datetime2str(end_time))) print('<限时处理> 事件紧急处理中...') # 添加作业,如果没在规定时间处理完成,将推送通知。 job_id = scheduler.add_job(func=fault_handling, trigger='date', trigger_time=datetime2str(end_time), func_args=[fault_id]) time.sleep(mock_used_time) # 暂停作业 pause_time = datetime.datetime.now() # 点击暂停时的时间 print('<限时处理> 已处理 {} s'.format((pause_time - start_time).total_seconds())) print(f'<限时处理> 假如维护人员此时有其他紧急事情处理,点击了暂停事件') scheduler.pause_job(job_id) time.sleep(mock_pause_time) # 恢复作业时,需要先将定时时间修改,得到增加剩余时间后的时间 resume_time = datetime.datetime.now() # 点击恢复时的时间 pause_seconds = (resume_time - pause_time).total_seconds() # 获取从开始暂停到恢复的时间转换为s print(f'<限时处理> 事件已恢复,该事件已暂停了 {pause_seconds} s') new_end_time = end_time + datetime.timedelta(seconds=pause_seconds) scheduler.modify_job(job_id, trigger='date', trigger_time=datetime2str(new_end_time)) # 修改新的结束时间 scheduler.resume_job(job_id) print('<限时处理> 请尽快处理,需要 {} 前处理完成。'.format(scheduler.get_job(job_id).get('next_run_time'))) """ ————————————————————总的时间:del_time-start_time————————————————————————————end ——————已处理时间——————pause ———————————暂停的时间———————————resume ———————————————————————剩余的时间——————————————————————end ———————————暂停的时间———————————resume """ if input('<限时处理> 是否已结单:') == 'y': scheduler.remove_job(job_id) """ 运行结果: <限时处理> 事件系统派单,要求60s内处理完成 <限时处理> 你有新的事件:fault_xxxx需处理,请在60s内处理完成 <限时处理> 开始处理于:2020-02-21 15:11:28,需要在:2020-02-21 15:12:28 前处理完成。 <限时处理> 事件紧急处理中... 【core】创建-> 当前新建任务: fault_handling_557 (trigger: date[2020-02-21 15:12:28 CST], next run at: 2020-02-21 15:12:28 CST) 【core】创建-> 定时任务创建成功,job_id:date_1582269088_557,下次运行时间:2020-02-21 15:12:28(周五) <限时处理> 已处理 30.042661 s <限时处理> 假如维护人员此时有其他紧急事情处理,点击了暂停事件 【core】暂停-> 作业ID:date_1582269088_557 暂停成功,暂停时间:2020-02-21 15:11:58 <限时处理> 事件已恢复,该事件已暂停了 20.023601 s 【core】修改-> 原计划,下次运行时间:2020-02-21 15:12:28(恢复运行后) 【core】修改-> 修改触发器时间后,下次运行时间:2020-02-21 15:12:48 【core】恢复-> 作业ID:date_1582269088_557 恢复成功,恢复时间:2020-02-21 15:12:18,恢复后将在 2020-02-21 15:12:48 执行 <限时处理> 请尽快处理,需要 2020-02-21 15:12:48 前处理完成。 <限时处理> 是否已结单: 未在规定时间处理好故障单:fault_xxxx,推送通知! 【core】监听-> 作业ID:date_1582269088_557 在 2020-02-21 15:12:48 执行成功 :) """ # 当完成处理后删除该定时作业。 ``` ### APScheduler在Django中的应用 将`apscheduler_core.py`放在和`settings.py`同级目录下,在一个app的`views.py`中引入 ```python from django.shortcuts import render, HttpResponse from DjangoEncryptLicense.apscheduler_core import scheduler, datetime2str import datetime def index(request): return HttpResponse('<h1>主页</h1>') def job_schedule_reminder(msg): """ 定时通知 :param msg: 消息内容 :return: """ with open('apscheduler_job.log', 'a', encoding='utf-8') as f: f.write('{} 执行任务:{}\n'.format(datetime2str(datetime.datetime.now()), msg)) f.close() print('系统通知:', msg) def start_job(request): job_id = scheduler.add_job(func=job_schedule_reminder, trigger='interval', trigger_time='5 0 0 0 0', func_args=['interval:每隔5s发个通知']) # 每隔xx时间 return HttpResponse(f'作业已创建:{job_id}') def pause_job(request, job_id): scheduler.pause_job(job_id) return HttpResponse(f'作业已暂停:{job_id}') def resume_job(request, job_id): scheduler.resume_job(job_id) return HttpResponse(f'作业已恢复:{job_id}') def del_job(request, job_id): scheduler.remove_job(job_id) return HttpResponse(f'作业已删除:{job_id}') ``` 设置`urls.py` ```python from django.urls import path from .views import index, start_job, pause_job, resume_job, del_job urlpatterns = [ path('', index), path('start/', start_job), path('pause/<str:job_id>/', pause_job), path('resume/<str:job_id>/', resume_job), path('del/<str:job_id>/', del_job), ] ``` > 有一个问题,假如同时启动多个Server,则APScheduler作业将会在不同的进程中同时多次执行。 https://www.cnblogs.com/zhangliang91/p/11603916.html https://github.com/guomaoqiu/JobCenter/blob/master/app/job/views.py ## 分布式场景下使用APScheduler https://cloud.tencent.com/developer/article/1586884 # APScheduler文档翻译 翻译复制 https://www.jianshu.com/p/4f5305e220f0 ## 安装 APScheduler ```bash pip install apscheduler ``` ## 快速开始 ```python from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler() @scheduler.scheduled_job('cron', hour='8-23') def request_update_status(): print('Doing job') scheduler.start() ``` ## 基本概念-四个组件 APScheduler四大组件: - 触发器 `triggers` :用于设定触发任务的条件 - 任务储存器 `job stores`:用于存放任务,把任务存放在内存或数据库中 - 执行器 `executors`: 用于执行任务,可以设定执行模式为单线程或线程池 - 调度器 `schedulers`: 把上方三个组件作为参数,通过创建调度器实例来运行 ### 触发器 每一个任务都有自己的触发器,触发器用于决定任务下次运行的时间。 ### 任务储存器 默认情况下,任务存放在内存中。也可以配置存放在不同类型的数据库中。如果任务存放在数据库中,那么任务的存取有一个序列化和反序列化的过程,同时修改和搜索任务的功能也是由任务储存器实现。 > **注!一个任务储存器不要共享给多个调度器,否则会导致状态混乱** ### 执行器 任务会被执行器放入线程池或进程池去执行,执行完毕后,执行器会通知调度器。 ### 调度器 一个调度器由上方三个组件构成,一般来说,一个程序只要有一个调度器就可以了。开发者也不必直接操作任务储存器、执行器以及触发器,因为调度器提供了统一的接口,通过调度器就可以操作组件,比如任务的增删改查。 ## 调度器组件详解 根据开发需求选择相应的组件,下面是不同的**调度器**组件: - `BlockingScheduler` 阻塞式调度器:适用于只跑调度器的程序。 - `BackgroundScheduler` 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。 - `AsyncIOScheduler` AsyncIO调度器,适用于应用使用AsnycIO的情况。 - `GeventScheduler` Gevent调度器,适用于应用通过Gevent的情况。 - `TornadoScheduler` Tornado调度器,适用于构建Tornado应用。 - `TwistedScheduler` Twisted调度器,适用于构建Twisted应用。 - `QtScheduler` Qt调度器,适用于构建Qt应用。 **任务储存器**的选择,要看任务是否需要持久化。如果你运行的任务是无状态的,选择默认任务储存器`MemoryJobStore`就可以应付。但是,如果你需要在程序关闭或重启时,保存任务的状态,那么就要选择持久化的任务储存器。如果,作者推荐使用`SQLAlchemyJobStore`并搭配`PostgreSQL`作为后台数据库。这个方案可以提供强大的数据整合与保护功能。 **执行器**的选择,同样要看你的实际需求。默认的`ThreadPoolExecutor`线程池执行器方案可以满足大部分需求。如果,你的程序是计算密集型的,那么最好用`ProcessPoolExecutor`进程池执行器方案来充分利用多核算力。也可以将`ProcessPoolExecutor`作为第二执行器,混合使用两种不同的执行器。 配置一个任务,就要设置一个任务**触发器**。触发器可以设定任务运行的周期、次数和时间。APScheduler有三种内置的触发器: - `date` 日期:触发任务运行的具体日期 - `interval` 间隔:触发任务运行的时间间隔 - `cron` 周期:触发任务运行的周期 一个任务也可以设定多种触发器,比如,可以设定同时满足所有触发器条件而触发,或者满足一项即触发。复合触发器,请查阅一下文档:[链接](https://apscheduler.readthedocs.io/en/latest/modules/triggers/combining.html#module-apscheduler.triggers.combining) ## 触发器详解 ### date 在指定时间点触发任务 [文档](https://apscheduler.readthedocs.io/en/latest/modules/triggers/date.html#module-apscheduler.triggers.date) ```python from datetime import date from apscheduler.schedulers.blocking import BlockingScheduler sched = BlockingScheduler() def my_job(text): print(text) # 在2009年11月6日执行 sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) sched.start() ``` 其中`run_date`参数可以是date类型、datetime类型或文本类型。 datetime类型(用于精确时间) ```python # 在2009年11月6日 16:30:05执行 sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) ``` 文本类型 ```python sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05', args=['text']) ``` 未指定时间,则会立即执行 ```python # 未显式指定,那么则立即执行 sched.add_job(my_job, args=['text']) ``` ### interval 周期触发任务 [文档](https://apscheduler.readthedocs.io/en/latest/modules/triggers/interval.html#module-apscheduler.triggers.interval) ```python from datetime import datetime from apscheduler.schedulers.blocking import BlockingScheduler def job_function(): print("Hello World") sched = BlockingScheduler() # 每2小时触发 sched.add_job(job_function, 'interval', hours=2) sched.start() ``` 你可以框定周期开始时间`start_date`和结束时间`end_date`。 ```python # 周期触发的时间范围在2010-10-10 9:30 至 2014-06-15 11:00 sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') ``` 也可以通过`scheduled_job()`装饰器实现 ```python from apscheduler.scheduler import BlockingScheduler @sched.scheduled_job('interval', id='my_job_id', hours=2) def job_function(): print("Hello World") ``` `jitter`振动参数,给每次触发添加一个随机浮动秒数,一般适用于多服务器,避免同时运行造成服务拥堵。 ```python # 每小时(上下浮动120秒区间内)运行`job_function` sched.add_job(job_function, 'interval', hours=1, jitter=120) ``` ### cron 强大的类crontab表达式 [文档](https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html#module-apscheduler.triggers.cron) ```python # 注意参数顺序 class apscheduler.triggers.cron.CronTrigger( year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None) ``` 当省略时间参数时,在显式指定参数之前的参数会被设定为`*`,之后的参数会被设定为最小值,`week` 和`day_of_week`的最小值为`*`。比如,设定`day=1, minute=20`等同于设定`year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0`,即每个月的第一天,且当分钟到达20时就触发。 #### 表达式类型 | 表达式 | 参数类型 | 描述 | | --- | --- | --- | | `*` | 所有 | 通配符。例:`minutes=*`即每分钟触发 | | `*/a` | 所有 | 可被a整除的通配符。 | | `a-b` | 所有 | 范围a-b触发 | | `a-b/c` | 所有 | 范围a-b,且可被c整除时触发 | | `xth y` | 日 | 第几个星期几触发。x为第几个,y为星期几 | | `last x` | 日 | 一个月中,最后个星期几触发 | | `last` | 日 | 一个月最后一天触发 | | `x,y,z` | 所有 | 组合表达式,可以组合确定值或上方的表达式 | > **注!`month`和`day_of_week`参数分别接受的是英语缩写`jan`– `dec` 和 `mon` – `sun`** ```python from apscheduler.schedulers.blocking import BlockingScheduler def job_function(): print "Hello World" sched = BlockingScheduler() # 任务会在6月、7月、8月、11月和12月的第三个周五,00:00、01:00、02:00和03:00触发 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') sched.start() ``` `start_date` 和 `end_date` 可以用来适用时间范围 ```python # 在2014-05-30 00:00:00前,每周一到每周五 5:30运行 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30') ``` 通过 [`scheduled_job()`](https://apscheduler.readthedocs.io/en/latest/modules/schedulers/base.html#apscheduler.schedulers.base.BaseScheduler.scheduled_job) 装饰器实现: ```python @sched.scheduled_job('cron', id='my_job_id', day='last sun') def some_decorated_task(): print("I am printed at 00:00:00 on the last Sunday of every month!") ``` 使用标准crontab表达式: ```python sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *')) ``` 也可以添加`jitter`振动参数 ```python # 每小时上下浮动120秒触发 sched.add_job(job_function, 'cron', hour='*', jitter=120) ``` ### 夏令时问题 有些`timezone`时区可能会有夏令时的问题。这个可能导致令时切换时,任务不执行或任务执行两次。避免这个问题,可以使用`UTC`时间,或提前预知并规划好执行的问题。 ```python # 在Europe/Helsinki时区, 在三月最后一个周一就不会触发;在十月最后一个周一会触发两次 sched.add_job(job_function, 'cron', hour=3, minute=30) ``` ## 配置调度器 APScheduler 有多种不同的配置方法,你可以选择直接传字典或传参的方式创建调度器;也可以先实例一个调度器对象,再添加配置信息。灵活的配置方式可以满足各种应用场景的需要。 整套的配置选项可以参考API文档[`BaseScheduler`](https://apscheduler.readthedocs.io/en/latest/modules/schedulers/base.html#apscheduler.schedulers.base.BaseScheduler)类。一些调度器子类可能有它们自己特有的配置选项,以及独立的任务储存器和执行器也可能有自己特有的配置选项,可以查阅API文档了解。 下面举一个例子,创建一个使用默认任务储存器和执行器的`BackgroundScheduler`: ```python from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler() # 因为是非阻塞的后台调度器,所以程序会继续向下执行 ``` 这样就可以创建了一个后台调度器。这个调度器有一个名称为`default`的`MemoryJobStore`(内存任务储存器)和一个名称是`default`且最大线程是10的`ThreadPoolExecutor`(线程池执行器)。 假如你现在有这样的需求,两个任务储存器分别搭配两个执行器;同时,还要修改任务的默认参数;最后还要改时区。可以参考下面例子,它们是完全等价的。 - 名称为“mongo”的`MongoDBJobStore` - 名称为“default”的`SQLAlchemyJobStore` - 名称为“ThreadPoolExecutor ”的`ThreadPoolExecutor`,最大线程20个 - 名称“processpool”的`ProcessPoolExecutor`,最大进程5个 - UTC时间作为调度器的时区 - 默认为新任务关闭*合并模式*() - 设置新任务的默认最大实例数为3 方法一: ```python from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) ``` 方法二: ```python from apscheduler.schedulers.background import BackgroundScheduler # The "apscheduler." prefix is hard coded scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', }) ``` 方法三: ```python from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler() # ..这里可以添加任务 scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) ``` ## 启动调度器 启动调度器是只需调用`start()`即可。除了`BlockingScheduler`,非阻塞调度器都会立即返回,可以继续运行之后的代码,比如添加任务等。 对于`BlockingScheduler`,程序则会阻塞在`start()`位置,所以,要运行的代码必须写在`start()`之前。 > **注!调度器启动后,就不能修改配置了。** ## 添加任务 添加任务的方法有两种: 1. 通过调用`add_job()` 2. 通过装饰器`scheduled_job()` 第一种方法是最常用的;第二种方法是最方便的,但缺点就是运行时,不能修改任务。第一种`add_job()`方法会返回一个`apscheduler.job.Job`实例,这样就可以在运行时,修改或删除任务。 在*任何时候*你都能配置任务。但是如果调度器还没有启动,此时添加任务,那么任务就处于一个暂存的状态。只有当调度器启动时,才会开始计算下次运行时间。 还有一点要注意,如果你的执行器或任务储存器是会序列化任务的,那么这些任务就必须符合: 1. 回调函数必须全局可用 2. 回调函数参数必须也是可以被序列化的 内置任务储存器中,只有`MemoryJobStore`不会序列化任务;内置执行器中,只有`ProcessPoolExecutor`会序列化任务。 > **重要提醒!** > 如果在程序初始化时,是从数据库读取任务的,那么必须为每个任务定义一个明确的ID,并且使用`replace_existing=True`,否则每次重启程序,你都会得到一份新的任务拷贝,也就意味着任务的状态不会保存。 > **建议** > 如果想要立刻运行任务,可以在添加任务时省略`trigger`参数 ## 移除任务 如果想从调度器移除一个任务,那么你就要从相应的任务储存器中移除它,这样才算移除了。有两种方式: 1. 调用`remove_job()`,参数为:任务ID,任务储存器名称 2. 在通过`add_job()`创建的任务实例上调用`remove()`方法 第二种方式更方便,但前提必须在创建任务实例时,实例被保存在变量中。对于通过`scheduled_job()`创建的任务,只能选择第一种方式。 当任务调度结束时(比如,某个任务的触发器不再产生下次运行的时间),任务就会自动移除。 ```python job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() ``` 同样,通过任务的具体ID: ```python scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id') ``` ## 暂停和恢复任务 通过任务实例或调度器,就能暂停和恢复任务。如果一个任务被暂停了,那么该任务的下一次运行时间就会被移除。在恢复任务前,运行次数计数也不会被统计。 暂停任务,有以下两个方法: - `apscheduler.job.Job.pause()` - `apscheduler.schedulers.base.BaseScheduler.pause_job()` 恢复任务, - `apscheduler.job.Job.resume()` - `apscheduler.schedulers.base.BaseScheduler.resume_job()` ## 获取任务列表 通过`get_jobs()`就可以获得一个可修改的任务列表。`get_jobs()`第二个参数可以指定任务储存器名称,那么就会获得对应任务储存器的任务列表。 `print_jobs()`可以快速打印格式化的任务列表,包含触发器,下次运行时间等信息。 ## 修改任务 通过`apscheduler.job.Job.modify()`或`modify_job()`,你可以修改任务当中除了`id`的任何属性。 比如: ```python job.modify(max_instances=6, name='Alternate name') ``` 如果想要重新调度任务(就是改变触发器),你能通过`apscheduler.job.Job.reschedule()`或`reschedule_job()`来实现。这些方法会重新创建触发器,并重新计算下次运行时间。 比如: ```python scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') ``` ## 关闭调度器 关闭方法如下: ```python scheduler.shutdown() ``` 默认情况下,调度器会先把正在执行的任务处理完,再关闭任务储存器和执行器。但是,如果你就直接关闭,你可以添加参数: ```python scheduler.shutdown(wait=False) ``` 上述方法不管有没有任务在执行,会强制关闭调度器。 ## 暂停、恢复任务进程 调度器可以暂停正在执行的任务: ```python scheduler.pause() ``` 也可以恢复任务: ```python scheduler.resume() ``` 同时,也可以在调度器启动时,默认所有任务设为暂停状态。 ```python scheduler.start(paused=True) ``` ## 限制任务执行的实例并行数 默认情况下,在同一时间,一个任务只允许一个执行中的实例在运行。比如说,一个任务是每5秒执行一次,但是这个任务在第一次执行的时候花了6秒,也就是说前一次任务还没执行完,后一次任务又触发了,由于默认一次只允许一个实例执行,所以第二次就丢失了。为了杜绝这种情况,可以在添加任务时,设置`max_instances`参数,为指定任务设置最大实例并行数。 ## 丢失任务的执行与合并 有时,任务会由于一些问题没有被执行。最常见的情况就是,在数据库里的任务到了该执行的时间,但调度器被关闭了,那么这个任务就成了“哑弹任务”。错过执行时间后,调度器才打开了。这时,调度器会检查每个任务的`misfire_grace_time`参数`int`值,即哑弹上限,来确定是否还执行哑弹任务(这个参数可以全局设定的或者是为每个任务单独设定)。此时,一个哑弹任务,就可能会被连续执行多次。 但这就可能导致一个问题,有些哑弹任务实际上并不需要被执行多次。`coalescing`合并参数就能把一个多次的哑弹任务揉成一个一次的哑弹任务。也就是说,`coalescing`为`True`能把多个排队执行的同一个哑弹任务,变成一个,而不会触发哑弹事件。 > **注!如果是由于线程池/进程池满了导致的任务延迟,执行器就会跳过执行。要避免这个问题,可以添加进程或线程数来实现或把 `misfire_grace_time`值调高。** ## 调度器事件 调度器允许添加事件侦听器。部分事件会有特有的信息,比如当前运行次数等。`add_listener(callback,mask)`中,第一个参数是回调对象,`mask`是指定侦听事件类型,`mask`参数也可以是逻辑组合。回调对象会有一个参数就是触发的事件。 具体可以查看文档中[`events`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#module-apscheduler.events)模块,里面有关于事件类型以及事件参数的详细说明。 ```python def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') # 当任务执行完或任务出错时,调用my_listener scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) ``` ### 事件类型 | Constant | Description | Event class | | --- | --- | --- | | EVENT\_SCHEDULER\_STARTED | The scheduler was started | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_SCHEDULER\_SHUTDOWN | The scheduler was shut down | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_SCHEDULER\_PAUSED | Job processing in the scheduler was paused | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_SCHEDULER\_RESUMED | Job processing in the scheduler was resumed | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_EXECUTOR\_ADDED | An executor was added to the scheduler | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_EXECUTOR\_REMOVED | An executor was removed to the scheduler | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_JOBSTORE\_ADDED | A job store was added to the scheduler | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_JOBSTORE\_REMOVED | A job store was removed from the scheduler | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_ALL\_JOBS_REMOVED | All jobs were removed from either all job stores or one particular job store | [`SchedulerEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.SchedulerEvent) | | EVENT\_JOB\_ADDED | A job was added to a job store | [`JobEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobEvent) | | EVENT\_JOB\_REMOVED | A job was removed from a job store | [`JobEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobEvent) | | EVENT\_JOB\_MODIFIED | A job was modified from outside the scheduler | [`JobEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobEvent) | | EVENT\_JOB\_SUBMITTED | A job was submitted to its executor to be run | [`JobSubmissionEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobSubmissionEvent) | | EVENT\_JOB\_MAX_INSTANCES | A job being submitted to its executor was not accepted by the executor because the job has already reached its maximum concurrently executing instances | [`JobSubmissionEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobSubmissionEvent) | | EVENT\_JOB\_EXECUTED | A job was executed successfully | [`JobExecutionEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobExecutionEvent) | | EVENT\_JOB\_ERROR | A job raised an exception during execution | [`JobExecutionEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobExecutionEvent) | | EVENT\_JOB\_MISSED | A job’s execution was missed | [`JobExecutionEvent`](https://apscheduler.readthedocs.io/en/latest/modules/events.html#apscheduler.events.JobExecutionEvent) | | EVENT_ALL | A catch-all mask that includes every event type | N/A | ## 异常捕获 通过logging模块,可以添加`apscheduler`日志至`DEBUG`级别,这样就能捕获异常信息。 关于`logging`初始化的方式如下: ```python import logging logging.basicConfig() logging.getLogger('apscheduler').setLevel(logging.DEBUG) ``` 日志会提供很多调度器的内部运行信息。 https://mp.weixin.qq.com/s/TTUFQRQ_DiKktJ5-J8O8Pg

很赞哦! (3)

文章交流

  • emoji
2人参与,2条评论
xiaolei 2022年10月12日 10:33
留言正在审核中...
xiaolei 2022年10月12日 10:32
留言正在审核中...

当前用户

未登录,点击   登录

站点信息

  • 建站时间:网站已运行2223天
  • 系统信息:Linux
  • 后台程序:Python: 3.8.10
  • 网站框架:Django: 3.2.6
  • 文章统计:257 篇
  • 文章评论:60 条
  • 腾讯分析网站概况-腾讯分析
  • 百度统计网站概况-百度统计
  • 公众号:微信扫描二维码,关注我们
  • QQ群:QQ加群,下载网站的学习源码
返回
顶部
标题 换行 登录
网站