|
- import datetime
- import json
- import logging
- import os
- import threading
- import time
- import traceback
- import urllib
- import zoneinfo
- import boto3
- import botocore
- import cv2
- import oss2
- import pytz
- import requests
- from boto3.session import Session
- from django.contrib.auth.hashers import make_password # 对密码加密模块
- from django.db import connection, transaction
- from django.db.models import Q
- from django.http import HttpResponse, JsonResponse
- from django.utils.decorators import method_decorator
- from django.views.decorators.csrf import csrf_exempt
- from django.views.generic.base import View
- from botocore import client
- from Ansjer.cn_config.config_test import REGION_NAME2
- from Ansjer.config import (
- ACCESS_KEY_ID,
- AVATAR_BUCKET,
- AWS_ACCESS_KEY_ID,
- AWS_SECRET_ACCESS_KEY,
- AWS_SES_ACCESS_REGION,
- CONFIG_INFO,
- OSS_STS_ACCESS_KEY,
- OSS_STS_ACCESS_SECRET,
- SECRET_ACCESS_KEY,
- SERVER_DOMAIN_SSL,
- SERVER_TYPE,
- CONFIG_TEST,
- BASE_DIR
- )
- from Model.models import (
- CompanySerialModel,
- Device_Info,
- Device_User,
- DeviceLogModel,
- LogModel,
- Order_Model,
- Store_Meal,
- TestDeviceFindSerial,
- TestSerialRepetition,
- UID_Bucket,
- UIDCompanySerialModel,
- VodBucketModel, PaypalWebHookEvent, TimeZoneInfo, CountryLanguageModel, UidSetModel, UidPushModel,
- )
- from Object.AliPayObject import AliPayObject
- from Object.AWS.AmazonS3Util import AmazonS3Util
- from Object.CeleryBeatObject import CeleryBeatObj
- from Object.ContentSecurityObject import ContentSecurity
- from Object.IPWeatherObject import IPQuery, OpenWeatherMap, GeoIP2
- from Object.OCIObjectStorage import OCIObjectStorage
- from Object.m3u8generate import PlaylistGenerator
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- from Object.utils.PayPalUtil import PayPalService
- from Service.CommonService import CommonService
- from Service.VodHlsService import SplitVodHlsObject
- from Object.ApschedulerObject import ApschedulerObject
- ACCESS_KEY = "AKIA2E67UIMD3CYTIWPA"  # NOTE(review): credential hard-coded in source (looks like an AWS access key id); consider loading it from configuration
- SECRET_KEY = "mHl79oiKxEf+89friTtwIcF8FUFIdVksUwySixwQ"  # NOTE(review): secret hard-coded in source; consider rotating it and loading it from configuration
- LOGGER = logging.getLogger("info")  # module-level logger bound to the "info" logger name
- class testView(View):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
    """Delegate to the parent view's ``dispatch`` with CSRF protection disabled."""
    parent_view = super(testView, self)
    return parent_view.dispatch(*args, **kwargs)
def get(self, request, *args, **kwargs):
    """Handle GET by passing the query parameters and URL ``operation`` to ``validation``."""
    request.encoding = "utf-8"
    return self.validation(request.GET, request, kwargs.get("operation"))
def post(self, request, *args, **kwargs):
    """Handle POST by passing the form parameters and URL ``operation`` to ``validation``."""
    request.encoding = "utf-8"
    return self.validation(request.POST, request, kwargs.get("operation"))
def put(self, request, *args, **kwargs):
    """Handle PUT by echoing the raw request body back with result code 0."""
    request.encoding = "utf-8"
    return ResponseObject().json(0, request.body)
def validation(self, request_dict, request, operation):
    """Route ``operation`` to its handler method.

    @param request_dict: request parameters (``request.GET`` or ``request.POST``)
    @param request: the incoming request object
    @param operation: operation name taken from the URL
    @return: whatever the selected handler returns; result code 414 when the
        operation name is not recognised
    """
    response = ResponseObject()
    # Each entry is lazy: the handler attribute is only looked up when its
    # operation is actually requested.
    handlers = {
        "generateToken": lambda: self.generate_token(request_dict),
        "signplaym3u8": lambda: self.do_sign_play_m3u8(request_dict, response),
        "analysisToken": lambda: self.analysis_token(request_dict, response),
        "test_upload_s3": lambda: self.test_upload_s3(request_dict, response),
        "rekognition": lambda: self.testRekognition(request, request_dict),
        "ip": lambda: self.ip(response),
        "configType": lambda: self.configType(response),
        "createData": lambda: self.createData(request_dict, response),
        "write_trade_no": lambda: self.write_trade_no(request_dict, response),
        "comb": lambda: self.do_comb(request_dict, response),
        "count_ts": lambda: self.count_ts(request_dict, response),
        "tsCount": lambda: self.ts_count(request_dict, response),
        "upload-s3": lambda: self.file_upload_s3(request, request_dict, response),
        "v2/upload-s3": lambda: self.file_upload_s3_v2(request, request_dict, response),
        "download-s3": lambda: self.file_download_s3(request_dict, response),
        "acl-put": lambda: self.s3_acl_put(request_dict, response),
        "s3-object-delete": lambda: self.object_delete(request_dict, response),
        "head-bucket": lambda: self.head_bucket(request_dict, response),
        "write_redis_list": lambda: self.write_redis_list(response),
        "read_redis_list": lambda: self.read_redis_list(response),
        "playM3u8": lambda: self.play_m3u8(request_dict, response),
        "generate_video": lambda: self.generate_video(request_dict, response),
        # Serial-number repetition test endpoints answer through a "cn" response object.
        "serial-repetition": lambda: ResponseObject("cn").json(475),
        "v2/serial-repetition": lambda: self.serial_repetition_test_v2(request_dict, ResponseObject("cn")),
        # Serial-number information lookups.
        "getSerialNumberInfo": lambda: self.getSerialNumberInfo(request_dict, response),
        "get-serial-details": lambda: self.get_serial_details(request_dict, response, request),
        # Find a device serial number (306 low-power products without Wi-Fi).
        "find_device_serial": lambda: self.find_device_serial(request_dict, response),
        # Alibaba Cloud text / image review.
        "ali_text_review": lambda: self.ali_text_review(request_dict, response),
        "ali_image_review": lambda: self.ali_image_review(request_dict, response),
        # Scheduled-job management (add cron job, add date job, delete job).
        "add_cron_apscheduler": lambda: self.add_cron_job(request_dict, response),
        "add_date_apscheduler": lambda: self.add_date_job(request_dict, response),
        "del_apscheduler": lambda: self.del_apscheduler_job(request_dict, response),
        "getPayPalTransactions": lambda: self.get_pay_pal_transactions(request_dict, response),
        # Timestamp conversion.
        "convertTimestamp": lambda: self.convertTimestamp(request_dict, response),
        # Check whether a PayPal order exists.
        "checkOrderExist": lambda: self.checkOrderExist(request_dict, response),
        # Foreign IP lookup and foreign weather lookup.
        "foreignIp": lambda: self.foreignIp(request_dict, response),
        "getForeignWeather": lambda: self.getForeignWeather(request_dict, response),
        # UnionPay callback.
        "unionpayback": lambda: self.unionpayback(request_dict, response),
        "uploadAVSS": lambda: self.uploadAVSS(request, response),
        # Generate an OCI object-storage pre-signed link.
        "oci_oss": lambda: self.oci_oss(request, response),
        "celery": lambda: self.celery(request_dict, response),
        "tz": lambda: self.tz(request_dict, response),
        "createPreauthRequest": lambda: self.create_preauth_request(request_dict, response),
        "getOciUrl": lambda: self.get_oci_url(request_dict, request, response),
        "delOciObj": lambda: self.del_oci_obj(request_dict, request, response),
        "get-token": lambda: self.get_token(request_dict, response),
        "langCountry": lambda: self.lang_country(request_dict, response),
        "getObsSignedUrl": lambda: self.getObsSignedUrl(request_dict, response),
        "duplicateRemovalUidSet": lambda: self.duplicate_removal_uid_set(request_dict, response),
        "checkSerialUID": lambda: self.checkSerialUID(response),
    }
    handler = handlers.get(operation)
    if handler is None:
        return response.json(414)
    return handler()
@classmethod
def create_preauth_request(cls, request_dict, response):
    """Create an OCI pre-authenticated request named 'oci-test' that expires in 10 minutes.

    Reads ``region``, ``bucketName`` and ``objectName`` from ``request_dict`` and
    responds with ``fullPath`` built from the result's ``full_path`` and
    ``object_name``.
    """
    storage = OCIObjectStorage(request_dict.get('region'))
    bucket_name = request_dict.get('bucketName')
    object_name = request_dict.get('objectName')
    expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
    preauth = storage.create_ereauthenticated_request(bucket_name, 'oci-test', object_name, expires_at)
    full_path = preauth.full_path + preauth.object_name
    return response.json(0, {'fullPath': full_path})
@classmethod
def get_oci_url(cls, request_dict, request, response):
    """
    Get a pre-authenticated OCI storage URL (valid for 10 minutes) for ``objectName``.

    Responds with code 404 when ``objectName`` is missing and code 500 when any
    exception is raised.
    """
    try:
        object_name = request_dict.get('objectName', None)
        if object_name:
            storage = OCIObjectStorage(request_dict.get('region'))
            bucket_name = request_dict.get('bucketName')
            expires_at = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
            preauth = storage.get_preauthenticated_request_url(bucket_name, 'oci-test', object_name, expires_at)
            return response.json(0, {'data': preauth.full_path})
        return response.json(404)
    except Exception as e:
        print(repr(e))
        return response.json(500)
@classmethod
def del_oci_obj(cls, request_dict, request, response):
    """
    Delete the object ``objectName`` from the OCI bucket ``bucketName``.

    Responds with code 404 when ``objectName`` is missing and code 500 when any
    exception is raised.
    """
    try:
        object_name = request_dict.get('objectName', None)
        if object_name:
            storage = OCIObjectStorage(request_dict.get('region'))
            storage.delete_object(request_dict.get('bucketName'), object_name)
            return response.json(0)
        return response.json(404)
    except Exception as e:
        print(repr(e))
        return response.json(500)
@staticmethod
def generate_token(request_dict):
    """Generate a token for the ``userID`` / ``username`` request parameters and return it as JSON."""
    token_factory = TokenObject()
    user_id = request_dict.get("userID", None)
    username = request_dict.get("username", None)
    payload = {"userID": user_id, "lang": "cn", "user": username, "m_code": username}
    token_data = token_factory.generate(data=payload)
    return JsonResponse(status=200, data=token_data, safe=False)
@classmethod
def serial_repetition_test_v2(cls, request_dict, response):
    """Record a test serial number unless it, or its 6-character prefix, is already stored.

    @param request_dict: request parameters; reads ``serialNo`` (required) and
        ``phoneModel`` (optional)
    @param response: response object used to build the reply
    @return: code 444 when ``serialNo`` is missing; code 174 when a stored serial
        contains the first six characters of ``serialNo`` (with that record's
        details) or equals ``serialNo``; code 0 after a new record is created;
        code 178 with the error text when an exception is raised
    """
    try:
        serial_no = request_dict.get("serialNo", None)
        phone_model = request_dict.get("phoneModel", None)
        if not serial_no:
            return response.json(444)
        with transaction.atomic():
            prefix = serial_no[:6]
            # Fetch the matching record once instead of issuing one query per field.
            existing = TestSerialRepetition.objects.filter(serial_number__icontains=prefix).first()
            if existing is not None:
                result = {
                    "serialNumber": existing.serial_number,
                    "phoneModel": existing.phone_model,
                    "createdTime": existing.created_time,
                }
                return response.json(174, result)
            if TestSerialRepetition.objects.filter(serial_number=serial_no).exists():
                return response.json(174)
            params = {"serial_number": serial_no, "created_time": int(time.time())}
            if phone_model:
                params["phone_model"] = phone_model
            TestSerialRepetition.objects.create(**params)
            return response.json(0)
    except Exception as e:
        # Log through the module logger like the other handlers in this file.
        LOGGER.info("异常错误,errLine:{}, errMsg:{}".format(e.__traceback__.tb_lineno, repr(e)))
        # Send the textual form of the error; the exception object itself is
        # not a serialisable payload.
        return response.json(178, repr(e))
@classmethod
def generate_video(cls, request_dict, response):
    """Build an MP4 from three fixed S3 images, upload it, and return its URL.

    Downloads the images from the ``push-cloud-photo`` bucket, writes them as
    1920x1080 frames at 0.5 fps into a local ``result.mp4``, uploads that file
    back to the bucket, and removes the local files.

    @param request_dict: request parameters (not read)
    @param response: response object used to build the reply
    @return: code 0 with the generated object URL, or code 177 with the
        traceback text when any step raises
    """
    # Open question from the original author: should images be picked randomly
    # by message or by time; also check whether copied S3 objects carry an expiry.
    try:
        DIR = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
        arr_list = ["1666756086.jpeg", "1666831275.jpeg", "1666841492.jpeg"]
        s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[0], AWS_SECRET_ACCESS_KEY[0], "cn-northwest-1")
        bucket = "push-cloud-photo"
        for item in arr_list:
            path = DIR + r"\Ansjer\file\{}".format(item)
            s3_key = "HA154GVEDH41RY8Y111A/1/{}".format(item)
            s3.download_object(bucket, s3_key, path)
        video_dir = DIR + r"\Ansjer\file\result.mp4"  # output path of the generated video
        fps = 0.5  # frame rate
        img_size = (1920, 1080)  # frame size
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        videoWriter = cv2.VideoWriter(video_dir, fourcc, fps, img_size)
        for i in arr_list:
            img_path = DIR + r"\Ansjer\file\{}".format(i)
            frame = cv2.imread(img_path)
            frame = cv2.resize(frame, img_size)  # frames must match the configured video size
            videoWriter.write(frame)  # append the image to the video
            os.remove(img_path)
        videoWriter.release()  # release the writer
        key = "HA154GVEDH41RY8Y111A/1/20221027.mp4"
        # Close the handle before deleting the file: the original left it open,
        # which leaks the descriptor and blocks the removal on Windows.
        with open(video_dir, "rb") as data:
            s3.upload_file_obj(bucket, key, data)
        response_url = s3.generate_file_obj_url(bucket, key)
        os.remove(video_dir)
    except Exception as e:
        print(e)
        ex = traceback.format_exc()
        LOGGER.info("--->抽取推送图片异常:{}".format(ex))
        return response.json(177, ex)
    return response.json(0, response_url)
@classmethod
def head_bucket(cls, request_dict, response):
    """Call ``bucket_exists`` for the ``bucket`` request parameter and answer with code 0.

    The return value of ``bucket_exists`` is not inspected.
    """
    # NOTE(review): credentials are hard-coded here; consider moving them to configuration.
    target_bucket = request_dict.get("bucket", None)
    s3_util = AmazonS3Util("AKIA2E67UIMD45Y3HL53", "ckYLg4Lo9ZXJIcJEAKkzf2rWvs8Xth1FCjqiAqUw", "us-east-1")
    s3_util.bucket_exists(target_bucket)
    return response.json(0)
@classmethod
def file_upload_s3(cls, request, request_dict, response):
    """Upload the posted ``file`` to S3 under ``app/images/<name>`` via ``upload_s3``.

    @param request: request whose ``FILES`` holds the upload under ``file``
    @param request_dict: request parameters (not read)
    @param response: response object used to build the reply
    @return: code 444 when no file was posted, code 500 when ``upload_s3``
        reports failure, code 0 on success
    """
    uploaded = request.FILES.get("file", None)
    if uploaded is None:
        # Previously this fell through to ``file.name`` and raised AttributeError.
        return response.json(444, "file")
    object_key = "app/images/{}".format(uploaded.name)
    if not cls.upload_s3(uploaded, object_key):
        # upload_s3 returns False on failure; do not report success in that case.
        return response.json(500)
    return response.json(0)
@classmethod
def file_upload_s3_v2(cls, request, request_dict, response):
    """
    Upload the posted ``file`` to the ``ansjerfilemanager`` bucket with a public-read ACL.

    Possible ACL values:
    'private' | 'public-read' | 'public-read-write' | 'authenticated-read'

    @param request: request whose ``FILES`` holds the upload under ``file``
    @param request_dict: request parameters (not read)
    @param response: response object used to build the reply
    @return: code 444 when no file was posted, otherwise code 0
    """
    uploaded = request.FILES.get("file", None)
    if uploaded is None:
        # Previously this fell through to ``file.name`` and raised AttributeError.
        return response.json(444, "file")
    # The object key is the S3 folder path plus the file name.
    file_key = "app/images/{}".format(uploaded.name)
    s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[1], AWS_SECRET_ACCESS_KEY[1], AWS_SES_ACCESS_REGION)
    # Target bucket.
    bucket = "ansjerfilemanager"
    s3.upload_file_obj(
        bucket,
        file_key,
        uploaded,
        {"ContentType": uploaded.content_type, "ACL": "public-read"},
    )
    return response.json(0)
@classmethod
def object_delete(cls, request_dict, response):
    """Delete ``app/images/<key>`` from the ``ansjerfilemanager`` bucket and answer with code 0."""
    # NOTE(review): credentials are hard-coded here; consider moving them to configuration.
    object_key = "app/images/{}".format(request_dict.get("key", None))
    s3_util = AmazonS3Util("AKIA2E67UIMD45Y3HL53", "ckYLg4Lo9ZXJIcJEAKkzf2rWvs8Xth1FCjqiAqUw", "us-east-1")
    s3_util.delete_obj("ansjerfilemanager", object_key)
    return response.json(0)
@staticmethod
def upload_s3(data, upload_path):
    """
    Upload an object to the ``ansjerfilemanager`` bucket.
    @param data: object body passed to ``put_object``
    @param upload_path: object key
    @return: True on success, False when any exception is raised
    """
    # NOTE(review): credentials are hard-coded here; consider moving them to configuration.
    try:
        s3_resource = Session(
            aws_access_key_id="AKIA2E67UIMD45Y3HL53",
            aws_secret_access_key="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",
            region_name="us-east-1",
        ).resource("s3")
        # The bucket must already exist on S3, otherwise the call fails.
        target = s3_resource.Bucket("ansjerfilemanager")
        target.put_object(Key=upload_path, Body=data)
    except Exception as e:
        print(repr(e))
        return False
    return True
@classmethod
def s3_acl_put(cls, request_dict, response):
    """
    Set the ACL of an object in the ``ansjerfilemanager`` bucket to public-read.
    @param request_dict: request parameters; reads ``key`` (the object key)
    @param response: response object used to build the reply
    @return: code 0 on success, code 500 with the error text on failure
    """
    # NOTE(review): credentials are hard-coded here; consider moving them to configuration.
    try:
        key = request_dict.get("key", None)
        aws_key = "AKIA2E67UIMD45Y3HL53"
        aws_secret = "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"
        session = Session(
            aws_access_key_id=aws_key,
            aws_secret_access_key=aws_secret,
            region_name="us-east-1",
        )
        s3 = session.resource("s3")
        # The bucket must already exist on S3, otherwise the call fails.
        bucket = "ansjerfilemanager"
        obj = s3.Object(bucket, key)
        obj.Acl().put(ACL="public-read")
        return response.json(0)
    except Exception as e:
        print(repr(e))
        # This is a view handler, so it must return a response object; the
        # original returned the bare value False here.
        return response.json(500, repr(e))
@classmethod
def file_download_s3(cls, request_dict, response):
    """
    Return a URL for the object ``key`` in the ``ansjerfilemanager`` bucket.
    @param request_dict: request parameters; reads ``key``
    @param response: response object used to build the reply
    @return: code 0 with the URL produced by ``generate_file_obj_url``
    """
    # NOTE(review): credentials are hard-coded here; consider moving them to configuration.
    object_key = request_dict.get("key", None)
    s3_util = AmazonS3Util("AKIA2E67UIMD45Y3HL53", "ckYLg4Lo9ZXJIcJEAKkzf2rWvs8Xth1FCjqiAqUw", "us-east-1")
    signed_url = s3_util.generate_file_obj_url("ansjerfilemanager", object_key)
    return response.json(0, signed_url)
def do_comb(self, request_dict, response):
    """Return, as a JSON list, every combination of the digits 1-4 joined into one integer.

    Combinations are produced by size (1 to 4) in ``itertools.combinations``
    order, e.g. ``(1, 3)`` becomes ``13``.
    """
    from itertools import combinations

    digits = [1, 2, 3, 4]
    grouped = [list(combinations(digits, size)) for size in range(1, len(digits) + 1)]
    joined = []
    for group in grouped:
        for combo in group:
            print("-------------list")
            print(type(combo))
            joined.append(int("".join(map(str, combo))))
    return HttpResponse(json.dumps(joined))
# Generate an m3u8 playlist
def do_sign_play_m3u8(self, request_dict, response):
    """Return a downloadable ``play.m3u8`` of pre-signed URLs for six fixed TS segments.

    The UID, channel, store time, bucket and region are hard-coded; each URL is
    valid for 86400 seconds and each entry has a duration of 10.
    """
    # Example request:
    # http://test.dvema.com/cloudstorage/signplaym3u8?uid=VVDHCVBYDKFMJRWA111A&channel=1&time=1586940120&sign=tktktktk
    # NOTE(review): credentials are hard-coded here; consider moving them to configuration.
    uid = "GZL2PEFJPLY7W6BG111A"
    channel = 2
    store_time = 1591344070
    segment_count = 6
    bucket_name = "azvod1"
    s3_client = Session(
        aws_access_key_id="AKIA2E67UIMD45Y3HL53",
        aws_secret_access_key="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",
        region_name="ap-northeast-1",
    ).client("s3")
    entries = []
    for index in range(segment_count):
        segment_key = "{uid}/vod{channel}/{time}/ts{i}.ts".format(
            uid=uid, channel=channel, time=store_time, i=index
        )
        signed_url = s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket_name, "Key": segment_key},
            ExpiresIn=86400,
        )
        entries.append({"name": signed_url, "duration": 10})
    playlist = PlaylistGenerator(entries).generate()
    http_response = HttpResponse(playlist)
    http_response["Content-Type"] = "application/octet-stream"
    http_response["Content-Disposition"] = 'attachment;filename="play.m3u8"'
    return http_response
def do_pay_by_ali(self, request_dict, userID, response):
    """Create an Alipay WAP payment for a storage package and record the order.

    @param request_dict: request parameters; reads ``uid``, ``rank`` (package id)
        and ``channel``
    @param userID: id of the user placing the order
    @param response: response object used to build error replies
    @return: JSON with ``redirectUrl`` and ``orderID`` on success; otherwise an
        error reply (12, 444, 173, 10, 712 or 713)
    """
    uid = request_dict.get("uid", None)
    rank = request_dict.get("rank", None)
    channel = request_dict.get("channel", None)

    device_qs = Device_Info.objects.filter(userID_id=userID, UID=uid, isShare=False)
    if not device_qs.exists():
        return response.json(12)
    if not (channel and rank):
        return response.json(444, "channel,rank")

    meal_qs = Store_Meal.objects.filter(id=rank).values(
        "currency",
        "price",
        "content",
        "day",
        "bucket__storeDay",
        "bucket__region",
        "type",
    )
    if not meal_qs.exists():
        # The package does not exist.
        return response.json(173)
    meal = meal_qs[0]
    if meal["type"] != 1:
        return response.json(10, "不支持支付宝支付")
    currency = meal["currency"]
    price = meal["price"]
    content = meal["content"]
    day = meal["day"]

    now_ts = int(time.time())
    active_qs = UID_Bucket.objects.filter(uid=uid, channel=channel, endTime__gte=now_ts).values(
        "bucket__storeDay", "bucket__region"
    )
    if active_qs.exists():
        active = active_qs[0]
        if active["bucket__region"] != meal["bucket__region"]:
            return response.json(712)  # region mismatch
        if active["bucket__storeDay"] != meal["bucket__storeDay"]:
            return response.json(713)  # the package cannot be changed
        # Renewal flow
        now_ts = int(time.time())

    # New-order flow
    order_id = CommonService.createOrderID()
    try:
        ali_pay = AliPayObject()
        order_string = ali_pay.conf().api_alipay_trade_wap_pay(
            out_trade_no=order_id,
            total_amount=price,
            subject="测试哟",
            return_url="{SERVER_DOMAIN_SSL}cloudVod/payOK".format(SERVER_DOMAIN_SSL=SERVER_DOMAIN_SSL),
            notify_url="{SERVER_DOMAIN_SSL}cloudVod/aliPayCallback".format(SERVER_DOMAIN_SSL=SERVER_DOMAIN_SSL),
        )
    except Exception as e:
        print(repr(e))
        return response.json(10, repr(e))
    if not order_string:
        return response.json(10, "生成订单错误")

    redirect_url = ali_pay.alipay_prefix + order_string
    title_qs = Store_Meal.objects.filter(id=rank, lang__lang="cn", is_show=0).values(
        "lang__title", "lang__content"
    )
    if title_qs.exists():
        title_row = title_qs[0]
        store_meal_name = title_row["lang__title"] + "-" + title_row["lang__content"]
    else:
        store_meal_name = "未知套餐"
    Order_Model.objects.create(
        orderID=order_id,
        UID=uid,
        channel=channel,
        userID_id=userID,
        desc=content,
        price=price,
        currency=currency,
        addTime=now_ts,
        updTime=now_ts,
        endTime=now_ts + int(day) * 3600 * 24,
        rank_id=rank,
        payType=1,
        store_meal_name=store_meal_name,
    )
    payload = {
        "result_code": 0,
        "reason": "success",
        "result": {"redirectUrl": redirect_url, "orderID": order_id},
        "error_code": 0,
    }
    return JsonResponse(status=200, data=payload)
def do_filter_playlist(self, request_dict, userID, response):
    """List signed playback and thumbnail URLs for a device channel's recordings.

    @param request_dict: request parameters; reads ``startTime``, ``endTime``,
        ``uid`` and ``channel``
    @param userID: id of the user who must own the (non-shared) device
    @param response: response object used to build the reply
    @return: code 0 with a list of ``name`` / ``sign_url`` / ``thumb`` / ``sec``
        entries; 444 when the time parameters are missing or not integers; 12
        when the device is not owned by the user; 10 when no storage record
        exists; 173 when there are no recordings or the bucket is unknown
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that the
    # ``urllib.parse`` submodule has been loaded.
    from urllib.parse import unquote

    # The time bounds are only validated; as in the original they are not used
    # to filter the recordings. Missing values used to raise TypeError.
    try:
        int(request_dict.get("startTime", None))
        int(request_dict.get("endTime", None))
    except (TypeError, ValueError):
        return response.json(444, "startTime,endTime")
    uid = request_dict.get("uid", None)
    channel = request_dict.get("channel", None)
    dvqs = Device_Info.objects.filter(UID=uid, userID_id=userID, isShare=False)
    if not dvqs.exists():
        return response.json(12)
    ubqs = UID_Bucket.objects.filter(uid=uid, channel=channel).values("status")
    if not ubqs.exists():
        return response.json(10, "设备未购买")
    split_vod_hls_obj = SplitVodHlsObject()
    vodqs = split_vod_hls_obj.get_vod_hls_data(uid=uid, channel=channel).values("start_time", "sec", "bucket_id")
    if not vodqs.exists():
        return response.json(173)
    auth = oss2.Auth(OSS_STS_ACCESS_KEY, OSS_STS_ACCESS_SECRET)
    bucket_id = vodqs[0]["bucket_id"]
    vod_bucket_qs = VodBucketModel.objects.filter(id=bucket_id).values("bucket", "endpoint")
    if not vod_bucket_qs.exists():
        return response.json(173)
    bucket_row = vod_bucket_qs[0]
    # The bucket handle does not depend on the loop variable, so build it once.
    bucket = oss2.Bucket(auth, bucket_row["endpoint"], bucket_row["bucket"])
    vod_play_list = []
    for vod in vodqs:
        start_time = vod["start_time"]
        m3u8 = "{uid}/vod{channel}/{time}/{time}.m3u8".format(uid=uid, channel=channel, time=start_time)
        ts = "{uid}/vod{channel}/{time}/ts0.ts".format(uid=uid, channel=channel, time=start_time)
        url = bucket.sign_url("GET", m3u8, 3600, params={"x-oss-process": "hls/sign"})
        # Unquote only the path part; the query string stays as signed.
        urllst = url.split("?")
        vod_play_url = "{url_start}?{url_end}".format(url_start=unquote(urllst[0]), url_end=urllst[1])
        thumb = bucket.sign_url(
            "GET",
            ts,
            3600,
            params={"x-oss-process": "video/snapshot,t_10000,m_fast,w_300"},
        )
        vod_play_list.append(
            {
                "name": start_time,
                "sign_url": vod_play_url,
                "thumb": thumb,
                "sec": vod["sec"],
            }
        )
    return response.json(0, vod_play_list)
@staticmethod
def analysis_token(request_dict, response):
    """Parse the ``token`` request parameter and return the matching username.

    @param request_dict: request parameters; reads ``token``
    @param response: response object used to build the reply
    @return: code 0 with the username, or code 173 when no user matches the
        token's ``userID``
    """
    token = TokenObject(request_dict.get("token", None))
    try:
        username = Device_User.objects.get(userID=token.userID).username
    except Device_User.DoesNotExist:
        # Previously an unknown user id escaped as an unhandled exception.
        # NOTE(review): 173 is used elsewhere in this file for "no record found" - confirm it fits here.
        return response.json(173)
    return response.json(0, username)
def test_upload_s3(self, request_dict, response):
    """Return a one-hour pre-signed ``put_object`` URL for key ``ipctest`` in bucket ``pc-package``."""
    signing_config = botocore.client.Config(signature_version="s3v4")
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY_ID[0],
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY[0],
        config=signing_config,
        region_name="cn-northwest-1",
    )
    object_key = "ipctest"
    upload_url = s3_client.generate_presigned_url(
        ClientMethod="put_object",
        Params={"Bucket": "pc-package", "Key": object_key},
        ExpiresIn=3600,
    )
    payload = {"datas": upload_url, "count": 1}
    return response.json(0, payload)
def testRekognition(self, request, request_dict):
    """Run Rekognition label detection on the uploaded ``image`` and return the raw result as JSON.

    @param request: request whose ``FILES`` holds the upload under ``image``
    @param request_dict: request parameters; reads ``labels`` (default 5) and
        ``minConfidence`` (default 99)
    @return: a plain-text prompt when no image was uploaded, otherwise the
        ``detect_labels`` response serialised as JSON
    """
    files = request.FILES.get("image")
    # Check for the upload first so a request without an image gets the prompt
    # regardless of the other parameters.
    if not files:
        return HttpResponse("请上传图片!!!!")
    labels = int(request_dict.get("labels", 5))
    minConfidence = int(request_dict.get("minConfidence", 99))
    # NOTE(review): credentials are hard-coded here; consider moving them to configuration.
    client = boto3.client(
        "rekognition",
        aws_access_key_id="AKIA2E67UIMD6JD6TN3J",
        aws_secret_access_key="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",
        region_name="us-east-1",
    )
    response = client.detect_labels(Image={"Bytes": files.read()}, MaxLabels=labels, MinConfidence=minConfidence)
    return HttpResponse(
        json.dumps(response, ensure_ascii=False),
        # Media-type parameters are separated by ';' (the original used ',').
        content_type="application/json; charset=utf-8",
    )
@staticmethod
def ip(response):
    """Look up the district and city of a fixed IP address and return them as ``{ip: "district/city"}``.

    Responds with code 500 and the error line/message when the lookup raises.
    """
    address = "120.237.157.181"
    try:
        lookup = IPQuery(address)
        location = "{}/{}".format(lookup.district, lookup.city)
        print(location)
        return response.json(0, {address: location})
    except Exception as e:
        detail = "error_line:{}, error_msg:{}".format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, detail)
def configType(self, response):
    """Print the configured ``SERVER_TYPE`` to stdout and answer with code 0."""
    server_type = SERVER_TYPE
    print(server_type)
    return response.json(0)
def createData(self, request_dict, response):
    """Create a ``DeviceLogModel`` row for the ``uid`` request parameter and answer with code 0."""
    device_uid = request_dict.get("uid", None)
    DeviceLogModel.objects.create(uid=device_uid)
    return response.json(0)
@staticmethod
def write_trade_no(request_dict, response):
    """Copy ``trade_no`` from PayPal webhook events onto a fixed list of orders.

    For each listed order id, the first webhook event with a non-empty
    ``trade_no`` supplies the value written to the matching ``Order_Model`` rows.
    """
    order_ids = (
        '20230907065015782239',
        '20230907065317610335',
        '20230908052328972692',
        '20230909041922370332',
        '20230912112016475783',
        '20230912164404055222',
        '20230914102128409892',
        '20230914110630052782',
        '20230914125050468117',
        '20230918061527058694',
        '20230918062701462434',
    )
    for order_id in order_ids:
        event_qs = PaypalWebHookEvent.objects.filter(~Q(trade_no=''), orderID=order_id).values('trade_no')
        if not event_qs.exists():
            continue
        trade_no = event_qs[0]['trade_no']
        Order_Model.objects.filter(orderID=order_id).update(trade_no=trade_no)
    return response.json(0)
def count_ts(self, request_dict, response):
    """Report the number of TS segments and total seconds uploaded in one month.

    @param request_dict: request parameters; reads ``year`` and ``month``, each
        defaulting to the current one
    @param response: unused; the reply is a plain ``HttpResponse``
    @return: an HTML summary, or a "no data" message when nothing matches
    """
    year = request_dict.get("year", None)
    month = request_dict.get("month", None)
    if not year:
        year = CommonService.timestamp_to_str(int(time.time()), "%Y")
    if not month:
        month = CommonService.timestamp_to_str(int(time.time()), "%m")
    year = int(year)
    month = int(month)
    # Roll over to January of the next year after December; the original built
    # the invalid month "13" here.
    if month == 12:
        next_year, next_month = year + 1, 1
    else:
        next_year, next_month = year, month + 1
    startTime = CommonService.str_to_timestamp("{year}-{month}".format(year=year, month=month), "%Y-%m")
    endTime = CommonService.str_to_timestamp(
        "{year}-{month}".format(year=next_year, month=next_month), "%Y-%m"
    ) - 1
    split_vod_hls_obj = SplitVodHlsObject()
    qsTs = split_vod_hls_obj.get_vod_hls_data(time__gte=startTime, time__lte=endTime).values("fg")
    if not qsTs.exists():
        return HttpResponse("查无数据")
    sumTs = 0  # total number of TS segments
    sumSec = 0  # total seconds
    for val in qsTs:
        fg = int(val["fg"])
        # The low 4 bits hold the segment count; each following 4-bit group is
        # added to the seconds total.
        sumTs += fg & 0xF
        for i in range(15):
            shift = (i + 1) * 4
            sumSec += (fg >> shift) & 0xF
    # NOTE(review): size is always reported as 0 although the message mentions
    # a 150KB-per-second estimate - confirm whether it should be computed.
    size = 0
    return HttpResponse(
        "{year}年{month}月 </br>上传的TS总数:{sumTs} </br> 总秒数:{sumSec} </br> 总大小:{size}GB (1秒约等150KB计算)".format(
            year=year, month=month, sumTs=sumTs, sumSec=sumSec, size=size
        )
    )
@staticmethod
def ts_count(request_dict, response):
    """Count TS segments for a fixed UID and time window across the seven per-weekday tables.

    The UID and the ``start_time`` bounds are hard-coded; the request
    parameters are not used.

    @param request_dict: request parameters (not read)
    @param response: response object used to build the reply
    @return: code 0 with the summed low-4-bit segment counts, or code 500 with
        the error line/message when the query fails
    """
    uid = "7TR9XE46NHXL5921111A"
    sql = (
        "SELECT fg FROM `vod_hls_mon` WHERE uid=%s AND start_time BETWEEN 1685289600 AND 1685548800 "
        "UNION ALL SELECT fg FROM `vod_hls_tues` WHERE uid=%s AND start_time BETWEEN 1685289600 AND 1685548800 "
        "UNION ALL SELECT fg FROM `vod_hls_wed` WHERE uid=%s AND start_time BETWEEN 1685289600 AND 1685548800 "
        "UNION ALL SELECT fg FROM `vod_hls_thur` WHERE uid=%s AND start_time BETWEEN 1685289600 AND 1685548800 "
        "UNION ALL SELECT fg FROM `vod_hls_fri` WHERE uid=%s AND start_time BETWEEN 1685289600 AND 1685548800 "
        "UNION ALL SELECT fg FROM `vod_hls_sat` WHERE uid=%s AND start_time BETWEEN 1685289600 AND 1685548800 "
        "UNION ALL SELECT fg FROM `vod_hls_sun` WHERE uid=%s AND start_time BETWEEN 1685289600 AND 1685548800"
    )
    print(sql)
    # The context manager closes the cursor on every path; the original only
    # closed it when the query succeeded.
    with connection.cursor() as cursor:
        try:
            # One placeholder per weekday table.
            cursor.execute(sql, [uid] * 7)
            rows = cursor.fetchall()
            ts_count = sum(int(row[0]) & 0xF for row in rows)
            return response.json(0, ts_count)
        except Exception as e:
            print(e)
            return response.json(
                500,
                "error_line:{}, error_msg:{}".format(e.__traceback__.tb_lineno, repr(e)),
            )
@staticmethod
def write_redis_list(response):
    """Push the integers 0-9 onto the ``serial_redis_list`` Redis list and answer with code 0."""
    redis_client = RedisObject()
    list_key = "serial_redis_list"
    value = 0
    while value < 10:
        redis_client.rpush(list_key, value)
        value += 1
    return response.json(0)
@staticmethod
def read_redis_list(response):
    """Pop and print as many entries from ``serial_redis_list`` as it held on entry, then answer with code 0."""
    redis_client = RedisObject()
    list_key = "serial_redis_list"
    pending = redis_client.llen(list_key)
    if pending > 0:
        for _ in range(pending):
            print(redis_client.lpop(list_key))
    return response.json(0)
@staticmethod
def play_m3u8(request_dict, response):
    """Build a downloadable ``play.m3u8`` of pre-signed S3 URLs for one recording.

    Reads ``uid``, ``channel`` and ``time`` from ``request_dict``. Responds
    with code 173 when the recording or its bucket is not found, and code 500
    with the error line/message when anything raises.
    """
    uid = request_dict.get("uid", None)
    channel = request_dict.get("channel", None)
    store_time = request_dict.get("time", None)
    now_ts = int(time.time())
    try:
        hls_qs = SplitVodHlsObject().get_vod_hls_data(
            uid=uid, channel=channel, start_time=store_time, end_time__gte=now_ts
        ).values("sec", "fg", "bucket_id")
        if not hls_qs.exists():
            return response.json(173)
        bucket_qs = VodBucketModel.objects.filter(id=hls_qs[0]["bucket_id"]).values("bucket", "region", "mold")
        if not bucket_qs.exists():
            return response.json(173)
        fg = int(hls_qs[0]["fg"])
        bucket_region = bucket_qs[0]["region"]
        bucket_name = bucket_qs[0]["bucket"]
        mold = bucket_qs[0]["mold"]
        s3_client = Session(
            aws_access_key_id=AWS_ACCESS_KEY_ID[mold],
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY[mold],
            region_name=bucket_region,
        ).client("s3")
        entries = []
        # fg is a 64-bit integer: the low 4 bits are the TS file count, and each
        # following 4-bit group is the duration in seconds of one TS file.
        for index in range(15):
            duration = (fg >> ((index + 1) * 4)) & 0xF
            if duration <= 0:
                continue
            segment_key = "{uid}/vod{channel}/{time}/ts{i}.ts".format(
                uid=uid, channel=channel, time=store_time, i=index
            )
            signed_url = s3_client.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket_name, "Key": segment_key},
                ExpiresIn=24 * 60 * 60,
            )
            entries.append({"name": signed_url, "duration": duration})
        playlist = PlaylistGenerator(entries).generate()
        m3u8_response = HttpResponse(playlist)
        m3u8_response["Content-Type"] = "application/octet-stream"
        m3u8_response["Content-Disposition"] = 'attachment;filename="play.m3u8"'
        return m3u8_response
    except Exception as e:
        print(e)
        detail = "error_line:{}, error_msg:{}".format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, detail)
- @classmethod
- def getSerialNumberInfo(cls, request_dict, response):  # look up a serial number's allocation status, bound UID and the device/user info for that UID
- logger = logging.getLogger("info")
- serial_number = request_dict.get("serialNumber", None)
- if not serial_number:
- return response.json(444)  # code 444 when the serialNumber parameter is missing
- serialNumber = serial_number[:9]  # first 9 characters are echoed back in the response
- serial_number = serial_number[:6]  # first 6 characters are used for the database lookups
- try:
- uid_user_message = {  # default (empty) device/user details
- "uid": "",
- "serialNumber": "",
- "userID": "",
- "username": "",
- "primaryUserID": "",
- "vodPrimaryUserID": "vodPrimaryUserID",  # NOTE(review): the default here is the literal key name, unlike the other empty defaults - confirm intent
- }
- data = {  # response payload, filled in below
- "uid": "",
- "serialNumber": serialNumber,
- "status": "",
- "uid_user_message": uid_user_message,
- }
- company_serial_qs = CompanySerialModel.objects.filter(serial_number=serial_number).values("status")
- if not company_serial_qs.exists():
- return response.json(173)  # code 173 when the serial number is not found
- if company_serial_qs[0]["status"] == 0:
- return response.json(0, {"contents": "序列号未分配"})  # status 0: message reads "serial number not allocated"
- uid_company_serial_qs = UIDCompanySerialModel.objects.filter(
- company_serial__serial_number=serial_number
- ).values("uid__uid", "uid__status", "company_serial__serial_number")
- if not uid_company_serial_qs.exists() and company_serial_qs[0]["status"] != 0:  # no UID linked to this serial; status 0 was already handled above
- if company_serial_qs[0]["status"] == 1:
- data["status"] = "已分配"  # status 1: "allocated"
- if company_serial_qs[0]["status"] == 2:
- data["status"] = "绑定uid"  # status 2: "bound to uid"
- if company_serial_qs[0]["status"] == 3:
- data["status"] = "已占用"  # status 3: "occupied"
- return response.json(0, data)  # NOTE(review): indentation is lost in this copy; presumably this return belongs to the "no UID linked" branch - confirm
- for uid_company_serial in uid_company_serial_qs:
- data["uid"] = uid_company_serial["uid__uid"]
- data["serialNumber"] = serialNumber
- data["status"] = uid_company_serial["uid__status"]  # overwritten below when the serial status is 1, 2 or 3
- if company_serial_qs[0]["status"] == 1:
- data["status"] = "已分配"  # status 1: "allocated"
- if company_serial_qs[0]["status"] == 2:
- data["status"] = "绑定uid"  # status 2: "bound to uid"
- if company_serial_qs[0]["status"] == 3:
- data["status"] = "已占用"  # status 3: "occupied"
- uid = uid_company_serial["uid__uid"] if uid_company_serial["uid__uid"] else ""
- device_info_qs = Device_Info.objects.filter(UID=uid).values(
- "UID",
- "serial_number",
- "userID_id",
- "primaryUserID",
- "vodPrimaryUserID",
- "userID__username",
- )
- uid_user_message = {  # details from the first matching device row, or empty strings when there is none
- "uid": device_info_qs[0]["UID"] if device_info_qs.exists() else "",
- "serialNumber": device_info_qs[0]["serial_number"] if device_info_qs.exists() else "",
- "userID": device_info_qs[0]["userID_id"] if device_info_qs.exists() else "",
- "username": device_info_qs[0]["userID__username"] if device_info_qs.exists() else "",
- "primaryUserID": device_info_qs[0]["primaryUserID"] if device_info_qs.exists() else "",
- "vodPrimaryUserID": device_info_qs[0]["vodPrimaryUserID"] if device_info_qs.exists() else "",
- }
- data["uid_user_message"] = uid_user_message
- return response.json(0, data)  # NOTE(review): indentation is lost in this copy; confirm whether this return sits inside or after the loop
- except Exception as e:
- logger.info("查询异常:{}".format(e))  # log message reads "query exception"
- return response.json(500)  # code 500 on any exception
@classmethod
def get_serial_details(cls, request_dict, response, request):
    """
    Query the binding status of a serial number on every regional server.

    The fan-out only runs when CONFIG_INFO is "cn"; otherwise an empty list is
    returned. When any server reports the serial number as bound to a UID, an
    operation log row is written.
    @param request_dict: request parameters
    @request_dict serialNumber: serial number to query
    @param response: response object
    @param request: request object, used to resolve the caller IP for the log row
    @return: response (500 with 'please try again' when any server did not answer)
    """
    try:
        serial_number = request_dict.get("serialNumber", None)
        if not serial_number:
            return response.json(0)
        if not CONFIG_INFO == "cn":
            return response.json(0, [])

        def fetch_data(url, data, server_info, results_data):
            """POST to one server and append its result, tagged with server_info."""
            try:
                max_retries = 3
                res = ""
                # Repeats only on a non-200 status; a raised request error ends the attempts.
                for _ in range(max_retries):
                    res = requests.post(url, data=data, timeout=10)
                    if res.status_code == 200:
                        break
                result = json.loads(res.text)
                result['result'].update(server_info)
                results_data.append(result['result'])
            except Exception as e:
                LOGGER.error(f"Error fetching data from {url}: {e}")

        results_data = []
        servers = [
            ("https://www.zositechc.cn/testApi/getSerialNumberInfo", {"serialNumber": serial_number},
             {"server": 1, "serverName": "中国服", "domainName": "https://www.zositechc.cn"}),
            ("https://www.dvema.com/testApi/getSerialNumberInfo", {"serialNumber": serial_number},
             {"server": 2, "serverName": "美国服", "domainName": "https://www.dvema.com"}),
            ("https://api.zositeche.com/testApi/getSerialNumberInfo", {"serialNumber": serial_number},
             {"server": 3, "serverName": "欧洲服", "domainName": "https://api.zositeche.com"}),
            ("https://test.zositechc.cn/testApi/getSerialNumberInfo", {"serialNumber": serial_number},
             {"server": 4, "serverName": "测试服", "domainName": "https://test.zositechc.cn"})
        ]
        threads = []
        for url, data, server_info in servers:
            thread = threading.Thread(target=fetch_data, args=(url, data, server_info, results_data))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

        # Make sure results_data holds an answer from every server.
        server_ids = {1, 2, 3, 4}
        fetched_server_ids = {item['server'] for item in results_data}
        if not fetched_server_ids == server_ids:
            return response.json(500, 'please try again')

        # Sort by server id.
        results_data = sorted(results_data, key=lambda x: x["server"])
        is_ok = True
        operation = ""
        status_log = ""
        for item in results_data:
            # A server may answer without a "status" key (e.g. an unassigned
            # serial number only carries "contents"), so do not index directly.
            status = item.get("status")
            if status == "绑定uid":
                is_ok = False
                operation = "检测{}序列号已绑定UID".format(serial_number)
                LOGGER.info("序列号检测状态已绑定:{}".format(serial_number))
                break
            elif status == "已占用":
                status_log = "序列号检测状态已占用:{}".format(serial_number)
        if is_ok:
            if status_log:
                LOGGER.info(status_log)
            LOGGER.info("序列号检测状态正常{}".format(serial_number))
            return response.json(0, results_data)
        log = {
            "ip": CommonService.get_ip_address(request),
            "user_id": 1,
            "status": 200,
            "time": int(time.time()),
            "operation": operation,
            "url": "testApi/get-serial-details",
        }
        LogModel.objects.create(**log)
        return response.json(0, results_data)
    except Exception as e:
        return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
@classmethod
def find_device_serial(cls, request_dict, response):
    """
    Record or look up the serial number associated with a firmware time code.

    ``functionType`` selects the operation: "device_save_serial" stores the
    pair, "get_device-serial" returns the stored record.
    @param request_dict: request parameters
    @request_dict firmwareTimeCode: firmware time code
    @request_dict functionType: operation selector
    @request_dict serialNo: serial number (required when saving)
    @request_dict timeStamp: timestamp used for the signature check
    @request_dict sign: signature checked against timeStamp
    @param response: response object
    @return: response (13 failed signature check, 444 missing/unknown
        parameters, 174 code already recorded, 173 code not found, 178 on exception)
    """
    try:
        firmware_time_code_no = request_dict.get("firmwareTimeCode", None)
        function_type_str = request_dict.get("functionType", None)
        serial_no = request_dict.get("serialNo", None)
        time_stamp = request_dict.get("timeStamp", None)
        sign = request_dict.get("sign", None)
        if not CommonService.check_time_stamp_token(sign, time_stamp):
            return response.json(13)
        if not function_type_str:
            return response.json(444)
        with transaction.atomic():
            first_firmwares_qs = TestDeviceFindSerial.objects.filter(firmware_time_code=firmware_time_code_no)
            if function_type_str == "device_save_serial":  # device reports its serial number for a firmware code
                if not all([firmware_time_code_no, serial_no]):
                    return response.json(444)
                if first_firmwares_qs.exists():
                    return response.json(174)
                TestDeviceFindSerial.objects.create(
                    firmware_time_code=firmware_time_code_no,
                    serial_number=serial_no,
                    created_time=int(time.time()),
                )
                return response.json(0)
            elif function_type_str == "get_device-serial":  # fetch the serial number by firmware code
                if not firmware_time_code_no:
                    return response.json(444)
                # Load the record once instead of issuing one query per field.
                firmware = first_firmwares_qs.first()
                if firmware is None:
                    return response.json(173)
                result = {
                    "Id": firmware.id,
                    "firmwareTimeCode": firmware_time_code_no,
                    "serialNumber": firmware.serial_number,
                    "createdTime": firmware.created_time,
                }
                LOGGER.info("返回结果 : %s", result)
                return response.json(0, result)
            else:
                return response.json(444)
    except Exception as e:
        LOGGER.info("异常详情,errLine:{}, errMsg:{}".format(e.__traceback__.tb_lineno, repr(e)))
        # Pass text rather than the exception object, as the other handlers do.
        return response.json(178, repr(e))
@staticmethod
def password(request_dict, response):
    """
    Hash the submitted password when the requested scheme version is "V1".

    The hash is computed but not used afterwards; the response has no payload.
    @param request_dict: request parameters
    @request_dict password: plain-text password
    @request_dict pwdVersion: scheme version, defaults to "V1"
    @param response: response object
    @return: response
    """
    raw_password = request_dict.get("password", None)
    version = request_dict.get("pwdVersion", "V1")
    if version == "V1":
        make_password(raw_password)
    return response.json(0)
@staticmethod
def ali_text_review(request_dict, response):
    """
    Run the "nickname_detection" text review on the supplied parameters.
    @param request_dict: request parameters
    @request_dict service_parameters: payload forwarded to ContentSecurity.text_review
    @param response: response object
    @return: response carrying the review result
    """
    parameters = request_dict.get("service_parameters", None)
    reviewer = ContentSecurity()
    outcome = reviewer.text_review("nickname_detection", parameters)
    return response.json(0, outcome)
@staticmethod
def ali_image_review(request_dict, response):
    """
    Run the "profilePhotoCheck" image review against the default avatar.

    A pre-signed GET URL for "default/default.png" in AVATAR_BUCKET is generated
    and handed to ContentSecurity.image_review.
    @param request_dict: request parameters (not read)
    @param response: response object
    @return: response carrying the review result, or 500 with the error text
    """
    s3_config = botocore.client.Config(signature_version="s3v4")
    aws_s3_client = boto3.client(
        "s3",
        region_name=REGION_NAME2,
        aws_access_key_id=ACCESS_KEY_ID,
        aws_secret_access_key=SECRET_ACCESS_KEY,
        config=s3_config,
    )
    # Object to review: default/default.png
    object_ref = {"Bucket": AVATAR_BUCKET, "Key": "default/default.png"}
    try:
        image_url = aws_s3_client.generate_presigned_url("get_object", Params=object_ref)
        print(image_url)
        payload = json.dumps({"imageUrl": image_url})
        legal = ContentSecurity().image_review("profilePhotoCheck", payload)
        return response.json(0, legal)
    except Exception as e:
        message = "error_line:{}, error_msg:{}".format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, message)
@staticmethod
def add_cron_job(request_dict, response):
    """
    Register a cron-style job that runs ApschedulerObject.auto_hello.
    @param request_dict: request parameters
    @request_dict task_id: job identifier
    @request_dict timezone_offset: timezone offset, must parse as a float
    @request_dict hour: cron hour field
    @request_dict minute: cron minute field
    @request_dict day_of_week: cron day-of-week field
    @param response: response object
    @return: response (444 when timezone_offset is missing or not numeric)
    """
    task_id = request_dict.get('task_id', None)
    try:
        timezone_offset = float(request_dict.get('timezone_offset', None))
    except (TypeError, ValueError):
        # Missing or malformed offset: answer with the parameter-error code
        # instead of letting the conversion error escape the view.
        return response.json(444)
    hour = request_dict.get('hour', None)
    minute = request_dict.get('minute', None)
    day_of_week = request_dict.get('day_of_week', None)
    apscheduler_obj = ApschedulerObject(timezone_offset)
    apscheduler_obj.create_cron_job(apscheduler_obj.auto_hello, task_id, day_of_week, hour, minute, ('date',))
    return response.json(0)
@staticmethod
def add_date_job(request_dict, response):
    """
    Register a one-off job that runs ApschedulerObject.auto_hello at a timestamp.
    @param request_dict: request parameters
    @request_dict task_id: job identifier
    @request_dict time_stamp: run time, must parse as an integer
    @param response: response object
    @return: response (444 when time_stamp is missing or not an integer)
    """
    task_id = request_dict.get('task_id', None)
    try:
        time_stamp = int(request_dict.get('time_stamp', None))
    except (TypeError, ValueError):
        # Missing or malformed timestamp: answer with the parameter-error code
        # instead of letting the conversion error escape the view.
        return response.json(444)
    apscheduler_obj = ApschedulerObject()
    apscheduler_obj.create_date_job(apscheduler_obj.auto_hello, task_id, time_stamp, ('date',))
    return response.json(0)
@staticmethod
def del_apscheduler_job(request_dict, response):
    """
    Delete a scheduled job by its identifier.
    @param request_dict: request parameters
    @request_dict task_id: identifier of the job to delete
    @param response: response object
    @return: response
    """
    job_id = request_dict.get('task_id', None)
    scheduler = ApschedulerObject()
    scheduler.del_job(job_id)
    return response.json(0)
@classmethod
def get_pay_pal_transactions(cls, request_dict, response):
    """
    Fetch PayPal transactions for a fixed one-day window using the 'Vsees' credentials.

    The window (2023-09-15, UTC-08:00) and paging (first page, 20 rows) are hard-coded.
    @param request_dict: request parameters (not read)
    @param response: response object
    @return: response carrying the PayPalService result, or 500 on failure
    """
    from Ansjer.config import PAYPAL_CRD
    try:
        params = (
            ('start_date', '2023-09-15T00:00:00-0800'),
            ('end_date', '2023-09-15T23:59:59-0800'),
            ('fields', 'all'),
            ('page_size', '20'),
            ('page', '1'),
        )
        result = PayPalService(PAYPAL_CRD['Vsees']['client_id'],
                               PAYPAL_CRD['Vsees']['client_secret']).get_transactions(params)
        return response.json(0, result)
    except Exception as e:
        # Label the log entry with this method, not an unrelated view.
        LOGGER.info('{}.get_pay_pal_transactions, errLine:{}, errMsg:{}'.format(
            cls.__name__, e.__traceback__.tb_lineno, repr(e)))
        return response.json(500)
@staticmethod
def convertTimestamp(request_dict, response):
    """
    Convert a time string in a given timezone offset to a timestamp.
    @param request_dict: request parameters
    @request_dict timezone_offset: timezone offset, parsed as a float
    @request_dict time_string: time string handed to CommonService.convert_to_timestamp
    @param response: response object
    @return: response carrying the timestamp (444 when a parameter is missing,
        500 with the error text on failure)
    """
    raw_offset = request_dict.get('timezone_offset', None)
    time_text = request_dict.get('time_string', None)
    if not raw_offset or not time_text:
        return response.json(444)
    try:
        offset = float(raw_offset)
        converted = CommonService.convert_to_timestamp(offset, time_text)
        return response.json(0, converted)
    except Exception as e:
        message = 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, message)
@staticmethod
def checkOrderExist(request_dict, response):
    """
    Report whether an order with the given trade number exists.

    For an existing order that is not flagged as a refund, payTime and addTime
    are overwritten with ``pay_time`` when one is supplied.
    @param request_dict: request parameters
    @request_dict trade_no: trade number to look up (required)
    @request_dict pay_time: value written to payTime/addTime
    @request_dict refund_order: when truthy, the order times are left untouched
    @param response: response object
    @return: response carrying {'is_exist': 0|1} (444 when trade_no is missing,
        500 with the error text on failure)
    """
    trade_no = request_dict.get('trade_no', None)
    pay_time = request_dict.get('pay_time', None)
    refund_order = request_dict.get('refund_order', None)
    if not trade_no:
        # filter(trade_no=None) would select orders whose trade number is NULL.
        return response.json(444)
    try:
        order_qs = Order_Model.objects.filter(trade_no=trade_no)
        is_exist = 0
        if order_qs.exists():
            is_exist = 1
            # Without a pay_time there is nothing meaningful to write.
            if not refund_order and pay_time:
                order_qs.update(payTime=pay_time, addTime=pay_time)
        return response.json(0, {'is_exist': is_exist})
    except Exception as e:
        return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
@staticmethod
def foreignIp(request_dict, response):
    """
    Construct a GeoIP2 lookup object for the supplied IP address.

    The lookup object is not used further; the response has no payload.
    @param request_dict: request parameters
    @request_dict ip: IP address passed to GeoIP2
    @param response: response object
    @return: response
    """
    GeoIP2(request_dict.get('ip', None))
    return response.json(0)
@staticmethod
def getForeignWeather(request_dict, response):
    """
    Return the current temperature and humidity from OpenWeatherMap for a coordinate.
    @param request_dict: request parameters
    @request_dict lat: latitude, must parse as a float
    @request_dict lon: longitude, must parse as a float
    @param response: response object
    @return: response carrying {'temp': ..., 'humidity': ...} (444 when lat or
        lon is missing or not numeric)
    """
    try:
        lat = float(request_dict.get('lat', None))
        lon = float(request_dict.get('lon', None))
    except (TypeError, ValueError):
        # Missing or malformed coordinates: answer with the parameter-error
        # code instead of letting the conversion error escape the view.
        return response.json(444)
    open_weather_map = OpenWeatherMap(lat, lon)
    temp, humidity = open_weather_map.get_current_weather()
    res = {
        'temp': temp,
        'humidity': humidity
    }
    return response.json(0, res)
@staticmethod
def unionpayback(request_dict, response):
    """
    Log the parameters of a UnionPay callback and acknowledge it.
    @param request_dict: callback parameters, logged as-is
    @param response: response object
    @return: response
    """
    message = '银联回调参数:{}'.format(request_dict)
    logging.info(message)
    return response.json(0)
@staticmethod
def uploadAVSS(request, response):
    """
    Upload the posted file to the "ansjerfilemanager" bucket under the "pc/" prefix.

    The object is stored with the upload's content type and a public-read ACL.
    @param request: request object; the upload is read from request.FILES["file"]
    @param response: response object
    @return: response (444 when no file was posted, 500 with the error text on failure)
    """
    upload = request.FILES.get("file", None)
    if upload is None:
        # No file in the request: report a parameter error rather than
        # failing on attribute access below.
        return response.json(444)
    file_key = "pc/{}".format(upload.name)
    try:
        s3 = AmazonS3Util(AWS_ACCESS_KEY_ID[0], AWS_SECRET_ACCESS_KEY[0], 'cn-northwest-1')
        # Target bucket
        bucket = "ansjerfilemanager"
        s3.upload_file_obj(
            bucket,
            file_key,
            upload,
            {"ContentType": upload.content_type, "ACL": "public-read"},
        )
        return response.json(0)
    except Exception as e:
        return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
@staticmethod
def oci_oss(request, response):
    """
    Generate a 7-day pre-signed PUT URL for "test.jpeg" in the "push" bucket
    through the S3-compatible Oracle object storage endpoint.
    @param request: request object (not read)
    @param response: response object
    @return: response carrying the pre-signed URL, or 500 with the error text
    """
    # NOTE(review): credentials are hard-coded in source; they should be
    # moved to configuration and rotated.
    credentials = {
        'aws_access_key_id': '36edb60759487f3a2668ff7ab8da33238f902ff1',
        'aws_secret_access_key': 'NoCObsbQymztD/RxTWvFXVl854HOW78Ch9allKOZJn4=',
    }
    try:
        s3_client = boto3.client(
            's3',
            config=botocore.client.Config(signature_version='s3v4'),
            region_name='us-phoenix-1',
            endpoint_url='https://servers.compat.objectstorage.us-phoenix-1.oraclecloud.com',
            **credentials
        )
        # Target bucket and object
        target = {'Bucket': 'push', 'Key': 'test.jpeg'}
        one_week = 7 * 24 * 3600
        pre_signed_url = s3_client.generate_presigned_url(
            ClientMethod='put_object',
            ExpiresIn=one_week,
            Params=target,
        )
        return response.json(0, pre_signed_url)
    except Exception as e:
        message = 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, message)
@staticmethod
def celery(request_dict, response):
    """
    Exercise the celery-beat helper: create a one-off clocked task, then delete it.
    @param request_dict: request parameters
    @request_dict name: task name
    @request_dict task: task path handed to CeleryBeatObj.creat_clocked_task
    @request_dict time_string: run time as a string
    @request_dict timezone_offset: timezone offset, must parse as a float
    @param response: response object
    @return: response (444 when timezone_offset is missing or not numeric,
        500 with the error text on failure)
    """
    name = request_dict.get('name', None)
    task = request_dict.get('task', None)
    time_string = request_dict.get('time_string', None)
    try:
        timezone_offset = float(request_dict.get('timezone_offset', None))
    except (TypeError, ValueError):
        # Missing or malformed offset: answer with the parameter-error code
        # instead of letting the conversion error escape the view.
        return response.json(444)
    celery_beat_obj = CeleryBeatObj()
    try:
        # Clocked (one-off) task
        celery_beat_obj.creat_clocked_task(
            name=name, task=task, timezone_offset=timezone_offset, time_string=time_string)
        # Remove the task that was just created.
        celery_beat_obj.del_task(name)
        return response.json(0)
    except Exception as e:
        return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
@staticmethod
def tz(request_dict, response):
    """
    Build a ZoneInfo object for each zone name in a fixed list and print it.
    @param request_dict: request parameters (not read)
    @param response: response object
    @return: response (500 with the error text when a zone cannot be loaded)
    """
    zone_names = (
        'Pacific/Midway', 'Pacific/Honolulu', 'America/Anchorage', 'America/Los_Angeles',
        'America/Denver', 'America/Chicago', 'America/New_York', 'America/Halifax',
        'Canada/Newfoundland', 'America/Buenos_Aires', 'Atlantic/South_Georgia',
        'Atlantic/Cape_Verde', 'Europe/London', 'Europe/Paris', 'Europe/Athens', 'Europe/Moscow',
        'Asia/Tehran', 'Asia/Dubai', 'Asia/Kabul', 'Asia/Karachi', 'Asia/Almaty', 'Asia/Rangoon',
        'Asia/Bangkok', 'Asia/Shanghai', 'Asia/Tokyo', 'Australia/Darwin', 'Australia/Sydney',
        'Asia/Magadan', 'Pacific/Auckland',
    )
    try:
        for zone_name in zone_names:
            print(zoneinfo.ZoneInfo(zone_name))
        return response.json(0)
    except Exception as e:
        message = 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, message)
@staticmethod
def get_token(request_dict, response):
    """
    Obtain a temporary AWS session token from STS (valid for 129600 seconds).
    @param request_dict: request parameters (not read)
    @param response: response object
    @return: response carrying AccessKeyId, AccessKeySecret, SessionToken and
        Expiration, or 500 with the error text
    """
    try:
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to configuration and rotated.
        sts_client_conn = boto3.client(
            'sts',
            aws_access_key_id='AKIA2E67UIMD45Y3HL53',
            aws_secret_access_key='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',
            region_name='us-east-1'
        )
        credentials = sts_client_conn.get_session_token(DurationSeconds=129600)['Credentials']
        token_info = {
            'AccessKeyId': credentials['AccessKeyId'],
            'AccessKeySecret': credentials['SecretAccessKey'],
            'SessionToken': credentials['SessionToken'],
            'Expiration': str(credentials['Expiration'])
        }
        return response.json(0, token_info)
    except Exception as e:
        print(e)
        message = 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, message)
@staticmethod
def lang_country(request_dict, response):
    """
    Insert country-name rows for one language into CountryLanguageModel.

    Country ids are taken from the existing rows with language_id=1 (newest id
    first); names are taken from the end of the file '国家.txt', one per row.
    All rows are created inside one transaction so a failure leaves none behind.
    @param request_dict: request parameters
    @request_dict language_id: language id for the new rows, must parse as an integer
    @param response: response object
    @return: response (500 with the error text on failure)
    """
    try:
        language_id = int(request_dict.get('language_id'))
        now_time = int(time.time())
        country_language_qs = CountryLanguageModel.objects.filter(language_id=1). \
            order_by('-id').values('country_id')
        # NOTE(review): the file is opened with the platform default encoding;
        # confirm the file's encoding before pinning one here.
        with open('国家.txt', 'r') as file:
            country_list = [line.rstrip() for line in file]
        print(country_list)
        # Roll everything back if the name list runs out or a create fails.
        with transaction.atomic():
            for country_language in country_language_qs:
                country_name = country_list.pop()
                country_id = country_language['country_id']
                print(country_name, country_id)
                CountryLanguageModel.objects.create(
                    country_name=country_name, language_id=language_id, country_id=country_id,
                    add_time=now_time, update_time=now_time)
        return response.json(0)
    except Exception as e:
        print(e)
        return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
@staticmethod
def getObsSignedUrl(request_dict, response):
    """
    Create a Huawei OBS signed URL (PUT, 3600-second expiry) for uploading an object.

    The object key is "<uid>/<channel>/<current unix time>.jpeg" with a fixed
    uid and channel, in the "asj-push" bucket.
    @param request_dict: request parameters (not read)
    @param response: response object
    @return: response carrying the createSignedUrl result, or 500 with the error text
    """
    from obs import ObsClient
    # NOTE(review): credentials are hard-coded in source; they should be
    # moved to configuration and rotated.
    ak = 'TN9T7ZPN3QRBBQ9NQHNB'
    sk = 'rIlTBJ85MUC1WNLyJBZM2077HTsQ0qJaJf4IpTjU'
    endpoint = 'https://obs.cn-east-3.myhuaweicloud.com'
    object_key = '{}/{}/{}.jpeg'.format('517J385BNUGP3CPP111A', '1', str(int(time.time())))
    try:
        obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=endpoint)
        signed = obs_client.createSignedUrl(
            method='PUT', bucketName='asj-push', objectKey=object_key, expires=3600)
        return response.json(0, signed)
    except Exception as e:
        print(e)
        message = 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, message)
@staticmethod
def duplicate_removal_uid_set(request_dict, response):
    """
    Scan uid_set rows for the uids listed in 'uid.txt' (one uid per line).

    For each uid the rows are checked until one without related uid push data
    is found. Removing that row is currently disabled, so nothing is modified.
    @param request_dict: request parameters (not read)
    @param response: response object
    @return: response (500 with the error text on failure)
    """
    try:
        with open('uid.txt', 'r') as file:
            uid_list = [line.rstrip() for line in file.readlines()]
        print(uid_list)
        for uid in uid_list:
            uid_sets = UidSetModel.objects.filter(uid=uid).values('id', 'ai_type')
            for uid_set in uid_sets:
                has_push = UidPushModel.objects.filter(uid_set_id=uid_set['id']).exists()
                if has_push:
                    continue
                # Row without uid push data: deletion is disabled, stop scanning this uid.
                break
        return response.json(0)
    except Exception as e:
        message = 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))
        return response.json(500, message)
@staticmethod
def checkSerialUID(response):
    """
    Unbind a surplus UID from serial numbers that are bound to two or more UIDs.

    For every such serial number, the bound UIDs whose uid text appears in
    exactly one LogModel operation are counted; when exactly one UID qualifies,
    its UIDCompanySerialModel rows are deleted.
    @param response: response object
    @return: response (500 with the error text on failure)
    """
    try:
        sql = 'SELECT company_serial_id FROM `tb_uid_company_serial` GROUP BY company_serial_id HAVING count(company_serial_id) >= 2'
        # The context manager closes the cursor even when the query fails.
        with connection.cursor() as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
        for row in rows:
            company_serial_id = row[0]
            # UIDs bound to this serial number; evaluated once and reused below.
            uid_rows = list(
                UIDCompanySerialModel.objects.filter(company_serial_id=company_serial_id).
                values('company_serial__serial_number', 'uid__uid', 'uid_id', 'company_serial_id')
            )
            if not uid_rows:
                # Bindings vanished between the two queries; nothing to do.
                continue
            print(uid_rows[0]['company_serial__serial_number'])
            pre_del_count = 0
            pre_del_uid_id = 0
            for uid_row in uid_rows:
                log_count = LogModel.objects.filter(operation__contains=uid_row['uid__uid']).count()
                if log_count == 1:
                    pre_del_count += 1
                    pre_del_uid_id = uid_row['uid_id']
            # Only unbind when exactly one UID has a single matching log entry.
            if pre_del_count == 1:
                print('序列号解绑uid,company_serial_id:{},uid_id:{}'.
                      format(uid_rows[0]['company_serial_id'], pre_del_uid_id))
                UIDCompanySerialModel.objects.filter(uid_id=pre_del_uid_id).delete()
        return response.json(0)
    except Exception as e:
        return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
|