# -*- encoding: utf-8 -*-
"""
@File    : CronCloudPhotoController.py
@Time    : 2022/10/24 15:48
@Author  : stephen
@Email   : zhangdongming@asj6.wecom.work
@Software: PyCharm
"""
import datetime
import logging
import os
import time
import traceback

import cv2
from django.db import transaction
from django.views import View

from Ansjer.config import PUSH_CLOUD_PHOTO, ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME, PUSH_BUCKET
from Model.models import UidSetModel, DeviceCloudPhotoInfo, DevicePicturePushInfo
from Object.AWS.AmazonS3Util import AmazonS3Util
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.utils import LocalDateTimeUtil
from Service.EquipmentInfoService import EquipmentInfoService

LOGGER = logging.getLogger('info')
# Redis list key that receives the UIDs pushed by cache_photo_uid_set.
UID_KEY = 'ANSJER:UID:SET:INFO'


class CronCloudPhotoView(View):
    """Scheduled-task endpoints for the device cloud photo album.

    The URL keyword argument ``operation`` selects the task:
    ``cache-uid-set`` or ``generate-video``. Any other value yields
    ``response.json(404)``.
    """

    def get(self, request, *args, **kwargs):
        """Dispatch a GET request to the task named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        """Dispatch a POST request to the task named by ``operation``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Route ``operation`` to its handler.

        @param request_dict: request parameters (currently unused by the handlers)
        @param request: the Django request (currently unused by the handlers)
        @param operation: task name taken from the URL
        @return: the handler's response, or response.json(404) for an unknown operation
        """
        response = ResponseObject()
        if operation == 'cache-uid-set':
            return self.cache_photo_uid_set(response)
        elif operation == 'generate-video':
            return self.generate_video(response)
        else:
            return response.json(404)

    @classmethod
    def cache_photo_uid_set(cls, response):
        """Push the UIDs of cloud-album devices onto the Redis list ``UID_KEY``.

        A UID is pushed when its DeviceCloudPhotoInfo row has status=1 and a
        UidSetModel row with detect_status=1 exists for it.

        @param response: ResponseObject used to build the reply
        @return: response.json(0) on success, response.json(177, repr(e)) on error
        """
        try:
            now_time = int(time.time())
            LOGGER.info('进入读取开启云相册用户并把UID存在redis:{}'.format(now_time))
            photo_qs = DeviceCloudPhotoInfo.objects.filter(status=1).values('uid')
            if not photo_qs.exists():
                return response.json(0)
            redis = RedisObject()
            for item in photo_qs:
                uid_set_qs = UidSetModel.objects.filter(uid=item['uid'], detect_status=1)
                # exists() asks the database for a single row instead of
                # loading the whole result set just to test for emptiness.
                if not uid_set_qs.exists():
                    continue
                # NOTE(review): rpush appends on every run and the key is never
                # cleared in this file -- confirm the consumer drains the list.
                redis.CONN.rpush(UID_KEY, item['uid'])
            return response.json(0)
        except Exception as e:
            LOGGER.info('--->获取uid_set信息异常:{}'.format(traceback.format_exc()))
            return response.json(177, repr(e))

    @classmethod
    def generate_video(cls, response):
        """Build this month's video for every cloud-album device and record it.

        For each DeviceCloudPhotoInfo row with status=1, the type=0 push
        pictures whose event_time lies between the start of the current month
        and the end time returned by LocalDateTimeUtil are downloaded, merged
        into an mp4, uploaded to the PUSH_CLOUD_PHOTO bucket under
        ``{uid}/video/{YYYYmm}.mp4`` and recorded as a type=1
        DevicePicturePushInfo row. A failure for one device is logged and the
        loop moves on to the next device.

        @param response: ResponseObject used to build the reply
        @return: response.json(0) when the loop finishes, response.json(177)
                 when set-up outside the per-device loop fails
        """
        try:
            now_time = int(time.time())
            LOGGER.info('进入云相册开启用户,进行消息推送图进行合成视频:{}'.format(now_time))
            photo_qs = DeviceCloudPhotoInfo.objects.filter(status=1).values('uid', 'user_id')
            if not photo_qs.exists():
                return response.json(0)
            # Project root: three directory levels above this file.
            poj_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
            time_stamp = cls.get_month_stamp()
            # The query window is the same for every device, so compute it once.
            last_date = LocalDateTimeUtil.get_cur_month_end()
            _, end_time = LocalDateTimeUtil.get_start_and_end_time(last_date, '%Y-%m-%d')
            s3 = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
            for item in photo_qs:
                try:
                    with transaction.atomic():
                        # NOTE(review): no explicit order_by here, so the frame
                        # order follows the model's default ordering -- confirm
                        # it is chronological.
                        picture_qs = DevicePicturePushInfo.objects.filter(uid=item['uid'],
                                                                          event_time__gt=time_stamp,
                                                                          event_time__lt=end_time,
                                                                          type=0) \
                            .values('uid', 'channel', 'event_time', 'device_nick_name')
                        if not picture_qs.exists():
                            continue
                        device_nickname = picture_qs[0]['device_nick_name']
                        pic_list = cls.download_push_picture(s3, item['uid'], poj_path, picture_qs)
                        # NOTE(review): backslash-separated paths only form a
                        # directory hierarchy on Windows -- confirm the target OS.
                        video_path = poj_path + r'\Ansjer\file\video.mp4'  # local output path of the video
                        video_name = datetime.datetime.now().strftime('%Y%m')
                        key = '{}/video/{}.mp4'.format(item['uid'], video_name)
                        try:
                            cls.picture_synthesis_video(video_path, poj_path, pic_list)  # merge pictures
                            # Close the handle before deleting: removing a file
                            # that is still open fails on Windows, and the
                            # handle would otherwise leak on every iteration.
                            with open(video_path, 'rb') as data:
                                s3.upload_file_obj(PUSH_CLOUD_PHOTO, key, data)  # store the video in S3
                        finally:
                            # Drop the local copy whether or not the upload succeeded.
                            if os.path.exists(video_path):
                                os.remove(video_path)
                        push_data = {'type': 1, 'uid': item['uid'], 'channel': 1,
                                     'event_time': int(video_name),
                                     'updated_time': now_time, 'created_time': now_time,
                                     'device_nick_name': device_nickname, 'user_id': item['user_id']}
                        DevicePicturePushInfo.objects.create(**push_data)
                except Exception as e:
                    LOGGER.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
                    continue
            return response.json(0)
        except Exception:
            LOGGER.info('--->图片合成视频异常:{}'.format(traceback.format_exc()))
            return response.json(177)

    @staticmethod
    def download_push_picture(s3, uid, dir_url, picture_qs):
        """
        Download push pictures from S3 into the project's local file directory.

        Each picture is fetched from the PUSH_CLOUD_PHOTO bucket under the key
        ``{uid}/{channel}/{event_time}.jpeg`` and saved as ``{event_time}.jpeg``.
        @param s3: S3 helper object
        @param uid: device uid
        @param dir_url: project root path
        @param picture_qs: push picture records (dicts with 'channel' and 'event_time')
        @return: pic_list, the list of downloaded picture file names
        """
        pic_list = []
        for pic in picture_qs:
            path = dir_url + r'\Ansjer\file\{}.jpeg'.format(pic['event_time'])
            s3_key = '{}/{}/{}.jpeg'.format(uid, pic['channel'], pic['event_time'])
            s3.download_object(PUSH_CLOUD_PHOTO, s3_key, path)
            pic_list.append('{}.jpeg'.format(pic['event_time']))
        return pic_list

    @staticmethod
    def picture_synthesis_video(video_path, poj_path, pic_list):
        """
        Merge pictures into an mp4 video at one frame per second.

        The frame size is taken from the first picture that can be decoded;
        every frame is resized to it. Pictures that cannot be decoded are
        skipped. All local picture files are deleted before returning, also
        when an error occurs.
        @param video_path: output path of the video
        @param poj_path: project root path
        @param pic_list: picture file names
        @raise Exception: when pic_list is empty or no picture can be decoded
        @return:
        """
        if not pic_list:
            raise Exception('this pic_list is null.')
        fps = 1  # frame rate
        img_paths = [poj_path + r'\Ansjer\file\{}'.format(name) for name in pic_list]
        video_writer = None
        img_size = None
        try:
            for img_path in img_paths:
                frame = cv2.imread(img_path)
                if frame is None:
                    # imread signals an unreadable or missing file with None.
                    continue
                if video_writer is None:
                    # Create the writer lazily so an unreadable first picture
                    # no longer crashes on ``None.shape``.
                    height, width = frame.shape[:2]
                    img_size = (width, height)
                    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
                    video_writer = cv2.VideoWriter(video_path, fourcc, fps, img_size)
                frame = cv2.resize(frame, img_size)  # every frame must match the video size
                video_writer.write(frame)
                os.remove(img_path)
        finally:
            if video_writer is not None:
                video_writer.release()
            # Remove pictures that were skipped or left behind by an error.
            for img_path in img_paths:
                if os.path.exists(img_path):
                    os.remove(img_path)
        if video_writer is None:
            raise Exception('no readable picture in pic_list.')

    @staticmethod
    def get_month_stamp():
        """
        Return the timestamp of 00:00:00 on the first day of the current month.

        The date comes from LocalDateTimeUtil.get_cur_month_start() and is
        converted with time.mktime, i.e. interpreted in the local time zone.
        """
        now_month = LocalDateTimeUtil.get_cur_month_start()
        start_time = "{} 00:00:00".format(str(now_month))
        time_array = time.strptime(start_time, "%Y-%m-%d %H:%M:%S")
        # convert to a Unix timestamp
        return int(time.mktime(time_array))