12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import time
- from apscheduler.schedulers.background import BackgroundScheduler
- from django_apscheduler.jobstores import DjangoJobStore
- from django_apscheduler.models import DjangoJob
- import datetime
- import pytz
class ApschedulerObject:
    """Convenience wrapper around an APScheduler ``BackgroundScheduler``.

    The scheduler is created with a fixed-offset timezone, uses a
    ``DjangoJobStore`` as its ``'default'`` job store, and is started as
    soon as the object is constructed.
    """

    def __init__(self, timezone_offset=0.00):
        """Create and start the background scheduler.

        @param timezone_offset: offset from UTC in hours (may be fractional,
            e.g. ``5.5``); converted to whole minutes for ``pytz.FixedOffset``.
        """
        # Convert the hour offset to minutes. round() is used instead of a
        # truncating int() so floating-point noise in the multiplication
        # cannot drop a minute.
        timezone_offset_minutes = int(round(timezone_offset * 60))
        timezone = pytz.FixedOffset(timezone_offset_minutes)
        self.scheduler = BackgroundScheduler(timezone=timezone)
        self.scheduler.add_jobstore(DjangoJobStore(), 'default')
        self.scheduler.start()

    def _aware_datetime(self, timestamp):
        """Convert a Unix timestamp to a datetime aware of the scheduler's timezone.

        A naive ``datetime.fromtimestamp(ts)`` is expressed in the host's
        local timezone; handing that to a scheduler configured with a
        different timezone shifts the job to another instant. Building an
        aware datetime keeps the absolute instant of the timestamp.
        """
        return datetime.datetime.fromtimestamp(timestamp, tz=self.scheduler.timezone)

    @staticmethod
    def auto_hello(x):
        """Sample task: print a greeting with ``x`` and the current Unix time."""
        now_time = time.time()
        print('hello world: {} {}'.format(x, now_time))

    def create_cron_job(self, func, task_id, day_of_week, hour, minute, args, second=0):
        """Register (or replace) a recurring cron-style job.

        @param func: callable to run.
        @param task_id: job id; an existing job with the same id is replaced.
        @param day_of_week: 0-6 for Monday-Sunday, or the names
            mon,tue,wed,thu,fri,sat,sun.
        @param hour: cron ``hour`` field.
        @param minute: cron ``minute`` field.
        @param args: positional arguments passed to ``func``.
        @param second: cron ``second`` field (defaults to 0).
        """
        self.scheduler.add_job(
            func=func,
            trigger='cron',
            day_of_week=day_of_week,
            hour=hour,
            minute=minute,
            second=second,
            replace_existing=True,
            id=task_id,
            max_instances=1,
            coalesce=False,
            args=args,
            misfire_grace_time=300,
        )

    def create_interval_job(self, func, task_id, minutes, start_time, end_time, args):
        """Register (or replace) a job that runs every ``minutes`` minutes.

        @param func: callable to run.
        @param task_id: job id; an existing job with the same id is replaced.
        @param minutes: interval between runs, in minutes.
        @param start_time: Unix timestamp at which the interval starts.
        @param end_time: Unix timestamp after which the job no longer runs.
        @param args: positional arguments passed to ``func``.
        """
        self.scheduler.add_job(
            func=func,
            trigger='interval',
            minutes=minutes,
            # Aware datetimes: the timestamps denote absolute instants.
            start_date=self._aware_datetime(start_time),
            end_date=self._aware_datetime(end_time),
            replace_existing=True,
            id=task_id,
            max_instances=1,
            coalesce=False,
            args=args,
        )

    def create_date_job(self, func, task_id, time_stamp, args):
        """Register (or replace) a one-off job that runs at a point in time.

        @param func: callable to run.
        @param task_id: job id; an existing job with the same id is replaced.
        @param time_stamp: Unix timestamp at which the job should run.
        @param args: positional arguments passed to ``func``.
        @return: None
        """
        self.scheduler.add_job(
            func=func,
            trigger='date',
            # Aware datetime: the timestamp denotes an absolute instant.
            run_date=self._aware_datetime(time_stamp),
            replace_existing=True,
            id=task_id,
            max_instances=1,
            coalesce=False,
            args=args,
        )

    @staticmethod
    def del_job(task_id):
        """Delete the persisted job whose id is ``task_id``.

        The matching ``DjangoJob`` row(s) are removed through the ORM.
        NOTE(review): this bypasses ``scheduler.remove_job``; confirm that
        deleting the row directly is the intended way to remove a job.
        """
        DjangoJob.objects.filter(id=task_id).delete()

    def pause_job(self, task_id):
        """Pause the job with the given id."""
        self.scheduler.pause_job(task_id)

    def resume_job(self, task_id):
        """Resume the previously paused job with the given id."""
        self.scheduler.resume_job(task_id)
|