ソースを参照

定时任务执行一次测试接口

locky 1 年間 前
コミット
9e5f7c6066
2 ファイル変更42 行追加1 行削除
  1. 13 1
      Controller/TestApi.py
  2. 29 0
      Object/ApschedulerObject.py

+ 13 - 1
Controller/TestApi.py

@@ -36,6 +36,7 @@ from Ansjer.config import (
     CONFIG_TEST,
     BASE_DIR
 )
+from Controller.SensorGateway.SmartSceneController import SmartSceneView
 from Model.models import (
     CompanySerialModel,
     Device_Info,
@@ -61,7 +62,7 @@ from Object.TokenObject import TokenObject
 from Object.utils.PayPalUtil import PayPalService
 from Service.CommonService import CommonService
 from Service.VodHlsService import SplitVodHlsObject
-from Object.ApschedulerObject import ApschedulerObject
+from Object.ApschedulerObject import ApschedulerObject, ApschedulerObjectTest
 
 ACCESS_KEY = "AKIA2E67UIMD3CYTIWPA"
 SECRET_KEY = "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"
@@ -175,6 +176,8 @@ class testView(View):
             return self.oci_oss(request, response)
         elif operation == 'genericReportPush':  # 设备上传日志
             return self.generic_report_push(request_dict, response)
+        elif operation == 'date_apscheduler':  # date定时任务测试
+            return self.date_apscheduler(request_dict, response)
         else:
             return response.json(414)
 
@@ -1218,3 +1221,12 @@ class testView(View):
         except Exception as e:
             return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
 
+    @staticmethod
+    def date_apscheduler(request_dict, response):
+        task_id = request_dict.get('task_id', None)
+        time_stamp = int(request_dict.get('time_stamp', None))
+        try:
+            ApschedulerObjectTest(SmartSceneView.pub_mqtt, task_id, time_stamp, (201, 2010, '000IAR11L'), 8.0)
+            return response.json(0)
+        except Exception as e:
+            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

+ 29 - 0
Object/ApschedulerObject.py

@@ -60,3 +60,32 @@ class ApschedulerObject:
 
     def resume_job(self, task_id):  # 恢复任务
         self.scheduler.resume_job(task_id)
+
+
+class ApschedulerObjectTest:
+    def __init__(self, func, task_id, time_stamp, args, timezone_offset=0.00):
+        try:
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.bind(('127.0.0.1', 12345))
+        except socket.error:
+            pass
+        else:
+            # 计算时区偏移量(以分钟为单位)
+            timezone_offset_minutes = int(timezone_offset * 60)
+            timezone = pytz.FixedOffset(timezone_offset_minutes)
+            self.scheduler = BackgroundScheduler(timezone=timezone)
+            self.scheduler.add_jobstore(DjangoJobStore(), 'default')
+            self.create_date_job(func, task_id, time_stamp, args)
+            self.scheduler.start()
+
+    def create_date_job(self, func, task_id, time_stamp, args):
+        """
+        创建时间点任务
+        @param func:
+        @param task_id:
+        @param time_stamp:
+        @param args:
+        @return:
+        """
+        self.scheduler.add_job(func=func, trigger='date', run_date=datetime.datetime.fromtimestamp(time_stamp),
+                               replace_existing=True, id=task_id, max_instances=1, coalesce=False, args=args)