12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328 |
- #!/usr/bin/python3.6
- # -*- coding: utf-8 -*-
- #
- # Copyright (C) 2022 #
- # @Time : 2022/4/1 11:27
- # @Author : ming
- # @Email : zhangdongming@asj6.wecom.work
- # @File : CronTaskController.py
- # @Software: PyCharm
- import datetime
- import threading
- import time
- import requests
- from django.db import connection, connections, transaction
- from django.db.models import Q, Sum, Count
- from django.views import View
- from Ansjer.config import USED_SERIAL_REDIS_LIST, UNUSED_SERIAL_REDIS_LIST, CONFIG_INFO, CONFIG_US, \
- RESET_REGION_ID_SERIAL_REDIS_LIST, LOGGER, PAYPAL_CRD
- from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \
- VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \
- CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \
- CloudLogModel, UidCloudStorageCount, UserExModel, DeviceDomainRegionModel, VodHlsTag, VodHlsTagType, \
- Store_Meal, Lang, VodBucketModel, UnicomComboOrderInfo, UnicomDeviceInfo, AbnormalOrder
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from Object.utils import LocalDateTimeUtil
- from Object.utils.PayPalUtil import PayPalService
- from Service.CommonService import CommonService
- from Service.VodHlsService import SplitVodHlsObject
- from Object.UnicomObject import UnicomObjeect
- from Object.WechatPayObject import WechatPayObject
- class CronDelDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'delAccessLog': # 定时删除访问接口数据
- return self.delAccessLog(response)
- elif operation == 'delPushInfo': # 定时删除推送数据
- return self.delPushInfo(response)
- elif operation == 'delPushInfoV2': # 定时删除推送数据V2
- return self.delPushInfoV2(response)
- elif operation == 'delVodHls': # 定时删除云存播放列表
- return self.delVodHls(response)
- elif operation == 'delCloudLog': # 定时删除云存接口数据
- return self.delCloudLog(response)
- elif operation == 'delTesterDevice': # 定时删除测试账号下的设备数据
- return self.delTesterDevice(response)
- elif operation == 'delAppLog': # 定时删除app日志
- return self.delAppLog(response)
- elif operation == 'UpdateConfiguration': # 定时更新配置
- return self.UpdateConfiguration(response)
- elif operation == 'cloud-log':
- return self.uid_cloud_storage_upload_count(response)
- elif operation == 'delDeviceLog': # 定时删除设备日志
- return self.del_device_log(response)
- else:
- return response.json(404)
- @staticmethod
- def UpdateConfiguration(response):
- """
- 定时更新配置
- @param response: 响应对象
- @return:
- """
- try:
- ucode_list = ['823C01552AA',
- '823C01550AA',
- '823C01550XA',
- '823C01850XA',
- '730201350AA',
- '730201350AA',
- '730201450AA',
- '730201450MA',
- '72V201252AA',
- '72V201253AA',
- '72V201353AA',
- '72V201354AA',
- '72V201355AA',
- '72V201254AA',
- 'V82301850AA',
- 'V82301850XA',
- '72V201257AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_human=0).update(is_human=1)
- ucode_list = ['72V201257AA', '72V201254AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, mobile_4g=0).update(mobile_4g=1)
- # 根据设备规格码定时更新默认算法类型类型
- ucode_list = ['823C01552AA', '823C01550AA', '823C01550XA', 'C18201550KA',
- '823C01550TA', '823C01550VA', '823C01850XA', 'C18201850KA',
- '823C01850TA', '823C01850VA', 'C22501850VA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=47)
- ucode_list = ['730201350AA', '730201450AA', '730201450MA', '730201450NA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=7)
- ucode_list = ['V82301850AA', 'V82301850XA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=2031)
- # 根据设备规格码更新默认个性化语音值
- ucode_list = ['823C01552AA', '823C01550XA', 'C18201550KA', '823C01550TA',
- '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA',
- '730201450AA', '730201450MA', '730201450NA', '72V201252AA', '72V201253AA',
- '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA', 'C22501850VA',
- 'V82301850AA', 'V82301850XA', '72V201257AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_custom_voice=0).update(is_custom_voice=1)
- # 根据设备规格码更新is_ai
- ucode_list = ['823C01552AA', '823C01550XA', 'C18201550KA', '823C01550TA',
- '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA',
- '730201450AA', '730201450MA', '730201450NA', '72V201252AA', '72V201253AA',
- '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA', 'C22501850VA',
- 'V82301850AA', 'V82301850XA', '72V201257AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_ai=2).update(is_ai=1)
- return response.json(0)
- except Exception as e:
- LOGGER.info('UpdateConfiguration异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delAppLog(response):
- """
- 定时删除app日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据
- sql = 'DELETE FROM `app_log` WHERE add_time<{}'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def uid_cloud_storage_upload_count(response):
- try:
- now_time = int(time.time())
- local_time = LocalDateTimeUtil.get_before_days_timestamp(now_time)
- format_str = '%Y-%m-%d'
- date_str = LocalDateTimeUtil.time_stamp_to_time(local_time, format_str)
- start_time, end_time = LocalDateTimeUtil.get_start_and_end_time(date_str, format_str)
- cs_uid_qs = UID_Bucket.objects.filter(addTime__gte=int(1669824000)).values('uid')
- if not cs_uid_qs.exists():
- return response.json(0)
- for item in cs_uid_qs:
- uid = item['uid']
- cloud_log_qs = CloudLogModel.objects.filter(uid=uid, operation=r'cloudstorage/storeplaylist',
- time__gte=start_time, time__lte=end_time)
- cloud_log_qs = cloud_log_qs.values('uid')[0:1]
- if not cloud_log_qs.exists():
- continue
- count_data = {'uid': uid, 'count': cloud_log_qs.count(), 'created_time': end_time,
- 'updated_time': end_time}
- UidCloudStorageCount.objects.create(**count_data)
- return response.json(0)
- except Exception as e:
- LOGGER.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def delAccessLog(response):
- try:
- cursor = connection.cursor()
- # 删除7天前的数据
- last_week = LocalDateTimeUtil.get_last_week()
- sql = 'DELETE FROM access_log WHERE time < %s limit %s'
- cursor.execute(sql, [last_week, 10000])
- # 关闭游标
- cursor.close()
- connection.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @classmethod
- def delPushInfo(cls, response):
- now_time = int(time.time())
- try:
- # 当前时间转日期
- local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date())
- # 根据日期获取周几
- week_val = LocalDateTimeUtil.date_to_week(local_date_now)
- # 根据当前时间获取7天前时间戳
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
- # 异步删除推送消息
- kwargs = {
- 'week_val': week_val,
- 'expiration_time': expiration_time
- }
- del_push_info_thread = threading.Thread(
- target=cls.del_push_info_data,
- kwargs=kwargs)
- del_push_info_thread.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_push_info_data(**kwargs):
- cursor = connections['mysql02'].cursor()
- # 获取删除星期列表
- week_val = kwargs['week_val']
- del_week_val_list = [i for i in range(1, 8)]
- # 移除当天和前后两天
- del_week_val_list.remove(week_val)
- if week_val == 1:
- pre_week_val = 7
- else:
- pre_week_val = week_val - 1
- del_week_val_list.remove(pre_week_val)
- if week_val == 7:
- nex_week_val = 1
- else:
- nex_week_val = week_val + 1
- del_week_val_list.remove(nex_week_val)
- expiration_time = kwargs['expiration_time']
- # 每次删除条数
- size = 5000
- # 删除7天前的数据
- sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- for week_val in del_week_val_list:
- if week_val == 1:
- sql = "DELETE FROM equipment_info_monday WHERE add_time<= %s LIMIT %s "
- if week_val == 2:
- sql = "DELETE FROM equipment_info_tuesday WHERE add_time<= %s LIMIT %s "
- if week_val == 3:
- sql = "DELETE FROM equipment_info_wednesday WHERE add_time<= %s LIMIT %s "
- if week_val == 4:
- sql = "DELETE FROM equipment_info_thursday WHERE add_time<= %s LIMIT %s "
- if week_val == 5:
- sql = "DELETE FROM equipment_info_friday WHERE add_time<= %s LIMIT %s "
- if week_val == 6:
- sql = "DELETE FROM equipment_info_saturday WHERE add_time<= %s LIMIT %s "
- if week_val == 7:
- sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- @classmethod
- def delPushInfoV2(cls, response):
- try:
- del_push_info_thread = threading.Thread(
- target=cls.del_push_info_data_v2)
- del_push_info_thread.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_push_info_data_v2():
- cursor = connections['mysql02'].cursor()
- # 获取7天前时间戳
- now_time = int(time.time())
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
- # 每次删除条数
- size = 5000
- for i in range(1, 21):
- sql = "DELETE FROM equipment_info_{} WHERE add_time< %s LIMIT %s ".format(i)
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- @staticmethod
- def delVodHls(response):
- nowTime = int(time.time())
- try:
- CronDelDataView.del_vod_hls_tag()
- cursor = connection.cursor()
- month_ago_time = nowTime - 3 * 30 * 24 * 60 * 60 # 删除3个月前的数据
- sql = 'DELETE FROM `vod_hls` WHERE endTime<{} LIMIT 50000'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(end_time__lt=month_ago_time)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_vod_hls_tag():
- """
- 删除AI标签记录
- """
- e_time = LocalDateTimeUtil.get_before_days_timestamp(int(time.time()), 30)
- VodHlsTagType.objects.filter(created_time__lt=e_time).delete()
- VodHlsTag.objects.filter(created_time__lt=e_time).delete()
- @staticmethod
- def delCloudLog(response):
- nowTime = int(time.time())
- cursor = connection.cursor()
- try:
- # 删除3个月前的数据
- sql = "DELETE FROM `cloud_log` WHERE time<={} LIMIT 50000".format(
- nowTime - 3 * 30 * 24 * 60 * 60)
- cursor.execute(sql)
- # 关闭游标
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delTesterDevice(response):
- try:
- userID_list = [
- 'tech01@ansjer.com',
- 'tech02@ansjer.com',
- 'tech03@ansjer.com',
- 'tech04@ansjer.com',
- 'tech05@ansjer.com',
- 'tech06@ansjer.com',
- 'tech07@ansjer.com',
- 'tech08@ansjer.com',
- 'tech09@ansjer.com',
- 'tech10@ansjer.com',
- 'fix01@ansjer.com',
- 'fix02@ansjer.com',
- 'fix03@ansjer.com',
- 'fix04@ansjer.com',
- 'fix05@ansjer.com']
- device_user = Device_User.objects.filter(username__in=userID_list)
- device_info_qs = Device_Info.objects.filter(
- userID__in=device_user).values('UID')
- uid_list = []
- for device_info in device_info_qs:
- uid_list.append(device_info['UID'])
- with transaction.atomic():
- # 删除设备云存相关数据
- UidSetModel.objects.filter(uid__in=uid_list).delete()
- UID_Bucket.objects.filter(uid__in=uid_list).delete()
- Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete()
- Order_Model.objects.filter(UID__in=uid_list).delete()
- StsCrdModel.objects.filter(uid__in=uid_list).delete()
- VodHlsModel.objects.filter(uid__in=uid_list).delete()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(uid__in=uid_list)
- ExperienceContextModel.objects.filter(uid__in=uid_list).delete()
- Device_Info.objects.filter(userID__in=device_user).delete()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_device_log(response):
- """
- 定时删除设备日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据
- sql = 'DELETE FROM `device_log` WHERE unix_timestamp(add_time)<{}'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- class CronUpdateDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'updateUnusedUidBucket': # 定时更新过期云存关联的未使用套餐状态
- return self.updateUnusedUidBucket(response)
- elif operation == 'updateUnusedAiService': # 定时更新过期ai关联的未使用套餐状态
- return self.updateUnusedAiService(response)
- elif operation == 'reqUpdateSerialStatus': # 定时请求更新序列号状态
- return self.reqUpdateSerialStatus(response)
- elif operation == 'updateSerialStatus': # 更新序列号状态
- return self.updateSerialStatus(request_dict, response)
- elif operation == 'reset-region-id': # 重置地区id
- return self.reset_region_id(request_dict, response)
- elif operation == 'updateVodMeal': # 定时修改体验套餐有效期为1个月
- return self.update_vod_meal(request_dict, response)
- else:
- return response.json(404)
- @staticmethod
- def updateUnusedUidBucket(response):
- """
- 监控云存套餐过期修改状态
- @param response:
- @return:
- """
- # 定时更新已过期套餐修改状态为2
- now_time = int(time.time())
- expired_uid_bucket = UID_Bucket.objects.filter(endTime__lte=now_time)
- expired_uid_bucket = expired_uid_bucket.filter(~Q(use_status=2)).values('id')
- if expired_uid_bucket.exists():
- expired_uid_bucket.update(use_status=2)
- # 监控有未使用套餐则自动生效
- expired_uid_buckets = UID_Bucket.objects.filter(endTime__lte=now_time, has_unused=1).values("id", "uid")[0:1000]
- for expired_uid_bucket in expired_uid_buckets:
- unuseds = Unused_Uid_Meal.objects.filter(
- uid=expired_uid_bucket['uid']).values(
- "id",
- "uid",
- "channel",
- "addTime",
- "expire",
- "is_ai",
- "bucket_id",
- "order_id").order_by('addTime')
- if not unuseds.exists():
- continue
- unused = unuseds[0]
- try:
- with transaction.atomic():
- count_unused = Unused_Uid_Meal.objects.filter(uid=expired_uid_bucket['uid']).count()
- has_unused = 1 if count_unused > 1 else 0
- end_time = CommonService.calcMonthLater(unused['expire'])
- UID_Bucket.objects.filter(
- uid=expired_uid_bucket['uid']).update(
- channel=unused['channel'],
- endTime=end_time,
- bucket_id=unused['bucket_id'],
- updateTime=now_time,
- use_status=1,
- has_unused=has_unused)
- if unused['is_ai']:
- ai_service = AiService.objects.filter(uid=expired_uid_bucket['uid'], channel=unused['channel'])
- if ai_service.exists():
- ai_service.update(updTime=now_time, use_status=1, orders_id=unused['order_id'],
- endTime=end_time)
- else:
- AiService.objects.create(uid=expired_uid_bucket['uid'], channel=unused['channel'],
- detect_status=1, addTime=now_time, orders_id=unused['order_id'],
- updTime=now_time, endTime=end_time, use_status=1)
- Unused_Uid_Meal.objects.filter(id=unused['id']).delete()
- StsCrdModel.objects.filter(uid=expired_uid_bucket['uid']).delete() # 删除sts记录
- except Exception as e:
- print(repr(e))
- continue
- return response.json(0)
- @staticmethod
- def updateUnusedAiService(response):
- now_time = int(time.time())
- ai_service_qs = AiService.objects.filter(
- endTime__lte=now_time,
- use_status=1).values(
- 'id',
- 'uid')[
- 0:200]
- for ai_service in ai_service_qs:
- try:
- with transaction.atomic():
- AiService.objects.filter(
- id=ai_service['id']).update(
- use_status=2) # 更新过期ai订单状态
- # 如果存在未使用套餐,更新为使用
- unused_ai_service = AiService.objects.filter(
- uid=ai_service['uid'],
- use_status=0).order_by('addTime')[
- :1].values(
- 'id',
- 'endTime')
- if unused_ai_service.exists():
- # 未使用套餐的endTime在购买的时候保存为有效时间
- effective_day = unused_ai_service[0]['endTime']
- endTime = now_time + effective_day
- AiService.objects.filter(
- id=unused_ai_service[0]['id']).update(
- use_status=1, endTime=endTime, updTime=now_time)
- except Exception:
- continue
- return response.json(0)
- @classmethod
- def reqUpdateSerialStatus(cls, response):
- redis_obj = RedisObject()
- # 更新已使用序列号其他服务器的状态
- used_serial_redis_list = redis_obj.lrange(USED_SERIAL_REDIS_LIST, 0, -1) # 读取redis已使用序列号
- if used_serial_redis_list:
- LOGGER.info('---请求更新已使用序列号列表---used_serial_redis_list:{}'.format(used_serial_redis_list))
- used_serial_redis_list = [str(i, 'utf-8') for i in used_serial_redis_list]
- cls.do_request_function(used_serial_redis_list, 3)
- # 更新未使用序列号其他服务器的状态
- unused_serial_redis_list = redis_obj.lrange(UNUSED_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号
- if unused_serial_redis_list:
- LOGGER.info('---请求更新未使用序列号列表---unused_serial_redis_list:{}'.format(unused_serial_redis_list))
- unused_serial_redis_list = [str(i, 'utf-8') for i in unused_serial_redis_list]
- cls.do_request_function(unused_serial_redis_list, 1)
- # 重置地区id
- reset_region_id_serial_redis_list = redis_obj.lrange(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号
- if reset_region_id_serial_redis_list:
- LOGGER.info('---请求重置地区id的序列号列表---:{}'.format(reset_region_id_serial_redis_list))
- reset_region_id_serial_redis_list = [str(i, 'utf-8') for i in reset_region_id_serial_redis_list]
- cls.do_request_reset_region_id(reset_region_id_serial_redis_list)
- return response.json(0)
- @staticmethod
- def do_request_function(serial_redis_list, status):
- """
- 请求更新序列号状态
- @param serial_redis_list: 序列号redis列表
- @param status: 状态, 1: 未使用, 3: 已占用
- """
- data = {
- 'serial_redis_list': str(serial_redis_list),
- 'status': status
- }
- # 确认域名列表
- orders_domain_name_list = CommonService.get_orders_domain_name_list()
- redis_obj = RedisObject()
- LOGGER.info('---请求更新序列号线程---data:{},orders_domain_name_list:{}'.format(data, orders_domain_name_list))
- try:
- requests_failed_flag = False # 请求失败标志位
- for domain_name in orders_domain_name_list:
- url = '{}cron/update/updateSerialStatus'.format(domain_name)
- response = requests.post(url=url, data=data, timeout=5)
- LOGGER.info('---请求更新序列号响应时间---:{}'.format(response.elapsed.total_seconds()))
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- break
- # 状态为未使用,重置美洲服的地区id
- if status == 1: # 美洲服直接更新
- if CONFIG_INFO == CONFIG_US:
- DeviceDomainRegionModel.objects.filter(~Q(region_id=0), serial_number__in=serial_redis_list). \
- update(region_id=0)
- else: # 其他服请求到美洲服更新
- req_url = 'https://www.dvema.com/cron/update/reset-region-id'
- req_data = {
- 'serial_redis_list': str(serial_redis_list)
- }
- response = requests.post(url=req_url, data=req_data, timeout=5)
- LOGGER.info('---请求重置地区id响应时间---:{}'.format(response.elapsed.total_seconds()))
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- break
- if not requests_failed_flag: # 请求成功删除redis序列号
- if status == 1:
- for i in serial_redis_list:
- redis_obj.lrem(UNUSED_SERIAL_REDIS_LIST, 0, i)
- elif status == 3:
- for i in serial_redis_list:
- redis_obj.lrem(USED_SERIAL_REDIS_LIST, 0, i)
- except Exception as e:
- LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
- @staticmethod
- def do_request_reset_region_id(reset_region_id_serial_redis_list):
- """
- 请求重置地区id
- @param reset_region_id_serial_redis_list: 序列号redis列表
- """
- redis_obj = RedisObject()
- requests_failed_flag = False # 请求失败标志位
- data = {
- 'serial_redis_list': str(reset_region_id_serial_redis_list),
- }
- url = 'https://www.dvema.com/cron/update/reset-region-id'
- try:
- response = requests.post(url=url, data=data, timeout=5)
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- if not requests_failed_flag: # 请求成功删除redis序列号
- for serial in reset_region_id_serial_redis_list:
- redis_obj.lrem(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, serial)
- except Exception as e:
- LOGGER.info('---请求重置地区id异常---:{}'.format(repr(e)))
- @staticmethod
- def updateSerialStatus(request_dict, response):
- """
- 更新序列号状态
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @request_dict status: 状态, 1: 未使用, 3: 已占用
- @param response: 响应对象
- """
- serial_redis_list = request_dict.get('serial_redis_list', None)
- status = request_dict.get('status', None)
- LOGGER.info('---更新序列号状态参数---serial_redis_list:{},status:{}'.format(serial_redis_list, status))
- if not all([serial_redis_list, status]):
- return response.json(444)
- now_time = int(time.time())
- try:
- serial_redis_list = eval(serial_redis_list)
- CompanySerialModel.objects.filter(serial_number__in=serial_redis_list).update(status=int(status),
- update_time=now_time)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def reset_region_id(request_dict, response):
- """
- 重置地区id
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @param response: 响应对象
- """
- serial_redis_list = request_dict.get('serial_redis_list', None)
- LOGGER.info('---重置地区id参数---serial_redis_list:{}'.format(serial_redis_list))
- if not serial_redis_list:
- return response.json(444)
- try:
- serial_redis_list = eval(serial_redis_list)
- DeviceDomainRegionModel.objects.filter(serial_number__in=serial_redis_list).update(region_id=0)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---重置地区id异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def update_vod_meal(request_dict, response):
- """
- 定时修改体验套餐有效期为1个月
- @param request_dict: 请求参数
- @param response: 响应对象
- """
- try:
- Store_Meal.objects.filter(is_show=0, expire=12, pixel_level=0).update(price='39.99',
- virtual_price='56.6',
- sort=1)
- Store_Meal.objects.filter(is_show=0, cycle_config_id=1, pixel_level=0).update(price='3.65',
- virtual_price='5.66',
- sort=2)
- Store_Meal.objects.filter(id=12).update(price='3.99', virtual_price='5.66', sort=3)
- Store_Meal.objects.filter(id__in=(16, 17, 18)).update(is_show=0)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---修改云存套餐内容异常---:{}'.format(repr(e)))
- return response.json(500)
- class CronCollectDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'collectPlayBack': # 定时保存云存视频回放
- return self.collect_play_back(response)
- elif operation == 'collectDeviceUser': # 定时保存用户数据
- return self.collect_device_user(response)
- elif operation == 'collectOrder': # 定时保存订单数据
- return self.collect_order(response)
- elif operation == 'collectDeviceInfo': # 定时保存设备数据
- return self.collect_device_info(response)
- elif operation == 'collectFlowInfo': # 定时保存设备数据
- return self.collect_flow_info(response)
- else:
- return response.json(404)
- @staticmethod
- def collect_play_back(response):
- try:
- now_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- this_month_str = datetime.datetime(today.year, today.month, 1)
- this_month_stamp = CommonService.str_to_timestamp(this_month_str.strftime('%Y-%m-%d %H:%M:%S'))
- video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time,
- startTime__lt=end_time,
- playMode='cloud').values('uid').annotate(
- play_duration=Sum('duration'), play_frequency=Count('uid'))
- with transaction.atomic():
- for item in video_play_back_time_qs:
- vod_hls_summary_qs = VodHlsSummary.objects.filter(uid=item['uid'], time=this_month_stamp)
- if vod_hls_summary_qs.exists():
- vod_hls_summary = vod_hls_summary_qs.first()
- vod_hls_summary.play_duration += item['play_duration']
- vod_hls_summary.play_frequency += 1
- vod_hls_summary.updated_time = now_time
- vod_hls_summary.save()
- else:
- VodHlsSummary.objects.create(uid=item['uid'], time=this_month_stamp, created_time=now_time,
- play_duration=item['play_duration'], play_frequency=1,
- updated_time=now_time)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_device_user(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- increase_user_qs = Device_User.objects.filter(data_joined__year=today.year, data_joined__month=today.month,
- data_joined__day=today.day).values('region_country')
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- active_user_qs = UserExModel.objects.filter(updTime__gte=start_time, updTime__lt=end_time).values(
- 'userID__region_country')
- country_qs = CountryModel.objects.all().values('id', 'region__name', 'country_name')
- country_dict = {}
- continent_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- continent_dict[item['country_name']] = item['region__name']
- with transaction.atomic():
- if increase_user_qs.exists():
- increase_user_count = increase_user_qs.count()
- increase_user_country_list = increase_user_qs.values('region_country').annotate(
- count=Count('region_country')).order_by('count')
- increase_user_country_dict = {}
- increase_user_continent_dict = {}
- for item in increase_user_country_list:
- country_name = country_dict.get(item['region_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- increase_user_country_dict[country_name] = item['count']
- if continent_name not in increase_user_continent_dict:
- increase_user_continent_dict[continent_name] = 0
- increase_user_continent_dict[continent_name] += item['count']
- DeviceUserSummary.objects.create(time=start_time, count=increase_user_count,
- country=increase_user_country_dict, created_time=created_time,
- continent=increase_user_continent_dict)
- if active_user_qs.exists():
- active_user_count = active_user_qs.count()
- active_user_country_list = active_user_qs.values('userID__region_country').annotate(
- count=Count('userID__region_country')).order_by('count')
- active_user_country_dict = {}
- active_user_continent_dict = {}
- for item in active_user_country_list:
- country_name = country_dict.get(item['userID__region_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- active_user_country_dict[country_name] = item['count']
- if continent_name not in active_user_continent_dict:
- active_user_continent_dict[continent_name] = 0
- active_user_continent_dict[continent_name] += item['count']
- DeviceUserSummary.objects.create(time=start_time, query_type=1, count=active_user_count,
- country=active_user_country_dict, created_time=created_time,
- continent=active_user_continent_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_order(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- order_qs = Order_Model.objects.filter(addTime__gte=start_time, addTime__lt=end_time,
- status=1).values('UID', 'order_type',
- 'store_meal_name', 'price',
- 'addTime', 'currency').order_by(
- 'addTime')
- uid_list = []
- all_order_qs = Order_Model.objects.filter(addTime__lt=start_time, status=1).values('UID')
- for item in all_order_qs:
- if item['UID'] not in uid_list:
- uid_list.append(item['UID'])
- # 国家表数据
- country_qs = CountryModel.objects.values('id', 'country_name')
- country_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- for item in order_qs:
- is_pay = 0
- price = float(item['price'])
- currency = item['currency']
- uid_set_qs = UidSetModel.objects.filter(uid=item['UID']).values('tb_country')
- country_id = uid_set_qs[0]['tb_country'] if uid_set_qs.exists() else 0
- country_name = country_dict.get(country_id, '未知国家')
- order_type = item['order_type']
- device_info_qs = Device_Info.objects.filter(UID=item['UID']).values('Type')
- device_type_id = device_info_qs[0]['Type'] if device_info_qs.exists() else 0
- device_type_name = device_type_dict.get(device_type_id, '未知设备')
- store_meal_name = item['store_meal_name']
- add_time_stamp = item['addTime']
- add_time_str = datetime.datetime.fromtimestamp(int(add_time_stamp))
- add_time_str = datetime.datetime(add_time_str.year, add_time_str.month, add_time_str.day)
- add_time_stamp = CommonService.str_to_timestamp(add_time_str.strftime('%Y-%m-%d %H:%M:%S'))
- if price == 0:
- is_pay = 1
- order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=1,
- service_type=order_type)
- else:
- order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=0,
- service_type=order_type)
- if item['UID'] not in uid_list:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=2,
- service_type=order_type)
- query_type = 2
- else:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=3,
- service_type=order_type)
- query_type = 3
- if pay_order_summary_qs.exists():
- pay_order_summary = pay_order_summary_qs.first()
- pay_order_summary.count += 1
- temp_total = eval(pay_order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- pay_order_summary.total = temp_total
- country_temp_dict = eval(pay_order_summary.country)
- if country_name in country_temp_dict:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- pay_order_summary.country = country_temp_dict
- device_type_temp_dict = eval(pay_order_summary.device_type)
- if device_type_name in device_type_temp_dict:
- device_type_temp_dict[device_type_name]['数量'] += 1
- if currency not in device_type_temp_dict[device_type_name]:
- device_type_temp_dict[device_type_name][currency] = price
- else:
- device_type_temp_dict[device_type_name][currency] = round(
- device_type_temp_dict[device_type_name][currency] + price, 2)
- else:
- device_type_temp_dict[device_type_name] = {'数量': 1, currency: price}
- pay_order_summary.device_type = device_type_temp_dict
- store_meal_temp_dict = eval(pay_order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- pay_order_summary.store_meal = store_meal_temp_dict
- pay_order_summary.save()
- else:
- final_total = {currency: price}
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- device_type_temp_dict = {
- device_type_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=query_type,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type=device_type_temp_dict,
- store_meal=store_meal_temp_dict)
- if order_summary_qs.exists():
- order_summary = order_summary_qs.first()
- order_summary.count += 1
- temp_total = eval(order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- order_summary.total = temp_total
- country_temp_dict = eval(order_summary.country)
- if country_name in country_temp_dict:
- if is_pay == 0:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] += 1
- else:
- if is_pay == 0:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- else:
- country_temp_dict[country_name] = 1
- order_summary.country = country_temp_dict
- device_type_temp_dict = eval(order_summary.device_type)
- if device_type_name in device_type_temp_dict:
- if is_pay == 0:
- device_type_temp_dict[device_type_name]['数量'] += 1
- if currency not in device_type_temp_dict[device_type_name]:
- device_type_temp_dict[device_type_name][currency] = price
- else:
- device_type_temp_dict[device_type_name][currency] = round(
- device_type_temp_dict[device_type_name][currency] + price, 2)
- else:
- device_type_temp_dict[device_type_name] += 1
- else:
- if is_pay == 0:
- device_type_temp_dict[device_type_name] = {'数量': 1, currency: price}
- else:
- device_type_temp_dict[device_type_name] = 1
- order_summary.device_type = device_type_temp_dict
- store_meal_temp_dict = eval(order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- if is_pay == 0:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] += 1
- else:
- if is_pay == 0:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- else:
- store_meal_temp_dict[store_meal_name] = 1
- order_summary.store_meal = store_meal_temp_dict
- order_summary.save()
- else:
- final_total = {currency: price}
- if is_pay == 0:
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- device_type_temp_dict = {
- device_type_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- else:
- device_type_temp_dict = {
- device_type_name: 1
- }
- store_meal_temp_dict = {
- store_meal_name: 1
- }
- country_temp_dict = {
- country_name: 1
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=is_pay,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type=device_type_temp_dict, store_meal=store_meal_temp_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, repr(e))
- @staticmethod
- def collect_device_info(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- increase_device_qs = UidSetModel.objects.filter(addTime__gte=start_time, addTime__lt=end_time).values(
- 'tb_country',
- 'uid',
- 'device_type',
- 'cloud_vod',
- 'is_ai',
- 'mobile_4g',
- 'addTime')
- video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time,
- startTime__lt=end_time).values('uid')
- active_device_qs = UidSetModel.objects.filter(uid__in=video_play_back_time_qs).values('tb_country',
- 'addTime',
- 'device_type',
- 'cloud_vod',
- 'is_ai',
- 'mobile_4g',
- 'uid')
- increase_device_count = increase_device_qs.count()
- active_device_count = active_device_qs.count()
- # 国家表数据
- country_qs = CountryModel.objects.values('id', 'country_name', 'region__name')
- country_dict = {}
- continent_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- continent_dict[item['country_name']] = item['region__name']
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- if increase_device_qs.exists():
- # 国家大洲设备数据
- increase_device_country_list = increase_device_qs.values('tb_country').annotate(
- count=Count('tb_country')).order_by('count')
- increase_device_country_dict = {}
- increase_device_continent_dict = {}
- for item in increase_device_country_list:
- country_name = country_dict.get(item['tb_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- increase_device_country_dict[country_name] = item['count']
- if continent_name not in increase_device_continent_dict:
- increase_device_continent_dict[continent_name] = 0
- increase_device_continent_dict[continent_name] += item['count']
- # 设备类型数据
- increase_device_type_list = increase_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_type_dict = {}
- for item in increase_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_type_dict[type_name] = item['count']
- # 云存设备类型数据
- increase_device_vod_list = increase_device_qs.filter(~Q(cloud_vod=2)).values(
- 'device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_vod_dict = {}
- for item in increase_device_vod_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_vod_dict[type_name] = item['count']
- # AI设备类型数据
- increase_device_ai_list = increase_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_ai_dict = {}
- for item in increase_device_ai_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_ai_dict[type_name] = item['count']
- # 联通设备类型数据
- increase_device_unicom_list = increase_device_qs.filter(~Q(mobile_4g=2)).values(
- 'device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_unicom_dict = {}
- for item in increase_device_unicom_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_unicom_dict[type_name] = item['count']
- DeviceInfoSummary.objects.create(time=start_time, count=increase_device_count,
- query_type=0, created_time=created_time,
- country=increase_device_country_dict,
- continent=increase_device_continent_dict,
- vod_service=increase_device_vod_dict,
- ai_service=increase_device_ai_dict,
- unicom_service=increase_device_unicom_dict,
- device_type=increase_device_type_dict)
- if active_device_qs.exists():
- # 国家大洲设备数据
- active_device_country_list = active_device_qs.values('tb_country').annotate(
- count=Count('tb_country')).order_by('count')
- active_device_country_dict = {}
- active_device_continent_dict = {}
- for item in active_device_country_list:
- country_name = country_dict.get(item['tb_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- active_device_country_dict[country_name] = item['count']
- if continent_name not in active_device_continent_dict:
- active_device_continent_dict[continent_name] = 0
- active_device_continent_dict[continent_name] += item['count']
- # 设备类型数据
- active_device_type_list = active_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_type_dict = {}
- for item in active_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_type_dict[type_name] = item['count']
- # 云存设备类型数据
- active_device_vod_list = active_device_qs.filter(~Q(cloud_vod=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_vod_dict = {}
- for item in active_device_vod_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_vod_dict[type_name] = item['count']
- # AI设备类型数据
- active_device_ai_list = active_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_ai_dict = {}
- for item in active_device_ai_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_ai_dict[type_name] = item['count']
- # 联通设备类型数据
- active_device_unicom_list = active_device_qs.filter(~Q(mobile_4g=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_unicom_dict = {}
- for item in active_device_unicom_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_unicom_dict[type_name] = item['count']
- DeviceInfoSummary.objects.create(time=start_time, count=active_device_count,
- query_type=1, created_time=created_time,
- country=active_device_country_dict,
- continent=active_device_continent_dict,
- vod_service=active_device_vod_dict,
- ai_service=active_device_ai_dict,
- unicom_service=active_device_unicom_dict,
- device_type=active_device_type_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_flow_info(response):
- try:
- unicom_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').distinct().order_by('iccid')
- asy = threading.Thread(target=CronCollectDataView.thread_collect_flow, args=(unicom_qs,))
- asy.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, repr(e))
- @staticmethod
- def thread_collect_flow(qs):
- try:
- unicom_api = UnicomObjeect()
- redis_obj = RedisObject()
- for item in qs:
- res = unicom_api.query_device_usage_history(**item)
- if res.status_code == 200:
- res_json = res.json()
- if res_json['code'] == 0:
- redis_dict = {}
- for data in res_json['data']['deviceUsageHistory']:
- year = data.get('year', None)
- month = data.get('month', None)
- flow = data.get('flowTotalUsage', None)
- if not all([year, month, flow]):
- continue
- file = str(year) + '-' + str(month)
- redis_dict[file] = flow
- key = 'monthly_flow_' + item['iccid']
- if redis_dict:
- redis_obj.set_hash_data(key, redis_dict)
- except Exception as e:
- LOGGER.info('统计联通流量失败,时间为:{}'.format(int(time.time())))
- class CronComparedDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'PaypalOrder': # 定时对比paypal订单
- return self.compared_paypal_order(response)
- elif operation == 'WechatOrder': # 定时对比微信订单
- return self.compared_wechat_order(response)
- else:
- return response.json(404)
- @staticmethod
- def compared_paypal_order(response):
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=2)
- start_date = datetime.datetime(start_date.year, start_date.month, start_date.day)
- end_date = start_date + datetime.timedelta(days=1)
- try:
- data = (
- ('start_date', '{}-{}-{}T08:00:00-0800'.format(start_date.year, start_date.month, start_date.day)),
- ('end_date', '{}-{}-{}T08:00:00-0800'.format(end_date.year, end_date.month, end_date.day)),
- ('fields', 'all'),
- ('page_size', '500'),
- ('page', '1'),
- ('transaction_status', 'S')
- )
- order_list = PayPalService(PAYPAL_CRD['client_id'], PAYPAL_CRD['client_secret']).get_transactions(data)
- thread = threading.Thread(target=CronComparedDataView.thread_compared_pyapal_order,
- args=(order_list['transaction_details'],))
- thread.start()
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_paypal_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_pyapal_order(order_list):
- now_time = int(time.time())
- for item in order_list:
- trade_no = item['transaction_info']['transaction_id']
- if item['transaction_info']['transaction_event_code'] == 'T1107':
- trade_no = item['transaction_info']['paypal_reference_id']
- order_qs = Order_Model.objects.filter(trade_no=trade_no, payType=1)
- if not order_qs.exists():
- transaction_subject = item['transaction_info'].get('transaction_subject', '')
- agreement_id = item['transaction_info'].get('paypal_reference_id', '')
- pay_time = datetime.datetime.strptime(item['transaction_info']['transaction_initiation_date'],
- "%Y-%m-%dT%H:%M:%S%z").timestamp()
- order_dict = {
- 'trade_no': trade_no,
- 'agreement_id': agreement_id,
- 'pay_time': pay_time,
- 'username': item['payer_info']['email_address'],
- 'price': item['transaction_info']['transaction_amount']['value'],
- 'pay_type': 1,
- 'upd_time': now_time,
- 'status': 0,
- 'meal_name': transaction_subject
- }
- if agreement_id:
- order_dict['pay_type'] = 0
- order_dict['meal_name'] = 'paypal_cycle'
- order_dict['order_id'] = transaction_subject
- params = {'trade_no': trade_no}
- response = requests.get('https://www.zositeche.com/testApi/checkOrderExist', params=params)
- if response.status_code != 200:
- # 如果响应失败,记录在数据库
- AbnormalOrder.objects.create(**order_dict)
- continue
- result = response.json()
- if result['result_code'] != 0 or not result['result']['is_exist']:
- # 如果响应结果为空,记录在数据库
- AbnormalOrder.objects.create(**order_dict)
- @staticmethod
- def compared_wechat_order(response):
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=1)
- start_date = start_date.strftime("%Y%m%d")
- try:
- order_list = WechatPayObject().download_bill(start_date)
- thread = threading.Thread(target=CronComparedDataView.thread_compared_wechat_order,
- args=(order_list,))
- thread.start()
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_paypal_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_wechat_order(order_list):
- now_time = int(time.time())
- for order in order_list:
- if order['交易类型'] != '`APP':
- continue
- order_id = order['商户订单号'].replace('`', '')
- order_qs = Order_Model.objects.filter(orderID=order_id)
- if not order_qs.exists():
- order_dict = {
- 'trade_no': order['微信订单号'].replace('`', ''),
- 'order_id': order_id,
- 'pay_type': 3,
- 'price': order['订单金额'].replace('`', ''),
- 'pay_time': int(datetime.datetime.strptime(order['\ufeff交易时间'], "`%Y-%m-%d %H:%M:%S").timestamp()),
- 'upd_time': now_time,
- 'meal_name': order['商品名称'].replace('`', ''),
- }
- AbnormalOrder.objects.create(**order_dict)
|