tasks.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. # @Author : Rocky
  2. # @File : tasks.py
  3. # @Time : 2024/3/12 14:23
  4. # 解决models导入失败问题
  5. # *************
  6. # 修改任务函数后需要重启supervisor的celery和celery-beat才能失效
  7. # *************
  8. # 需要在setup之前导入celery app,否则缺少Django环境配置
  9. from Ansjer.celery import app
  10. import django
  11. django.setup()
  12. # 其他导入在setup之后
  13. import requests
  14. from Ansjer.Config.gatewaySensorConfig import DEVICE_TYPE, SMART_SOCKET_TOPIC, EVENT_TYPE, ANSJER_GENERIC_TOPIC
  15. from Service.CommonService import CommonService
  16. import time
  17. from Ansjer.config import LOGGER, CONFIG_INFO, DETECT_PUSH_DOMAINS
  18. from Model.models import Device_User, SceneLog
  19. from AdminController.CloudServiceManage.AgentOrderController import AgentOrderView
  20. import os
  21. import threading
  22. # ###################测试函数
  23. @app.task
  24. def hello():
  25. device_user_qs = Device_User.objects.filter(username='13138137872').values('NickName')
  26. nickname = device_user_qs[0]['NickName']
  27. info = 'celery测试日志, CONFIG_INFO:{}, nickname:{}'.format(CONFIG_INFO, nickname)
  28. LOGGER.info(info)
  29. @app.task
  30. def test(arg):
  31. time.sleep(10)
  32. print(arg)
  33. @app.task
  34. def add(x, y):
  35. print(x + y)
  36. # ###################
  37. @app.task
  38. def loocam_smart_scene(device_type, event_type, serial_number, scene_id=0):
  39. """
  40. loocam智能场景任务
  41. @param device_type: 设备类型
  42. @param event_type: 事件类型
  43. @param serial_number: 序列号
  44. @param scene_id: 场景id
  45. @return:
  46. """
  47. LOGGER.info('loocam智能场景任务,device_type:{},event_type:{},serial_number:{},scene_id:{}'.
  48. format(device_type, event_type, serial_number, scene_id))
  49. msg = {}
  50. # 插座
  51. if device_type == DEVICE_TYPE['socket']:
  52. topic_name = SMART_SOCKET_TOPIC.format(serial_number)
  53. status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0
  54. msg['type'] = 1
  55. msg['data'] = {
  56. 'deviceSwitch': status
  57. }
  58. # 摄像头
  59. elif device_type == DEVICE_TYPE['C516']:
  60. topic_name = ANSJER_GENERIC_TOPIC.format(serial_number)
  61. if event_type == EVENT_TYPE['detection_reminder_on']:
  62. msg['commandType'] = 'detection_reminder'
  63. msg['enable'] = 1
  64. elif event_type == EVENT_TYPE['detection_reminder_off']:
  65. msg['commandType'] = 'detection_reminder'
  66. msg['enable'] = 0
  67. else:
  68. return
  69. else:
  70. return
  71. CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
  72. # 没有设备任务时,最后一个任务上报场景日志
  73. if scene_id:
  74. data = {
  75. 'sceneId': scene_id,
  76. 'status': 1
  77. }
  78. url = DETECT_PUSH_DOMAINS + 'gatewayService/sceneLogPush'
  79. requests.post(url=url, data=data, timeout=8)
  80. @app.task
  81. def update_installment_settlement_order():
  82. LOGGER.info('start周期结算代理商订单任务')
  83. AgentOrderView.update_periodic_settlement()
  84. LOGGER.info('end周期结算代理商订单任务')
  85. @app.task
  86. def send_mqtt(serial_number, topic_name, msg, task_id, scene_type, device_id, tasks):
  87. """
  88. 定时发送mqtt, (不要随意更改,否则定时任务不执行)
  89. @param serial_number: 设备序列号
  90. @param topic_name: 主题
  91. @param msg: 消息
  92. @param task_id: 任务id
  93. @param scene_type: 场景类型(1:排程;2:计时器)
  94. @param device_id: 设备id
  95. @param tasks: 任务
  96. @return: response
  97. """
  98. now_time = int(time.time())
  99. msg['send_time'] = now_time
  100. result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
  101. scene_log = {
  102. 'scene_name': task_id,
  103. 'scene_id': 0 if scene_type == 2 else msg['task_id'],
  104. 'device_id': device_id,
  105. 'tasks': tasks,
  106. 'created_time': now_time,
  107. }
  108. SceneLog.objects.create(**scene_log)
  109. LOGGER.info('定时发送mqtt结果:{},参数:{},{},{},{},{},线程:{},进程:{}'.format(result, serial_number, topic_name, msg,
  110. now_time, task_id,
  111. threading.get_ident(), os.getpid()))