123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- # @Author : Rocky
- # @File : tasks.py
- # @Time : 2024/3/12 14:23
- # 解决models导入失败问题
- # *************
- # 修改任务函数后需要重启supervisor的celery和celery-beat才能失效
- # *************
- # 需要在setup之前导入celery app,否则缺少Django环境配置
- from Ansjer.celery import app
- import django
- django.setup()
- # 其他导入在setup之后
- import requests
- from Ansjer.Config.gatewaySensorConfig import DEVICE_TYPE, SMART_SOCKET_TOPIC, EVENT_TYPE, ANSJER_GENERIC_TOPIC
- from Service.CommonService import CommonService
- import time
- from Ansjer.config import LOGGER, CONFIG_INFO, DETECT_PUSH_DOMAINS
- from Model.models import Device_User, SceneLog
- from AdminController.CloudServiceManage.AgentOrderController import AgentOrderView
- import os
- import threading
- # ###################测试函数
- @app.task
- def hello():
- device_user_qs = Device_User.objects.filter(username='13138137872').values('NickName')
- nickname = device_user_qs[0]['NickName']
- info = 'celery测试日志, CONFIG_INFO:{}, nickname:{}'.format(CONFIG_INFO, nickname)
- LOGGER.info(info)
- @app.task
- def test(arg):
- time.sleep(10)
- print(arg)
- @app.task
- def add(x, y):
- print(x + y)
- # ###################
- @app.task
- def loocam_smart_scene(device_type, event_type, serial_number, scene_id=0):
- """
- loocam智能场景任务
- @param device_type: 设备类型
- @param event_type: 事件类型
- @param serial_number: 序列号
- @param scene_id: 场景id
- @return:
- """
- LOGGER.info('loocam智能场景任务,device_type:{},event_type:{},serial_number:{},scene_id:{}'.
- format(device_type, event_type, serial_number, scene_id))
- msg = {}
- # 插座
- if device_type == DEVICE_TYPE['socket']:
- topic_name = SMART_SOCKET_TOPIC.format(serial_number)
- status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0
- msg['type'] = 1
- msg['data'] = {
- 'deviceSwitch': status
- }
- # 摄像头
- elif device_type == DEVICE_TYPE['C516']:
- topic_name = ANSJER_GENERIC_TOPIC.format(serial_number)
- if event_type == EVENT_TYPE['detection_reminder_on']:
- msg['commandType'] = 'detection_reminder'
- msg['enable'] = 1
- elif event_type == EVENT_TYPE['detection_reminder_off']:
- msg['commandType'] = 'detection_reminder'
- msg['enable'] = 0
- else:
- return
- else:
- return
- CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
- # 没有设备任务时,最后一个任务上报场景日志
- if scene_id:
- data = {
- 'sceneId': scene_id,
- 'status': 1
- }
- url = DETECT_PUSH_DOMAINS + 'gatewayService/sceneLogPush'
- requests.post(url=url, data=data, timeout=8)
- @app.task
- def update_installment_settlement_order():
- LOGGER.info('start周期结算代理商订单任务')
- AgentOrderView.update_periodic_settlement()
- LOGGER.info('end周期结算代理商订单任务')
- @app.task
- def send_mqtt(serial_number, topic_name, msg, task_id, scene_type, device_id, tasks):
- """
- 定时发送mqtt, (不要随意更改,否则定时任务不执行)
- @param serial_number: 设备序列号
- @param topic_name: 主题
- @param msg: 消息
- @param task_id: 任务id
- @param scene_type: 场景类型(1:排程;2:计时器)
- @param device_id: 设备id
- @param tasks: 任务
- @return: response
- """
- now_time = int(time.time())
- msg['send_time'] = now_time
- result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg)
- scene_log = {
- 'scene_name': task_id,
- 'scene_id': 0 if scene_type == 2 else msg['task_id'],
- 'device_id': device_id,
- 'tasks': tasks,
- 'created_time': now_time,
- }
- SceneLog.objects.create(**scene_log)
- LOGGER.info('定时发送mqtt结果:{},参数:{},{},{},{},{},线程:{},进程:{}'.format(result, serial_number, topic_name, msg,
- now_time, task_id,
- threading.get_ident(), os.getpid()))
|