import datetime
import time

from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJob


class ApschedulerObject:
    """Thin wrapper around a ``BackgroundScheduler`` backed by ``DjangoJobStore``.

    Constructing an instance creates the scheduler, registers a
    ``DjangoJobStore`` under the ``'default'`` alias and starts the scheduler
    immediately.
    """

    def __init__(self):
        self.scheduler = scheduler = BackgroundScheduler()
        scheduler.add_jobstore(DjangoJobStore(), 'default')
        scheduler.start()

    @staticmethod
    def auto_hello():
        """Sample task: print a greeting with the current Unix timestamp."""
        stamp = time.time()
        message = 'hello world:[{}]'.format(stamp)
        print(message)

    def create_cron_job(self, func, task_id, day_of_week, hour, minute, args):
        """Register a recurring (cron-triggered) job and print the created job.

        @param func: callable handed to the scheduler as the job function.
        @param task_id: job id; a job already stored under this id is replaced.
        @param day_of_week: forwarded unchanged to the cron trigger.
        @param hour: forwarded unchanged to the cron trigger.
        @param minute: forwarded unchanged to the cron trigger.
        @param args: positional arguments the scheduler passes to ``func``.
        @return: None
        """
        cron_fields = {
            'day_of_week': day_of_week,
            'hour': hour,
            'minute': minute,
        }
        job = self.scheduler.add_job(
            func=func,
            trigger='cron',
            id=task_id,
            args=args,
            replace_existing=True,
            max_instances=1,
            coalesce=False,
            misfire_grace_time=300,
            **cron_fields
        )
        print(job)

    def create_interval_job(self, func, task_id, minutes, start_time, end_time, args):
        """Register a job that repeats every ``minutes`` minutes within a window.

        @param func: callable handed to the scheduler as the job function.
        @param task_id: job id; a job already stored under this id is replaced.
        @param minutes: interval between runs, in minutes.
        @param start_time: POSIX timestamp converted with
            ``datetime.fromtimestamp`` and used as the trigger's start date.
        @param end_time: POSIX timestamp converted with
            ``datetime.fromtimestamp`` and used as the trigger's end date.
        @param args: positional arguments the scheduler passes to ``func``.
        @return: None
        """
        # fromtimestamp() without tz yields naive local-time datetimes.
        window_start = datetime.datetime.fromtimestamp(start_time)
        window_end = datetime.datetime.fromtimestamp(end_time)
        self.scheduler.add_job(
            func=func,
            trigger='interval',
            id=task_id,
            args=args,
            minutes=minutes,
            start_date=window_start,
            end_date=window_end,
            replace_existing=True,
            max_instances=1,
            coalesce=False,
        )

    def create_date_job(self, func, task_id, time_stamp, args):
        """Register a one-off job that runs at a specific point in time.

        @param func: callable handed to the scheduler as the job function.
        @param task_id: job id; a job already stored under this id is replaced.
        @param time_stamp: POSIX timestamp converted with
            ``datetime.fromtimestamp`` and used as the run date.
        @param args: positional arguments the scheduler passes to ``func``.
        @return: None
        """
        run_at = datetime.datetime.fromtimestamp(time_stamp)
        self.scheduler.add_job(
            func=func,
            trigger='date',
            id=task_id,
            args=args,
            run_date=run_at,
            replace_existing=True,
            max_instances=1,
            coalesce=False,
        )

    @staticmethod
    def del_job(task_id):
        """Delete the stored job record(s) whose id equals ``task_id``.

        The rows are removed through the ``DjangoJob`` model directly; no
        scheduler instance is involved.
        """
        # NOTE(review): this bypasses the running scheduler -- confirm whether
        # callers also need the scheduler itself to be told about the removal.
        matching = DjangoJob.objects.filter(id=task_id)
        matching.delete()