# @Author : Rocky # @File : tasks.py # @Time : 2024/3/12 14:23 # 解决models导入失败问题 # ************* # 修改任务函数后需要重启supervisor的celery和celery-beat才能失效 # ************* # 需要在setup之前导入celery app,否则缺少Django环境配置 from Ansjer.celery import app import django django.setup() # 其他导入在setup之后 import requests from Ansjer.Config.gatewaySensorConfig import DEVICE_TYPE, SMART_SOCKET_TOPIC, EVENT_TYPE, ANSJER_GENERIC_TOPIC from Service.CommonService import CommonService import time from Ansjer.config import LOGGER, CONFIG_INFO, DETECT_PUSH_DOMAINS from Model.models import Device_User, SceneLog from AdminController.CloudServiceManage.AgentOrderController import AgentOrderView import os import threading # ###################测试函数 @app.task def hello(): device_user_qs = Device_User.objects.filter(username='13138137872').values('NickName') nickname = device_user_qs[0]['NickName'] info = 'celery测试日志, CONFIG_INFO:{}, nickname:{}'.format(CONFIG_INFO, nickname) LOGGER.info(info) @app.task def test(arg): time.sleep(10) print(arg) @app.task def add(x, y): print(x + y) # ################### @app.task def loocam_smart_scene(device_type, event_type, serial_number, scene_id=0): """ loocam智能场景任务 @param device_type: 设备类型 @param event_type: 事件类型 @param serial_number: 序列号 @param scene_id: 场景id @return: """ LOGGER.info('loocam智能场景任务,device_type:{},event_type:{},serial_number:{},scene_id:{}'. format(device_type, event_type, serial_number, scene_id)) msg = {} # 插座 if device_type == DEVICE_TYPE['socket']: topic_name = SMART_SOCKET_TOPIC.format(serial_number) status = 1 if event_type == EVENT_TYPE['socket_power_on'] else 0 msg['type'] = 1 msg['data'] = { 'deviceSwitch': status } # 摄像头 elif device_type == DEVICE_TYPE['C516']: topic_name = ANSJER_GENERIC_TOPIC.format(serial_number) if event_type == EVENT_TYPE['detection_reminder_on']: msg['commandType'] = 'detection_reminder' msg['enable'] = 1 elif event_type == EVENT_TYPE['detection_reminder_off']: msg['commandType'] = 'detection_reminder' msg['enable'] = 0 else: return else: return CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) # 没有设备任务时,最后一个任务上报场景日志 if scene_id: data = { 'sceneId': scene_id, 'status': 1 } url = DETECT_PUSH_DOMAINS + 'gatewayService/sceneLogPush' requests.post(url=url, data=data, timeout=8) @app.task def update_installment_settlement_order(): LOGGER.info('start周期结算代理商订单任务') AgentOrderView.update_periodic_settlement() LOGGER.info('end周期结算代理商订单任务') @app.task def send_mqtt(serial_number, topic_name, msg, task_id, scene_type, device_id, tasks): """ 定时发送mqtt, (不要随意更改,否则定时任务不执行) @param serial_number: 设备序列号 @param topic_name: 主题 @param msg: 消息 @param task_id: 任务id @param scene_type: 场景类型(1:排程;2:计时器) @param device_id: 设备id @param tasks: 任务 @return: response """ now_time = int(time.time()) msg['send_time'] = now_time result = CommonService.req_publish_mqtt_msg(serial_number, topic_name, msg) scene_log = { 'scene_name': task_id, 'scene_id': 0 if scene_type == 2 else msg['task_id'], 'device_id': device_id, 'tasks': tasks, 'created_time': now_time, } SceneLog.objects.create(**scene_log) LOGGER.info('定时发送mqtt结果:{},参数:{},{},{},{},{},线程:{},进程:{}'.format(result, serial_number, topic_name, msg, now_time, task_id, threading.get_ident(), os.getpid()))