#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: AnsjerFormal @software: PyCharm @DATE: 2018/12/5 9:30 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: cloudstorage.py @Contact: chanjunkai@163.com """ import base64 import json import os import time import glob import urllib from urllib.parse import quote, parse_qs, unquote import apns2 import boto3 import jpush import oss2 import paypalrestsdk import threading import calendar import datetime import logging import sys import requests from aliyunsdkcore import client from aliyunsdksts.request.v20150401 import AssumeRoleRequest from boto3.session import Session from django.http import JsonResponse, HttpResponseRedirect, HttpResponse from django.db import transaction from django.views.generic.base import View import jwt from Object.ETkObject import ETkObject from pyfcm import FCMNotification from Ansjer.config import OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET, OSS_ROLE_ARN, SERVER_DOMAIN, PAYPAL_CRD, \ SERVER_DOMAIN_SSL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ARN, APNS_MODE, APNS_CONFIG, BASE_DIR, \ JPUSH_CONFIG, FCM_CONFIG, OAUTH_ACCESS_TOKEN_SECRET from Controller.CheckUserData import DataValid from Model.models import Device_Info, Order_Model, Store_Meal, VodHlsModel, OssCrdModel, UID_Bucket, StsCrdModel, \ ExperienceContextModel, Pay_Type, CDKcontextModel, Device_User, SysMassModel, SysMsgModel, UidPushModel, \ Unused_Uid_Meal, UIDMainUser, UserModel, PromotionRuleModel, VideoPlaybackTimeModel, CloudLogModel, CouponModel, \ AiStoreMeal, AiService, UidSetModel, Ai_Push_Info, iotdeviceInfoModel, AiProcessTime from Object.AWS.S3Email import S3Email from Object.AliPayObject import AliPayObject from Object.AliSmsObject import AliSmsObject from Object.RedisObject import RedisObject from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject from Object.UidTokenObject import UidTokenObject from Service.CommonService import CommonService from Object.m3u8generate import PlaylistGenerator from Object.WechatPayObject import WechatPayObject from django.db.models import Q, F, Count, Sum from Controller.PaymentCycle import Paypal from decimal import Decimal from Ansjer.config import SERVER_TYPE from Service.ModelService import ModelService from Object import MergePic import boto3 import botocore from botocore import client # AI服务 class AiView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, request, operation) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, request, operation) def validation(self, request_dict, request, operation): response = ResponseObject() if operation is None: return response.json(444, 'error path') elif operation == 'identification': # ai识别 return self.do_ai_identification(request.POST, response) elif operation == 'doPayPalCallBack': # paypal支付回调 return self.do_pay_by_paypal_callback(request_dict, response) elif operation == 'doAlipayCallBack': # 支付宝支付回调 return self.do_alipay_callback(request_dict, response) elif operation == 'doWechatCallBack': # 微信支付回调 return self.do_wechat_callback(request, response) else: token = request_dict.get('token', None) # 设备主键uid tko = TokenObject(token) response.lang = tko.lang if tko.code != 0: return response.json(tko.code) userID = tko.userID if operation == 'createpayorder': # 创建支付订单 return self.do_create_pay_order(request_dict, request, userID, response) elif operation == 'changeaistatus': # 修改AI开关状态 return self.do_change_ai_status(userID, request_dict, response) elif operation == 'getAiStatus': # 获取AI开关状态 return self.getAiStatus(userID, request_dict, response) elif operation == 'commoditylist': # 获取AI套餐列表 return self.do_commodity_list(userID, request_dict, response) elif operation == 'identification': # ai识别 return self.do_ai_identification(request_dict, response) elif operation == 'queryInfo': # 查询消息列表 return self.queryInfo(userID, request_dict, response) elif operation == 'readInfo': # 消息已读 return self.readInfo(userID, request_dict, response) elif operation == 'deleteInfo': # 删除消息 return self.deleteInfo(userID, request_dict, response) elif operation == 'queryorderlist': # 查询订单 return self.do_querylist(userID, request_dict, response) elif operation == 'getUsingPackage': # 获取当前使用的ai套餐 return self.getUsingPackage(request_dict, userID, response) else: return response.json(414) def do_change_ai_status(self, userID, request_dict, response): token_val = request_dict.get('token_val', None) appBundleId = request_dict.get('appBundleId', None) app_type = request_dict.get('app_type', None) push_type = request_dict.get('push_type', None) status = request_dict.get('status', None) m_code = request_dict.get('m_code', None) uid = request_dict.get('uid', None) lang = request_dict.get('lang', 'en') tz = request_dict.get('tz', '0') detect_group = request_dict.get('detect_group', None) interval = request_dict.get('interval', None) if not status: return response.json(444, 'status') # 关闭推送 if not all([appBundleId, app_type, token_val, uid, m_code]): return response.json(444, 'appBundleId,app_type,token_val,uid,m_code') # 如果传空上来,就默认为0 tz = 0 if tz == '' else tz.replace("GMT", "") # 判断推送类型对应key是否存在 if push_type == '0': if appBundleId not in APNS_CONFIG.keys(): return response.json(904) elif push_type == '1': if appBundleId not in FCM_CONFIG.keys(): return response.json(904) elif push_type == '2': if appBundleId not in JPUSH_CONFIG.keys(): return response.json(904) else: return response.json(173) try: ai_service_qs = AiService.objects.filter(uid=uid, use_status=1) if not ai_service_qs.exists(): return response.json(10053) nowTime = int(time.time()) endTime = ai_service_qs.values('endTime')[0]['endTime'] if nowTime > endTime: return response.json(10054) dvqs = Device_Info.objects.filter(userID_id=userID, UID=uid) status = int(status) nowTime = int(time.time()) if not dvqs.exists(): return response.json(14) uid_set_qs = UidSetModel.objects.filter(uid=uid) if uid_set_qs.exists(): uid_set_id = uid_set_qs[0].id qs_data = { 'updTime': nowTime, } if interval: qs_data['detect_interval'] = int(interval) if detect_group: qs_data['detect_group'] = detect_group uid_set_qs.update(**qs_data) else: qs_data = { 'uid': uid, 'addTime': nowTime, 'updTime': nowTime, } if interval: qs_data['detect_interval'] = int(interval) if detect_group: qs_data['detect_group'] = detect_group # 添加设备配置 uid_set_qs = UidSetModel.objects.create(**qs_data) uid_set_id = uid_set_qs.id qs_data['detect_status'] = status # ai开关状态 ai_service_qs.update(**qs_data) topic_name = 'ansjer/generic/{}'.format(uid) if status == 0: # 关闭 # mqtt通知设备关闭AI识别功能 msg = {'commandType': 'AIDisable'} req_success = CommonService.req_publish_mqtt_msg(uid, topic_name, msg) if not req_success: return response.json(10044) return response.json(0) elif status == 1: # 开启 uid_push_qs = UidPushModel.objects.filter(userID_id=userID, m_code=m_code, uid_set__uid=uid) if uid_push_qs.exists(): uid_push_update_dict = { 'appBundleId': appBundleId, 'app_type': app_type, 'push_type': push_type, 'token_val': token_val, 'updTime': nowTime, 'lang': lang, 'tz': tz } uid_push_qs.update(**uid_push_update_dict) else: uid_push_create_dict = { 'uid_set_id': uid_set_id, 'userID_id': userID, 'appBundleId': appBundleId, 'app_type': app_type, 'push_type': push_type, 'token_val': token_val, 'm_code': m_code, 'addTime': nowTime, 'updTime': nowTime, 'lang': lang, 'tz': tz } # 绑定设备推送 UidPushModel.objects.create(**uid_push_create_dict) etkObj = ETkObject(etk='') etk = etkObj.encrypt(uid) aiIdentificationUrl = "{DETECT_PUSH_DOMAIN}AiService/identification".format(DETECT_PUSH_DOMAIN=SERVER_DOMAIN_SSL) # mqtt通知设备开启AI识别功能 msg = { 'commandType': 'AIEnable', 'payload': { 'etk': etk, 'endTime': endTime, 'aiIdentificationUrl': aiIdentificationUrl, } }, req_success = CommonService.req_publish_mqtt_msg(uid, topic_name, msg) if not req_success: return response.json(10044) return response.json(0, {'aiIdentificationUrl': aiIdentificationUrl, 'endTime': endTime, 'etk': etk}) except Exception as e: print(e) return response.json(500, repr(e)) def getAiStatus(self, userID, request_dict, response): uid = request_dict.get('uid', None) if not uid: return response.json(444) try: ai_server_qs = AiService.objects.filter(uid=uid, use_status=1).values('detect_status', 'detect_group') if not ai_server_qs.exists(): return response.json(173) res = { 'detect_status': ai_server_qs[0]['detect_status'], 'detect_group': ai_server_qs[0]['detect_group'], } return response.json(0, {'data': res}) except Exception as e: return response.json(500, repr(e)) def do_commodity_list(self, userID, request_dict, response): uid = request_dict.get('uid', None) lang = request_dict.get('lang', 'en') try: # DVR/NVR设备暂不返回套餐列表 device_info_qs = Device_Info.objects.filter(Q(UID=uid), Q(Type__lte=4) | Q(Type=10001)) if device_info_qs.exists(): return response.json(0) # 查询套餐数据 ai_meal_qs = AiStoreMeal.objects.filter(is_show=1, lang__lang=lang).\ annotate(ai_meal_id=F('id'), title=F('lang__title'), content=F('lang__content')).\ values("ai_meal_id", "title", "content", "price", "effective_day", "currency", "virtual_price", "symbol") if not ai_meal_qs.exists(): return response.json(0) # 查询每种套餐的所有支付方式 ai_meal_list = list(ai_meal_qs) for ai_meal in ai_meal_list: pay_type_qs = Pay_Type.objects.filter(aistoremeal=ai_meal['ai_meal_id']).values('id', 'payment') ai_meal['pay_type'] = list(pay_type_qs) result = { 'meals': ai_meal_list, } return response.json(0, result) except Exception as e: print(e) return response.json(500, repr(e)) def do_querylist(self, userID, request_dict, response): page = request_dict.get('page', None) line = request_dict.get('line', None) uid = request_dict.get('uid', None) lang = request_dict.get('lang', 'en') if not page or not line: return response.json(444, 'page,line') try: page = int(page) line = int(line) omqs = Order_Model.objects.filter(userID_id=userID, status=1, order_type=1, ai_rank__lang__lang=lang) # 筛选指定设备id的订单 if uid: omqs.filter(UID=uid) if not omqs.exists(): return response.json(173) count = omqs.count() omqs = omqs.annotate(rank__title=F('ai_rank__lang__title'), rank__content=F('ai_rank__lang__content'), rank__day=F('ai_rank__effective_day'), rank__price=F('ai_rank__price'), rank__expire=F('ai_rank__effective_day'), rank__id=F('ai_rank_id'), rank__currency=F('ai_rank__currency')) order_ql = omqs[(page - 1) * line:page * line].values("orderID", "UID", "channel", "desc", "price", "currency", "addTime", "updTime", "paypal", "rank__day", "payType", "rank__price", "status", "rank__content", "rank__title", "rank__currency", "rank__expire", "ai_rank_id") order_list = list(order_ql) data = [] nowTime = int(time.time()) # 这里需要进行优化 uid_list = [] for od in order_list: uid_list.append(od['UID']) didqs = Device_Info.objects.filter(userID_id=userID, UID__in=uid_list).values('id', 'UID', 'Type') for d in order_list: if d['status'] == 0: if d['addTime'] + 3600 < nowTime: d['status'] = 3 for did in didqs: if d['UID'] == did['UID']: d['did'] = did['id'] d['Type'] = did['Type'] data.append(d) # d['rank__lang__content'] = '月' if lang == 'cn' else 'month' return response.json(0, {'data': data, 'count': count}) except Exception as e: print(e) return response.json(500, repr(e)) # 获取当前使用的ai套餐 def getUsingPackage(self, request_dict, userID, response): uid = request_dict.get('uid', None) lang = request_dict.get('lang', 'en') # dv_qs = Device_Info.objects.filter(userID_id=userID, UID=uid, isShare=False, isExist=1, vodPrimaryUserID=userID) # if not dv_qs.exists(): # return response.json(12) try: ai_service_qs = AiService.objects.filter(uid=uid, use_status=1, orders__ai_rank__lang__lang=lang) if not ai_service_qs.exists(): return response.json(0, []) # 计算套餐过期时间 sum_end_time = AiService.objects.filter(Q(uid=uid), ~Q(use_status=2)).aggregate(Sum('endTime'))['endTime__sum'] ai_service_qs = ai_service_qs.order_by('addTime').annotate(bucket__content=F('orders__ai_rank__lang__title')).\ values('uid', 'use_status', 'bucket__content') ai_service_data = ai_service_qs[0] ai_service_data['endTime'] = sum_end_time return response.json(0, [ai_service_data]) except Exception as e: print(e) return response.json(500, repr(e)) def do_create_pay_order(self, request_dict, request, userID, response): uid = request_dict.get('uid', None) channel = request_dict.get('channel', None) pay_type = int(request_dict.get('pay_type', 1)) ai_meal_id = request_dict.get('ai_meal_id', None) lang = request_dict.get('lang', 'en') if not uid or not channel or not pay_type or not ai_meal_id: return response.json(444) try: # 判断是否为主用户 # dv_qs = Device_Info.objects.filter(userID_id=userID, UID=uid, isShare=False, isExist=1).values( # 'vodPrimaryUserID', # 'vodPrimaryMaster') # if not dv_qs.exists(): # return response.json(12) # dvq = Device_Info.objects.filter(UID=uid) # dvq = dvq.filter(~Q(vodPrimaryUserID='')).values('vodPrimaryUserID') # if dvq.exists(): # if dvq[0]['vodPrimaryUserID'] != userID: # return response.json(10033) # 获取ai套餐数据 ai_sm_qs = AiStoreMeal.objects.filter(id=ai_meal_id, pay_type=pay_type, is_show=1). \ values('currency', 'price', 'content', 'effective_day', 'title') if not ai_sm_qs.exists(): return response.json(173) title = ai_sm_qs[0]['title'] content = ai_sm_qs[0]['content'] currency = ai_sm_qs[0]['currency'] price = ai_sm_qs[0]['price'] day = ai_sm_qs[0]['effective_day'] nowTime = int(time.time()) orderID = CommonService.createOrderID() price = round(float(price), 2) order_dict = { 'orderID': orderID, 'UID': uid, 'channel': channel, 'userID_id': userID, 'desc': content, 'payType': pay_type, 'payTime': nowTime, 'price': price, 'currency': currency, 'addTime': nowTime, 'updTime': nowTime, 'ai_rank_id': ai_meal_id, 'rank_id': 1, 'order_type': 1, } if pay_type == 1: # PayPal支付 order_dict['paymentID'], order_dict['pay_url'] = self.create_paypal_payment(lang, orderID, price, currency, content, response) res_data = {'redirectUrl': order_dict['pay_url'], 'orderID': orderID} elif pay_type == 2: # 支付宝 order_dict['pay_url'] = self.create_alipay_payment(lang, orderID, price, title, content, response) res_data = {'redirectUrl': order_dict['pay_url'], 'orderID': orderID} elif pay_type == 3: # 微信支付 ip = CommonService.get_ip_address(request) order_dict['pay_url'], sign_params = self.create_wechat_payment(orderID, ip, response) res_data = {'redirectUrl': order_dict['pay_url'], 'orderID': orderID, 'result': sign_params} Order_Model.objects.create(**order_dict) return response.json(0, res_data) except Exception as e: print(e) return response.json(500, repr(e)) @staticmethod def create_paypal_payment(lang, orderID, price, currency, content, response): cancel_url = CommonService.get_payment_status_url(lang, 'fail') call_sub_url = "{}AiService/doPayPalCallBack?orderID={}&lang={}".format(SERVER_DOMAIN_SSL, orderID, lang) paypalrestsdk.configure(PAYPAL_CRD) payment = paypalrestsdk.Payment({ "intent": "sale", "payer": {"payment_method": "paypal"}, "redirect_urls": {"return_url": call_sub_url, "cancel_url": cancel_url}, "transactions": [{ "item_list": {"items": [ {"name": "Cloud video", "sku": "1", "price": price, "currency": "USD", "quantity": 1}]}, "amount": {"total": price, "currency": currency}, "description": content}]}) if not payment.create(): # 创建失败 return response.json(10, payment.error) paymentID = payment['id'] # 获取paymentID for link in payment.links: if link.rel == "approval_url": pay_url = str(link.href) return paymentID, pay_url return response.json(10, 'create_ai_order_failed') @staticmethod def create_alipay_payment(lang, orderID, price, title, content, response): aliPayObj = AliPayObject() alipay = aliPayObj.conf() subject = title + content order_string = alipay.api_alipay_trade_wap_pay( out_trade_no=orderID, total_amount=price, subject=subject, return_url="{}web/paid2/success.html".format(SERVER_DOMAIN_SSL), notify_url="{}AiService/doAlipayCallBack".format(SERVER_DOMAIN_SSL), quit_url="{}web/paid2/fail.html".format(SERVER_DOMAIN_SSL), passback_params=quote("lang=" + lang) ) if not order_string: return response.json(10, '生成订单错误.') return aliPayObj.alipay_prefix + order_string @staticmethod def create_wechat_payment(lang, orderID, price, ip, content, response): pay = WechatPayObject() pay_url = "{}AiService/doWechatCallBack".format(SERVER_DOMAIN_SSL) # 统一调用接口 pay.get_parameter(orderID, content, float(price) * 100, ip, pay_url, quote("lang=" + lang)) sign_params = pay.re_finall(orderid=orderID) if not sign_params: return response.json(10, '生成订单错误.') return pay_url, sign_params def do_pay_by_paypal_callback(self, request_dict, response): logger = logging.getLogger('info') logger.info('AI订单---paypal支付回调') paymentId = request_dict.get('paymentId', None) PayerID = request_dict.get('PayerID', None) orderID = request_dict.get('orderID', None) lang = request_dict.get('lang', 'en') if not orderID: pay_failed_url = CommonService.get_payment_status_url(lang, 'fail') return HttpResponseRedirect(pay_failed_url) logger.info("paymentID: {}, payerID: {}".format(paymentId, PayerID)) # redis加锁,防止订单重复 redisObj = RedisObject() isLock = redisObj.CONN.setnx(orderID + 'do_notify', 1) redisObj.CONN.expire(orderID + 'do_notify', 60) if not isLock: return response.json(5) try: order_qs = Order_Model.objects.filter(orderID=orderID, status=0) if not order_qs.exists(): return response.json(173) paypalrestsdk.configure(PAYPAL_CRD) payment = paypalrestsdk.Payment.find(paymentId) logger.info("payment: {}".format(payment)) payer = payment.execute({"payer_id": PayerID}) logger.info('payres: {}'.format(payer)) if not payer: pay_failed_url = CommonService.get_payment_status_url(lang, 'fail') redisObj.del_data(key=orderID + 'do_notify') return HttpResponseRedirect(pay_failed_url) return self.payment_success(orderID, lang, order_qs, redisObj) except Exception as e: logger.info('AI订单paypal支付回调异常:{}'.format(repr(e))) order_qs.update(status=10) pay_failed_url = CommonService.get_payment_status_url(lang, 'fail') redisObj.del_data(key=orderID + 'do_notify') return HttpResponseRedirect(pay_failed_url) def do_alipay_callback(self, request_dict, response): # 支付宝支付回调 logger = logging.getLogger('info') logger.info('AI订单---支付宝支付回调') try: data = request_dict.dict() passback_params = data['passback_params'] parmap = dict([(k, v[0]) for k, v in parse_qs(unquote(passback_params)).items()]) lang = parmap['lang'] signature = data['sign'] data.pop('sign') orderID = data['out_trade_no'] # redis加锁,防止订单重复 redisObj = RedisObject() isLock = redisObj.CONN.setnx(orderID + 'do_notify', 1) redisObj.CONN.expire(orderID + 'do_notify', 60) if not isLock: return response.json(5) order_qs = Order_Model.objects.filter(orderID=orderID, status=0) if not order_qs.exists(): return response.json(173) aliPayObj = AliPayObject() alipay = aliPayObj.conf() success = alipay.verify(data, signature) if not success or data["trade_status"] not in ("TRADE_SUCCESS", "TRADE_FINISHED"): return response.json(0, signature) return self.payment_success(orderID, lang, order_qs, redisObj) except Exception as e: logger.info('AI订单支付宝支付回调异常:{}'.format(repr(e))) order_qs.update(status=10) redisObj.del_data(key=orderID + 'do_notify') pay_failed_url = CommonService.get_payment_status_url(lang, 'fail') redisObj.del_data(key=orderID + 'do_notify') return HttpResponseRedirect(pay_failed_url) def do_wechat_callback(self, request, response): # 微信支付回调 logger = logging.getLogger('info') logger.info('AI订单---微信支付回调') try: pay = WechatPayObject() data = pay.weixinpay_call_back(request.body) attach = data["attach"] parmap = dict([(k, v[0]) for k, v in parse_qs(unquote(attach)).items()]) lang = parmap['lang'] trade_status = data['result_code'] # 业务结果 SUCCESS/FAIL orderID = data['out_trade_no'] # 商户订单号 order_qs = Order_Model.objects.filter(orderID=orderID, status=0) if not order_qs.exists(): return response.json(173) if trade_status != 'SUCCESS': order_qs.update(status=10) return HttpResponse(pay.xml_to_dict({'return_code': 'FAIL'})) check_sign = pay.get_notifypay(data) if not check_sign: return HttpResponse(pay.xml_to_dict({'return_code': 'FAIL', 'return_msg': '签名失败'})) # redis加锁,防止订单重复 redisObj = RedisObject() isLock = redisObj.CONN.setnx(orderID + 'do_notify', 1) redisObj.CONN.expire(orderID + 'do_notify', 60) if not isLock: return response.json(5) return self.payment_success(self, orderID, lang, order_qs, redisObj, True) except Exception as e: order_qs.update(status=10) redisObj.del_data(key=orderID + 'do_notify') return HttpResponse(pay.xml_to_dict({'return_code': 'FAIL', 'return_msg': repr(e)})) def payment_success(self, orderID, lang, order_qs, redisObj, is_wechat_pay=False): nowTime = int(time.time()) order_list = order_qs.values("UID", "channel", "commodity_code", "ai_rank__effective_day", "isSelectDiscounts", "userID__userID", "userID__username", "coupon_id") userid = order_list[0]['userID__userID'] username = order_list[0]['userID__username'] UID = order_list[0]['UID'] channel = order_list[0]['channel'] effective_day = order_list[0]['ai_rank__effective_day'] ai_service_qs = AiService.objects.filter(Q(uid=UID), Q(channel=channel), Q(use_status=1)) ai_service_dict = {'orders_id': orderID, 'uid': UID, 'channel': channel, 'detect_status': 1, 'addTime': nowTime, 'updTime': nowTime, 'detect_group': '1' } if ai_service_qs.exists(): # 有正在使用的套餐,套餐结束时间保存为套餐有效期 ai_service_dict['endTime'] = effective_day * 24 * 60 * 60 else: ai_service_dict['use_status'] = 1 ai_service_dict['endTime'] = nowTime + effective_day * 24 * 60 * 60 with transaction.atomic(): # 更新设备主用户 Device_Info.objects.filter(UID=UID, vodPrimaryUserID='', vodPrimaryMaster=''). \ update(vodPrimaryUserID=userid, vodPrimaryMaster=username) # 更新订单数据,返回支付成功url order_qs.update(status=1, updTime=nowTime) # 创建AiService数据 AiService.objects.create(**ai_service_dict) pay_success_url = CommonService.get_payment_status_url(lang, 'success') redisObj.del_data(key=orderID + 'do_notify') if is_wechat_pay: return HttpResponse("\ \ \ ") else: return HttpResponseRedirect(pay_success_url) def do_ai_identification(self, request_dict,response): etk = request_dict.get('etk', None) n_time = request_dict.get('n_time', None) channel = request_dict.get('channel', '1') receiveTime = int(time.time()) logger = logging.getLogger('info') logger.info('-----------into----ai--api') logger.info("etk={etk}".format(etk=etk)) if not etk: return response.json(444) try: # 解密uid及判断长度 eto = ETkObject(etk) uid = eto.uid logger.info("uid={uid}".format(uid=uid)) if len(uid) != 20 and len(uid) != 14: return response.json(444) ##通过uid查出endTime是否过期,并且ai开关是否打开 AiServiceQuery = AiService.objects.filter(uid=uid, detect_status=1, use_status=1, endTime__gt=receiveTime).\ values('detect_group') if not AiServiceQuery.exists(): logger.info('none-----aiService') return response.json(173) detect_group = AiServiceQuery[0]['detect_group'] #{}?? # file_post_one = request_dict.get('fileOne', None) file_post_two = request_dict.get('fileTwo', None) file_post_three = request_dict.get('fileThree', None) file_post_four = request_dict.get('fileFour', None) file_post_one = file_post_one.replace(' ', '+') file_post_two = file_post_two.replace(' ', '+') file_post_three = file_post_three.replace(' ', '+') file_post_four = file_post_four.replace(' ', '+') file_post_one = base64.b64decode(file_post_one) file_post_two = base64.b64decode(file_post_two) file_post_three = base64.b64decode(file_post_three) file_post_four = base64.b64decode(file_post_four) file_list = [file_post_one, file_post_two, file_post_three, file_post_four] del file_post_one, file_post_two, file_post_three, file_post_four dir_path = os.path.join(BASE_DIR, 'static/ai/' + uid + '/' + str(n_time)) if not os.path.exists(dir_path): os.makedirs(dir_path) file_path_list = [] for i, val in enumerate(file_list): # file_path = dir_path + '/' + str(i) + '.jpg' file_path = "{dir_path}/{n_time}_{i}.jpg".format(dir_path=dir_path, n_time=n_time, i=i) file_path_list.append(file_path) with open(file_path, 'wb') as f: f.write(val) f.close() image_size = 500 # 每张小图片的大小 image_colnum = 1 # 合并成一张图后,一行有几个小图 MergePic.merge_images(dir_path, image_size, image_colnum) photo = open(dir_path + '.jpg', 'rb') #打开合成图 # photo = open(r'E:\test---------------\test\snipaste20220121_215952.jpg', 'rb') #识别合成图片 maxLabels = 50 minConfidence = 70 ai_start_time = int(time.time()) client = boto3.client( 'rekognition', aws_access_key_id='AKIA2E67UIMD6JD6TN3J', aws_secret_access_key='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', region_name='us-east-1') # doc: rekognition_res = client.detect_labels( Image={'Bytes': photo.read()}, MaxLabels=maxLabels, MinConfidence=minConfidence) if rekognition_res['ResponseMetadata']['HTTPStatusCode'] != 200: return response.json(173) ai_end_time = int(time.time()) labels =rekognition_res['Labels'] label_name = [] logger.info('--------识别到的标签-------') logger.info(labels) for label in labels: label_name.append(label['Name']) for Parents in label['Parents']: label_name.append(Parents['Name']) labels = self.checkLabels(detect_group, label_name) #检查标签是否符合用户选择的识别类型 if len(labels['label_list']) == 0: logger.info('没有识别到任何标签-----------------') return response.json(10055) event_type = ','.join(labels['label_type']) label_list = ','.join(labels['label_list']) logger.info(event_type) logger.info(label_list) # 上传缩略图到s3 for i, val in enumerate(file_path_list): upload_path = "{uid}/{channel}/{n_time}_{i}.jpg".format(uid=uid, channel=channel, n_time=n_time,i=i) # 封面图 thread_task = threading.Thread(target=self.upload_s3, args=(val, upload_path)) thread_task.start() #需要删除图片 # self.del_path(os.path.join(BASE_DIR, 'static/ai/' + uid)) #存储消息以及推送 is_st = 3 #多图 # 查询推送数据 uid_push_qs = UidPushModel.objects.filter(uid_set__uid=uid). \ values('token_val', 'app_type', 'appBundleId', 'm_code', 'push_type', 'userID_id', 'userID__NickName', 'lang', 'm_code', 'tz', 'uid_set__nickname', 'uid_set__detect_interval', 'uid_set__detect_group', 'uid_set__channel') if not uid_push_qs.exists(): return response.json(173) uid_push_list = [] for qs in uid_push_qs: uid_push_list.append(qs) nickname = uid_push_list[0]['uid_set__nickname'] if not nickname: nickname = uid eq_list = [] userID_ids = [] apns_start_time = 0 apns_end_time = 0 for up in uid_push_list: push_type = up['push_type'] appBundleId = up['appBundleId'] token_val = up['token_val'] lang = up['lang'] tz = up['tz'] if tz is None or tz == '': tz = 0 # 以下是存库 userID_id = up["userID_id"] if userID_id not in userID_ids: now_time = int(time.time()) eq_list.append(Ai_Push_Info( userID_id=userID_id, eventTime=n_time, eventType=event_type, devUid=uid, devNickName=nickname, Channel=channel, alarm='检查到{labels} \tChannel:{channel}'.format(labels=','.join(labels['label_list']), channel=channel), is_st=is_st, receiveTime=receiveTime, addTime=now_time, storage_location=2 )) userID_ids.append(userID_id) # 推送标题 msg_title = self.get_msg_title(appBundleId=appBundleId, nickname=nickname) # 推送内容 msg_text = self.get_msg_text(channel=channel, n_time=n_time, lang=lang, tz=tz, label_list=label_list) kwargs = { 'uid': uid, 'channel': channel, 'event_type': event_type, 'n_time': n_time, 'appBundleId': appBundleId, 'token_val': token_val, 'msg_title': msg_title, 'msg_text': msg_text, } try: # 推送消息 if push_type == 0: # ios apns res = self.do_apns(**kwargs) elif push_type == 1: # android gcm self.do_fcm(**kwargs) elif push_type == 2: # android jpush self.do_jpush(**kwargs) # if push_type == 1: # android gcm # logger.info('into-------gcm') # apns_start_time = int(time.time()) # res = self.do_fcm(**kwargs) # apns_end_time = int(time.time()) except Exception as e: logger.info("errLine={errLine}, errMsg={errMsg}".format(errLine=e.__traceback__.tb_lineno,errMsg=repr(e))) continue Ai_Push_Info.objects.bulk_create(eq_list) return JsonResponse(status=200, data='success', safe=False) except Exception as e: print(e) data = { 'errLine':e.__traceback__.tb_lineno, 'errMsg':repr(e) } return response.json(500, data) def del_path(self, path): if not os.path.exists(path): return if os.path.isfile(path): os.remove(path) else: items = os.listdir(path) for f in items: c_path = os.path.join(path, f) if os.path.isdir(c_path): self.del_path(c_path) else: os.remove(c_path) os.rmdir(path) ## 检查是否有符合条件的标签 def checkLabels(self, user_detect_group, labels): labels_type = { '1': ['Person', 'Human'], #人 '2': ['Dog', 'Pet', 'Canine', 'Animal'], #动物 '3': ['Car', '', 'Vehicle', 'Transportation', 'Automobile'] #车 } user_detect_list = user_detect_group.split(',') user_labels_type = {} for user_detect in user_detect_list: if user_detect in labels_type.keys(): user_labels_type[user_detect] = labels_type[user_detect] label_list = [] for k, labels_type in user_labels_type.items(): for label in labels_type: if label in labels: label_list.append(label) user_labels_list = list(user_labels_type.keys()) user_labels_list.sort() return {'label_type': user_labels_list, 'label_list': label_list} def upload_s3(self, file_path, upload_path): try: aws_key = "AKIA2MMWBR4DSFG67DTG" #【你的 aws_access_key】 aws_secret = "aI9gxcAKPmiGgPy9axrtFKzjYGbvpuytEX4xWweL" # 【你的 aws_secret_key】 session = Session(aws_access_key_id=aws_key, aws_secret_access_key=aws_secret, region_name="cn-northwest-1") s3 = session.resource("s3") # client = session.client("s3") bucket = "aipush" # 【你 bucket 的名字】 # 首先需要保.证 s3 上已经存在该存储桶,否则报错 upload_data = open(file_path, "rb") # upload_key = "test" s3.Bucket(bucket).put_object(Key=upload_path, Body=upload_data) return True except Exception as e: print(repr(e)) return False def get_msg_title(self, appBundleId, nickname): package_title_config = { 'com.ansjer.customizedd_a': 'DVS', 'com.ansjer.zccloud_a': 'ZosiSmart', 'com.ansjer.zccloud_ab': '周视', 'com.ansjer.adcloud_a': 'ADCloud', 'com.ansjer.adcloud_ab': 'ADCloud', 'com.ansjer.accloud_a': 'ACCloud', 'com.ansjer.loocamccloud_a': 'Loocam', 'com.ansjer.loocamdcloud_a': 'Anlapus', 'com.ansjer.customizedb_a': 'COCOONHD', 'com.ansjer.customizeda_a': 'Guardian365', 'com.ansjer.customizedc_a': 'PatrolSecure', } if appBundleId in package_title_config.keys(): return package_title_config[appBundleId] + '(' + nickname + ')' else: return nickname def get_msg_text(self, channel, n_time, lang, tz, label_list): n_date = CommonService.get_now_time_str(n_time=n_time, tz=tz,lang=lang) if lang == 'cn': msg = '摄像头AI识别到了{}'.format(label_list) send_text = '{msg} 通道:{channel} 日期:{date}'.format(msg=msg, channel=channel, date=n_date) else: msg = 'Camera AI recognizes{}'.format(label_list) send_text = '{msg} channel:{channel} date:{date}'.format(msg=msg, channel=channel, date=n_date) return send_text def do_jpush(self, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text): app_key = JPUSH_CONFIG[appBundleId]['Key'] master_secret = JPUSH_CONFIG[appBundleId]['Secret'] # 此处换成各自的app_key和master_secre _jpush = jpush.JPush(app_key, master_secret) push = _jpush.create_push() push.audience = jpush.registration_id(token_val) push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "", "received_at": n_time, "sound": "sound.aif", "uid": uid, "zpush": "1", "channel": channel} android = jpush.android(alert=msg_text, priority=1, style=1, alert_type=7, big_text=msg_text, title=msg_title, extras=push_data) push.notification = jpush.notification(android=android) push.platform = jpush.all_ res = push.send() print(res) return res.status_code def do_fcm(self, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text): try: serverKey = FCM_CONFIG[appBundleId] push_service = FCMNotification(api_key=serverKey) data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "", "received_at": n_time, "sound": "sound.aif", "uid": uid, "zpush": "1", "channel": channel} result = push_service.notify_single_device(registration_id=token_val, message_title=msg_title, message_body=msg_text, data_message=data, extra_kwargs={ 'default_vibrate_timings': True, 'default_sound': True, 'default_light_settings': True }) print('fcm push ing') print(result) return result except Exception as e: return 'serverKey abnormal' def do_apns(self, uid, channel, appBundleId, token_val, event_type, n_time, msg_title, msg_text): logger = logging.getLogger('info') logger.info("进来do_apns函数了") logger.info(token_val) logger.info(APNS_MODE) logger.info(os.path.join(BASE_DIR, APNS_CONFIG[appBundleId]['pem_path'])) try: cli = apns2.APNSClient(mode=APNS_MODE, client_cert=os.path.join(BASE_DIR, APNS_CONFIG[appBundleId]['pem_path'])) push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "", "received_at": n_time, "sound": "", "uid": uid, "zpush": "1", "channel": channel} alert = apns2.PayloadAlert(body=msg_text, title=msg_title) payload = apns2.Payload(alert=alert, custom=push_data, sound="default") n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW) res = cli.push(n=n, device_token=token_val, topic=appBundleId) if res.status_code == 200: return res.status_code else: logger.info('apns push fail') logger.info(res.reason) return res.status_code except (ValueError, ArithmeticError): return 'The program has a numeric format exception, one of the arithmetic exceptions' except Exception as e: print(repr(e)) logger.info(repr(e)) return repr(e) def queryInfo(self, userID, request_dict, response): page = int(request_dict.get('page', None)) line = int(request_dict.get('line', None)) if not page or not line: return response.json(444, 'page,line') startTime = request_dict.get('startTime', None) endTime = request_dict.get('endTime', None) eventType = request_dict.get('eventType', None) now_time = int(time.time()) seven_days_ago = now_time - 7 * 24 * 3600 # 查询7天内的数据 qs = Ai_Push_Info.objects.filter(userID_id=userID, eventTime__gt=seven_days_ago).order_by('-eventTime') if startTime and endTime: qs = qs.filter(eventTime__range=(startTime, endTime)) # if eventType: # qs = qs.filter(eventType__contains=eventType) uids = request_dict.get('uids', None) if uids: uid_list = uids.split(',') qs = qs.filter(devUid__in=uid_list) dvqs = Device_Info.objects.filter(UID__in=uid_list, userID_id=userID).values('UID', 'Type', 'NickName') uid_type_dict = {} for dv in dvqs: uid_type_dict[dv['UID']] = {'type': dv['Type'], 'NickName': dv['NickName']} else: dvqs = Device_Info.objects.filter(userID_id=userID).values('UID', 'Type', 'NickName') uid_type_dict = {} for dv in dvqs: uid_type_dict[dv['UID']] = {'type': dv['Type'], 'NickName': dv['NickName']} if not qs.exists(): return response.json(0, {'datas': [], 'count': 0}) count = qs.count() qs = qs.values('id', 'devUid', 'devNickName', 'Channel', 'eventType', 'status', 'alarm', 'eventTime', 'receiveTime', 'is_st', 'addTime', 'storage_location') qs = qs[(page - 1) * line:page * line] res = [] aws_s3_client = boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY_ID[0], aws_secret_access_key=AWS_SECRET_ACCESS_KEY[0], config=botocore.client.Config(signature_version='s3v4'), region_name='cn-northwest-1' ) for p in qs: p['eventType'] = int(p['eventType'][0]) devUid = p['devUid'] eventTime = p['eventTime'] channel = p['Channel'] storage_location = p['storage_location'] if p['is_st'] == 1: s3_img_cover = '{uid}/{channel}/cover{time}.jpg'.format(uid=devUid, channel=channel, time=eventTime) s3_img_desc = '{uid}/{channel}/desc{time}.jpg'.format(uid=devUid, channel=channel, time=eventTime) response_url_cover = aws_s3_client.generate_presigned_url('get_object', ExpiresIn=300, Params={ 'Bucket': 'aipush', 'Key': s3_img_cover }, ) response_url_desc = aws_s3_client.generate_presigned_url('get_object', ExpiresIn=300, Params={ 'Bucket': 'aipush', 'Key': s3_img_desc }, ) p['img'] = response_url_cover p['img_list'] = [response_url_desc] elif p['is_st'] == 2: pass elif p['is_st'] == 3: # 列表装载回放时间戳标记 p['img_list'] = [] for i in range(4): thumbspng = '{uid}/{channel}/{time}_{st}.jpg'.format(uid=devUid, channel=p['Channel'], time=eventTime, st=i) response_url = aws_s3_client.generate_presigned_url('get_object', ExpiresIn=300, Params={ 'Bucket': 'aipush', 'Key': thumbspng }, ) p['img_list'].append(response_url) if devUid in uid_type_dict.keys(): p['uid_type'] = uid_type_dict[devUid]['type'] p['devNickName'] = uid_type_dict[devUid]['NickName'] else: p['uid_type'] = '' res.append(p) return response.json(0, {'datas': res, 'count': count}) def readInfo(self, userID, request_dict, response): is_update_all = request_dict.get('is_update_all', 0) try: if int(is_update_all) == 1: # 全部已读 is_update = Ai_Push_Info.objects.filter(userID_id=userID).update(status=1) return response.json(0, {'update_count': is_update}) else: id_list = request_dict.get('id_list', None) if not id_list: request_dict.getlist('id_list[]', None) # 获取IOS数组传参 logger = logging.getLogger('info') logger.info('已读ai消息id_list:{}'.format(id_list)) if not id_list: return response.json(444) id_list = eval(id_list) # 字符串转列表 param_flag = CommonService.get_param_flag(data=id_list) if not param_flag: return response.json(444) count = 0 for id in id_list: ai_push_qs = Ai_Push_Info.objects.filter(id=int(id)) if ai_push_qs.exists(): own_dev = ModelService.check_own_device(userID, ai_push_qs[0].devUid) if own_dev: count += 1 ai_push_qs.update(status=1) return response.json(0, {'update_success': count}) except Exception as e: print(e) return response.json(500, repr(e)) def deleteInfo(self, userID, request_dict, response): id_list = request_dict.get('id_list', None) if not id_list: request_dict.getlist('id_list[]', None) # 获取IOS数组传参 logger = logging.getLogger('info') logger.info('删除ai消息id_list:{}'.format(id_list)) if not id_list: return response.json(444) try: id_list = eval(id_list) # 字符串转列表 param_flag = CommonService.get_param_flag(data=id_list) if not param_flag: return response.json(444) for id in id_list: ai_push_qs = Ai_Push_Info.objects.filter(id=id) if ai_push_qs.exists(): own_dev = ModelService.check_own_device(userID, ai_push_qs[0].devUid) if own_dev: ai_push_qs.delete() return response.json(0) except Exception as e: print(e) return response.json(500, repr(e)) # 如果ai套餐过期,更新未使用的关联套餐 def updateUnusedAi(request): now_time = int(time.time()) ai_service_qs = AiService.objects.filter(endTime__lte=now_time, use_status=1).values('id', 'uid')[0:200] for ai_service in ai_service_qs: try: with transaction.atomic(): AiService.objects.filter(id=ai_service['id']).update(use_status=2) # 更新过期ai订单状态 # 如果存在未使用套餐,更新为使用 unused_ai_service = AiService.objects.filter(uid=ai_service['uid'], use_status=0).order_by('addTime')[:1].values('id', 'endTime') if unused_ai_service.exists(): effective_day = unused_ai_service[0]['endTime'] # 未使用套餐的endTime在购买的时候保存为有效时间 endTime = now_time + effective_day AiService.objects.filter(id=unused_ai_service[0]['id']).update(use_status=1, endTime=endTime, updTime=now_time) except Exception as e: continue return HttpResponse()