tasks.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # @Author : Rocky
  2. # @File : tasks.py
  3. # @Time : 2024/3/12 14:23
  4. # 解决models导入失败问题
  5. # *************
  6. # After modifying a task function, restart supervisor's celery and celery-beat for the change to take effect
  7. # *************
  8. # 需要在setup之前导入celery app,否则缺少Django环境配置
  9. from Ansjer.celery import app
  10. import django
  11. django.setup()
  12. # 其他导入在setup之后
  13. import requests
  14. from Ansjer.Config.gatewaySensorConfig import DEVICE_TYPE, SMART_SOCKET_TOPIC, EVENT_TYPE
  15. from Service.CommonService import CommonService
  16. import time
  17. from Ansjer.config import LOGGER, CONFIG_INFO, DETECT_PUSH_DOMAINS
  18. from Model.models import Device_User
  19. from AdminController.CloudServiceManage.AgentOrderController import AgentOrderView
  20. import os
  21. import threading
  22. # ###################测试函数
  23. @app.task
  24. def hello():
  25. device_user_qs = Device_User.objects.filter(username='13138137872').values('NickName')
  26. nickname = device_user_qs[0]['NickName']
  27. info = 'celery测试日志, CONFIG_INFO:{}, nickname:{}'.format(CONFIG_INFO, nickname)
  28. LOGGER.info(info)
  29. @app.task
  30. def test(arg):
  31. time.sleep(10)
  32. print(arg)
  33. @app.task
  34. def add(x, y):
  35. print(x + y)
  36. # ###################
  37. @app.task
  38. def loocam_smart_scene(device_type, event_type, serial_number, scene_id=0):
  39. """
  40. loocam智能场景任务
  41. @param device_type: 设备类型
  42. @param event_type: 事件类型
  43. @param serial_number: 序列号
  44. @param scene_id: 场景id
  45. @return:
  46. """
  47. LOGGER.info('loocam智能场景任务,device_type:{},event_type:{},serial_number:{},scene_id:{}'.
  48. format(device_type, event_type, serial_number, scene_id))
  49. if device_type == DEVICE_TYPE['socket']:
  50. topic_name = SMART_SOCKET_TOPIC.format(serial_number)
  51. status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0
  52. msg = {
  53. 'type': 1,
  54. 'data': {'deviceSwitch': status}
  55. }
  56. CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
  57. # 没有设备任务时,最后一个任务上报场景日志
  58. if scene_id:
  59. data = {
  60. 'sceneId': scene_id,
  61. 'status': 1
  62. }
  63. url = DETECT_PUSH_DOMAINS + 'gatewayService/sceneLogPush'
  64. requests.post(url=url, data=data, timeout=8)
  65. @app.task
  66. def update_installment_settlement_order():
  67. LOGGER.info('start周期结算代理商订单任务')
  68. AgentOrderView.update_periodic_settlement()
  69. LOGGER.info('end周期结算代理商订单任务')
  70. @app.task
  71. def send_mqtt(serial_number, topic_name, msg, task_id):
  72. """
  73. 定时发送mqtt, (不要随意更改,否则定时任务不执行)
  74. @param serial_number: 设备序列号
  75. @param topic_name: 主题
  76. @param msg: 消息
  77. @param task_id: 任务id
  78. @return: response
  79. """
  80. now_time = int(time.time())
  81. msg['implementTime'] = now_time
  82. result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
  83. LOGGER.info('定时发送mqtt结果:{},参数:{},{},{},{},{},线程:{},进程:{}'.format(result, serial_number, topic_name, msg,
  84. now_time, task_id,
  85. threading.get_ident(), os.getpid()))