import datetime
import time

import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJob


class ApschedulerObject:
    """Thin wrapper around a ``BackgroundScheduler`` backed by ``DjangoJobStore``.

    Constructing an instance creates the scheduler with a fixed UTC offset,
    registers a ``DjangoJobStore`` under the ``'default'`` alias and starts
    the scheduler immediately.
    """

    def __init__(self, timezone_offset=0.00):
        """Create and start the background scheduler.

        @param timezone_offset: offset from UTC in hours; fractional values
            are allowed (e.g. ``5.5``) and are converted to whole minutes.
        """
        # Convert the hour offset to minutes. round() rather than int()
        # truncation so that floating-point error (e.g. 4.35 * 60 ==
        # 260.99999999999994) cannot silently drop a minute.
        timezone_offset_minutes = int(round(timezone_offset * 60))
        self.timezone = pytz.FixedOffset(timezone_offset_minutes)
        self.scheduler = BackgroundScheduler(timezone=self.timezone)
        self.scheduler.add_jobstore(DjangoJobStore(), 'default')
        self.scheduler.start()

    def _to_scheduler_datetime(self, time_stamp):
        """Convert a POSIX timestamp to an aware datetime in the scheduler zone.

        A naive ``datetime.fromtimestamp(ts)`` is expressed in the *host's*
        local zone, but the scheduler interprets naive datetimes in its own
        configured zone, which shifts the instant whenever the two differ.
        Building an aware datetime keeps the absolute instant intact.

        @param time_stamp: seconds since the epoch, or ``None``.
        @return: timezone-aware ``datetime``; ``None`` is passed through so
            the scheduler can apply its own default.
        """
        if time_stamp is None:
            return None
        return datetime.datetime.fromtimestamp(time_stamp, tz=self.timezone)

    @staticmethod
    def auto_hello(x):
        """Sample task: print a greeting with ``x`` and the current timestamp."""
        now_time = time.time()
        print('hello world: {} {}'.format(x, now_time))

    def create_cron_job(self, func, task_id, day_of_week, hour, minute, args,
                        second=0):
        """Register (or replace) a recurring cron-style job.

        @param func: callable to run.
        @param task_id: job id; an existing job with this id is replaced.
        @param day_of_week: 0-6 map to Monday-Sunday; the names
            mon, tue, wed, thu, fri, sat, sun may be used instead.
        @param hour: cron hour field.
        @param minute: cron minute field.
        @param args: positional arguments passed to ``func``.
        @param second: cron second field (defaults to 0).
        """
        self.scheduler.add_job(
            func=func,
            trigger='cron',
            day_of_week=day_of_week,
            hour=hour,
            minute=minute,
            second=second,
            replace_existing=True,
            id=task_id,
            max_instances=1,
            coalesce=False,
            args=args,
            misfire_grace_time=300,
        )

    def create_interval_job(self, func, task_id, minutes, start_time, end_time,
                            args):
        """Register (or replace) a job that runs every ``minutes`` minutes.

        @param func: callable to run.
        @param task_id: job id; an existing job with this id is replaced.
        @param minutes: interval between runs, in minutes.
        @param start_time: POSIX timestamp for the start of the window
            (``None`` is passed through to the scheduler).
        @param end_time: POSIX timestamp for the end of the window
            (``None`` is passed through to the scheduler).
        @param args: positional arguments passed to ``func``.
        """
        self.scheduler.add_job(
            func=func,
            trigger='interval',
            minutes=minutes,
            start_date=self._to_scheduler_datetime(start_time),
            end_date=self._to_scheduler_datetime(end_time),
            replace_existing=True,
            id=task_id,
            max_instances=1,
            coalesce=False,
            args=args,
        )

    def create_date_job(self, func, task_id, time_stamp, args):
        """Register (or replace) a one-off job at a specific point in time.

        @param func: callable to run.
        @param task_id: job id; an existing job with this id is replaced.
        @param time_stamp: POSIX timestamp at which to run the job
            (``None`` is passed through to the scheduler).
        @param args: positional arguments passed to ``func``.
        @return: None
        """
        self.scheduler.add_job(
            func=func,
            trigger='date',
            run_date=self._to_scheduler_datetime(time_stamp),
            replace_existing=True,
            id=task_id,
            max_instances=1,
            coalesce=False,
            args=args,
        )

    @staticmethod
    def del_job(task_id):
        """Delete a job by removing its ``DjangoJob`` row.

        This goes straight to the ORM rather than through a scheduler
        instance, so it can be called without constructing the wrapper.
        """
        DjangoJob.objects.filter(id=task_id).delete()

    def pause_job(self, task_id):
        """Pause the job with the given id."""
        self.scheduler.pause_job(task_id)

    def resume_job(self, task_id):
        """Resume the previously paused job with the given id."""
        self.scheduler.resume_job(task_id)