locky hace 1 año
padre
commit
94ae467bae

+ 2 - 0
Ansjer/__init__.py

@@ -0,0 +1,2 @@
+from .celery import app as celery_app
+__all__ = ('celery_app', )

+ 37 - 0
Ansjer/celery.py

@@ -0,0 +1,37 @@
+# @Author    : Rocky
+# @File      : celery.py
+# @Time      : 2024/3/12 14:13
+# @Author    : Rocky
+# @File      : celery.py
+# @Time      : 2024/3/12 11:52
+import os
+
+from django.utils import timezone
+from celery import Celery
+
+
+# 从环境变量中获取Django环境,
+# win:set DJANGO_ENV=local
+# linux: export DJANGO_ENV=local,或vim ~/.bashrc设置环境变量
+django_env = os.environ.get('DJANGO_ENV')
+if django_env == 'local':
+    os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Ansjer.local_config.local_settings')
+elif django_env == 'test':
+    os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Ansjer.cn_config.test_settings')
+elif django_env == 'cn':
+    os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Ansjer.cn_config.formal_settings')
+elif django_env == 'us':
+    os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Ansjer.us_config.formal_settings')
+elif django_env == 'eu':
+    os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Ansjer.eur_config.formal_settings')
+
+app = Celery('apps')
+
+# 使用Django的settings文件配置Celery
+app.config_from_object('django.conf:settings', namespace='CELERY')
+
+# 自动从django注册的app中发现所有任务
+app.autodiscover_tasks()
+
+# 解决时区问题,定时任务启动就循环输出
+app.now = timezone.now

+ 13 - 1
Ansjer/cn_config/test_settings.py

@@ -20,7 +20,8 @@ INSTALLED_APPS = [
     'Model',
     'PushModel',
     'django_apscheduler',
-    'AgentModel'
+    'AgentModel',
+    'django_celery_beat'
 ]
 
 MIDDLEWARE = [
@@ -163,6 +164,17 @@ AUTH_PASSWORD_VALIDATORS = [
     },
 ]
 
+# celery配置,需要以CELERY_开头
+# 设置结果存储
+CELERY_RESULT_BACKEND = 'redis://{}:6379/1'.format(SERVER_HOST)
+# 设置代理人broker
+CELERY_BROKER_URL = 'redis://{}:6379/0'.format(SERVER_HOST)
+
+# 配置定时任务
+CELERY_TIMEZONE = 'Asia/Shanghai'
+DJANGO_CELERY_BEAT_TZ_AWARE = True
+CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
+
 LANGUAGE_CODE = 'en-us'
 TIME_ZONE = 'Asia/Shanghai'
 # TIME_ZONE = 'UTC'

+ 15 - 2
Ansjer/local_config/local_settings.py

@@ -52,7 +52,8 @@ INSTALLED_APPS = [
     'Model',
     'PushModel',
     'django_apscheduler',
-    'AgentModel'
+    'AgentModel',
+    'django_celery_beat',
 ]
 
 MIDDLEWARE = [
@@ -132,7 +133,8 @@ DATABASE_APPS_MAPPING = {
     'Model': 'default',
     'PushModel': 'mysql02',
     'django_apscheduler': 'default',
-    'AgentModel': 'mysql03'
+    'AgentModel': 'mysql03',
+    'django_celery_beat': 'default'
 }
 
 AUTH_PASSWORD_VALIDATORS = [
@@ -150,6 +152,17 @@ AUTH_PASSWORD_VALIDATORS = [
     },
 ]
 
+# celery配置,需要以CELERY_开头
+# 设置结果存储
+CELERY_RESULT_BACKEND = 'redis://{}:6379/1'.format(SERVER_HOST)
+# 设置代理人broker
+CELERY_BROKER_URL = 'redis://{}:6379/0'.format(SERVER_HOST)
+
+# 配置定时任务
+CELERY_TIMEZONE = 'Asia/Shanghai'
+DJANGO_CELERY_BEAT_TZ_AWARE = True
+CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
+
 LANGUAGE_CODE = 'en-us'
 TIME_ZONE = 'Asia/Shanghai'
 # TIME_ZONE = 'UTC'

+ 33 - 0
Controller/CeleryTasks/tasks.py

@@ -0,0 +1,33 @@
+# @Author    : Rocky
+# @File      : tasks.py
+# @Time      : 2024/3/12 14:23
+import time
+
+from celery import shared_task
+from Ansjer.celery import app
+
+from celery.schedules import crontab
+
+import django
+django.setup()
+
+from Model.models import Device_User
+from Ansjer.config import CONFIG_INFO
+
+
+@app.task
+def hello():
+    device_user_qs = Device_User.objects.filter(username='13138137872').values('NickName')
+    nickname = device_user_qs[0]['NickName']
+    print('CONFIG_INFO:{}'.format(CONFIG_INFO) + nickname)
+
+
+@app.task
+def test(arg):
+    time.sleep(10)
+    print(arg)
+
+
+@app.task
+def add(x, y):
+    print(x + y)

+ 65 - 8
Controller/TestApi.py

@@ -1,14 +1,17 @@
+import datetime
 import json
 import logging
 import os
 import time
 import traceback
 import urllib
+import zoneinfo
 
 import boto3
 import botocore
 import cv2
 import oss2
+import pytz
 import requests
 from boto3.session import Session
 from django.contrib.auth.hashers import make_password  # 对密码加密模块
@@ -49,10 +52,11 @@ from Model.models import (
     TestSerialRepetition,
     UID_Bucket,
     UIDCompanySerialModel,
-    VodBucketModel, PaypalWebHookEvent,
+    VodBucketModel, PaypalWebHookEvent, TimeZoneInfo,
 )
 from Object.AliPayObject import AliPayObject
 from Object.AWS.AmazonS3Util import AmazonS3Util
+from Object.CeleryBeatObject import CeleryBeatObj
 from Object.ContentSecurityObject import ContentSecurity
 from Object.IPWeatherObject import IPQuery, OpenWeatherMap, GeoIP2
 from Object.m3u8generate import PlaylistGenerator
@@ -62,7 +66,7 @@ from Object.TokenObject import TokenObject
 from Object.utils.PayPalUtil import PayPalService
 from Service.CommonService import CommonService
 from Service.VodHlsService import SplitVodHlsObject
-from Object.ApschedulerObject import ApschedulerObject, ApschedulerObjectTest
+from Object.ApschedulerObject import ApschedulerObject
 
 ACCESS_KEY = "AKIA2E67UIMD3CYTIWPA"
 SECRET_KEY = "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"
@@ -176,8 +180,8 @@ class testView(View):
             return self.oci_oss(request, response)
         elif operation == 'genericReportPush':  # 设备上传日志
             return self.generic_report_push(request_dict, response)
-        elif operation == 'date_apscheduler':  # date定时任务测试
-            return self.date_apscheduler(request_dict, response)
+        elif operation == 'celery':  # celery
+            return self.celery(request_dict, response)
         else:
             return response.json(414)
 
@@ -1222,11 +1226,64 @@ class testView(View):
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
     @staticmethod
-    def date_apscheduler(request_dict, response):
-        task_id = request_dict.get('task_id', None)
-        time_stamp = int(request_dict.get('time_stamp', None))
+    def celery(request_dict, response):
+        from celery import Celery
+        from Ansjer.celery import app
+        from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedule
+        every = int(request_dict.get('every', None))
+        period = request_dict.get('period', None)
+        name = request_dict.get('name', None)
+        task = request_dict.get('task', None)
+        minute = request_dict.get('minute', None)
+        hour = request_dict.get('hour', None)
+        day_of_week = request_dict.get('day_of_week', None)
+
+        time_string = request_dict.get('time_string', None)
+        timezone_offset = request_dict.get('timezone_offset', None)
+        timezone_offset = float(timezone_offset)
+        celery_beat_obj = CeleryBeatObj()
+        args = ['啊哈']
+        kwargs = {
+            'x': 3,
+            'y': 3
+        }
         try:
-            ApschedulerObjectTest(SmartSceneView.pub_mqtt, task_id, time_stamp, (201, 2010, '000IAR11L'), 8.0)
+            # 间隔任务
+            # celery_beat_obj.creat_interval_task(every, period, name, task)
+            # schedule, _ = IntervalSchedule.objects.get_or_create(every=6, period=IntervalSchedule.SECONDS)
+            # PeriodicTask.objects.create(interval=schedule, name='hello_6_task', task='Controller.CeleryTasks.tasks.hello')
+            # PeriodicTask.objects.create(interval=schedule, name='clock_task',
+            #                             task='Controller.CeleryTasks.tasks.hello')
+
+            # 定时任务
+            # celery_beat_obj.creat_clocked_task(timezone_offset, time_string, name, task, kwargs=kwargs)
+            # time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
+            # clocked_time = CommonService.get_date_from_timestamp(time_stamp, timezone_offset)
+            # schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
+            # PeriodicTask.objects.create(clocked=schedule, one_off=True, name=task_name,
+            #                             task='Controller.CeleryTasks.tasks.hello')
+
+            # 周期任务
+            # celery_beat_obj.creat_crontab_task(
+            #     timezone_offset, name, task, minute=minute, hour=hour, day_of_week=day_of_week, kwargs=kwargs)
+            # time_zone_info_qs = TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')
+            # zone_info = time_zone_info_qs[0]['zone_info']
+            # timezone = zoneinfo.ZoneInfo(zone_info)
+            # schedule, _ = CrontabSchedule.objects.get_or_create(
+            #     minute='11',
+            #     hour='16',
+            #     day_of_week='*',
+            #     day_of_month='*',
+            #     month_of_year='*',
+            #     timezone=timezone)
+            # PeriodicTask.objects.create(crontab=schedule, name=task_name,
+            #                             task='Controller.CeleryTasks.tasks.hello')
+
+            # 暂停/恢复/删除任务
+            # celery_beat_obj.disable_task(name)
+            # celery_beat_obj.enable_task(name)
+            celery_beat_obj.del_task(name)
+
             return response.json(0)
         except Exception as e:
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

+ 6 - 42
Object/ApschedulerObject.py

@@ -1,4 +1,3 @@
-import socket
 import time
 from apscheduler.schedulers.background import BackgroundScheduler
 from django_apscheduler.jobstores import DjangoJobStore
@@ -9,18 +8,12 @@ import pytz
 
 class ApschedulerObject:
     def __init__(self, timezone_offset=0.00):
-        try:
-            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            sock.bind(('127.0.0.1', 12345))
-        except Exception:
-            pass
-        else:
-            # 计算时区偏移量(以分钟为单位)
-            timezone_offset_minutes = int(timezone_offset * 60)
-            timezone = pytz.FixedOffset(timezone_offset_minutes)
-            self.scheduler = BackgroundScheduler(timezone=timezone)
-            self.scheduler.add_jobstore(DjangoJobStore(), 'default')
-            self.scheduler.start()
+        # 计算时区偏移量(以分钟为单位)
+        timezone_offset_minutes = int(timezone_offset * 60)
+        timezone = pytz.FixedOffset(timezone_offset_minutes)
+        self.scheduler = BackgroundScheduler(timezone=timezone)
+        self.scheduler.add_jobstore(DjangoJobStore(), 'default')
+        self.scheduler.start()
 
     @staticmethod
     def auto_hello(x):  # 任务
@@ -60,32 +53,3 @@ class ApschedulerObject:
 
     def resume_job(self, task_id):  # 恢复任务
         self.scheduler.resume_job(task_id)
-
-
-class ApschedulerObjectTest:
-    def __init__(self, func, task_id, time_stamp, args, timezone_offset=0.00):
-        try:
-            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            sock.bind(('127.0.0.1', 12345))
-        except Exception:
-            pass
-        else:
-            # 计算时区偏移量(以分钟为单位)
-            timezone_offset_minutes = int(timezone_offset * 60)
-            timezone = pytz.FixedOffset(timezone_offset_minutes)
-            self.scheduler = BackgroundScheduler(timezone=timezone)
-            self.scheduler.add_jobstore(DjangoJobStore(), 'default')
-            self.create_date_job(func, task_id, time_stamp, args)
-            self.scheduler.start()
-
-    def create_date_job(self, func, task_id, time_stamp, args):
-        """
-        创建时间点任务
-        @param func:
-        @param task_id:
-        @param time_stamp:
-        @param args:
-        @return:
-        """
-        self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
-                               replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)

+ 130 - 0
Object/CeleryBeatObject.py

@@ -0,0 +1,130 @@
+# @Author    : Rocky
+# @File      : CeleryBeatObject.py
+# @Time      : 2024/3/21 16:35
+import json
+import zoneinfo
+
+from django_celery_beat.models import PeriodicTask, IntervalSchedule, ClockedSchedule, CrontabSchedule
+
+from Model.models import TimeZoneInfo
+from Service.CommonService import CommonService
+
+
+class CeleryBeatObj:
+    # celery beat对象,封装定时任务函数
+    # https://github.com/celery/django-celery-beat
+
+    @staticmethod
+    def creat_interval_task(every, period, name, task, args=None, kwargs=None):
+        """
+        创建间隔任务
+        @param every: 时间间隔
+        @param period: 单位,seconds,minutes,hours,days(或IntervalSchedule.SECONDS...)
+        @param name: 任务名称
+        @param task: 任务函数
+        @param args: 参数
+        @param kwargs: 参数
+        @return:
+        """
+        if args is None:
+            args = []
+        args = json.dumps(args)
+        if kwargs is None:
+            kwargs = {}
+        kwargs = json.dumps(kwargs)
+
+        schedule, _ = IntervalSchedule.objects.get_or_create(every=every, period=period)
+        PeriodicTask.objects.create(interval=schedule, name=name, task=task, args=args, kwargs=kwargs)
+
+    @staticmethod
+    def creat_clocked_task(timezone_offset, time_string, name, task, args=None, kwargs=None):
+        """
+        创建定时任务
+        @param timezone_offset: 时区偏移量
+        @param time_string: 时间字符串
+        @param name: 任务名称
+        @param task: 任务函数
+        @param args: 参数
+        @param kwargs: 参数
+        @return:
+        """
+        if args is None:
+            args = []
+        args = json.dumps(args)
+        if kwargs is None:
+            kwargs = {}
+        kwargs = json.dumps(kwargs)
+
+        time_stamp = CommonService.convert_to_timestamp(timezone_offset, time_string)
+        clocked_time = CommonService.get_date_from_timestamp(time_stamp, timezone_offset)
+        schedule, _ = ClockedSchedule.objects.get_or_create(clocked_time=clocked_time)
+        PeriodicTask.objects.create(clocked=schedule, one_off=True, name=name, task=task, args=args, kwargs=kwargs)
+
+    @staticmethod
+    def creat_crontab_task(
+            timezone_offset, name, task,
+            minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*', args=None, kwargs=None):
+        """
+        创建周期任务
+        @param timezone_offset: 时区偏移量
+        @param name: 任务名称
+        @param task: 任务函数
+        @param minute: 分
+        @param hour: 时
+        @param day_of_week: 周,1-7对应周一到周日,也可写mon,tue,wed,thu,fri,sat,sun
+        @param day_of_month: 月
+        @param month_of_year: 年
+        @param args: 参数
+        @param kwargs: 参数
+        @return:
+        """
+        if args is None:
+            args = []
+        args = json.dumps(args)
+        if kwargs is None:
+            kwargs = {}
+        kwargs = json.dumps(kwargs)
+
+        time_zone_info_qs = TimeZoneInfo.objects.filter(tz=timezone_offset).values('zone_info')
+        if time_zone_info_qs.exists():
+            zone_info = time_zone_info_qs[0]['zone_info']
+            timezone = zoneinfo.ZoneInfo(zone_info)
+            schedule, _ = CrontabSchedule.objects.get_or_create(
+                minute=minute,
+                hour=hour,
+                day_of_week=day_of_week,
+                day_of_month=day_of_month,
+                month_of_year=month_of_year,
+                timezone=timezone)
+            PeriodicTask.objects.create(crontab=schedule, name=name, task=task, args=args, kwargs=kwargs)
+
+    @staticmethod
+    def disable_task(name):
+        """
+        暂停任务
+        @param name: 任务名称
+        @return:
+        """
+        periodic_task = PeriodicTask.objects.get(name=name)
+        periodic_task.enabled = False
+        periodic_task.save()
+
+    @staticmethod
+    def enable_task(name):
+        """
+        恢复任务
+        @param name: 任务名称
+        @return:
+        """
+        periodic_task = PeriodicTask.objects.get(name=name)
+        periodic_task.enabled = True
+        periodic_task.save()
+
+    @staticmethod
+    def del_task(name):
+        """
+        删除任务
+        @param name: 任务名称
+        @return:
+        """
+        PeriodicTask.objects.get(name=name).delete()