1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038 |
- #!/usr/bin/python3.6
- # -*- coding: utf-8 -*-
- #
- # Copyright (C) 2022 #
- # @Time : 2022/4/1 11:27
- # @Author : ming
- # @Email : zhangdongming@asj6.wecom.work
- # @File : CronTaskController.py
- # @Software: PyCharm
- import datetime
- import io
- import json
- import threading
- import time
- import zipfile
- import paypalrestsdk
- import requests
- import csv
- import math
- from django.db import connection, connections, transaction
- from django.db.models import Q, Sum, Count
- from django.views import View
- from Ansjer.config import USED_SERIAL_REDIS_LIST, UNUSED_SERIAL_REDIS_LIST, CONFIG_INFO, CONFIG_US, \
- RESET_REGION_ID_SERIAL_REDIS_LIST, LOGGER, PAYPAL_CRD, CONFIG_EUR, DETECT_PUSH_DOMAINS, ACCESS_KEY_ID, \
- SECRET_ACCESS_KEY, REGION_NAME, CONFIG_CN
- from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \
- VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \
- CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \
- CloudLogModel, UidCloudStorageCount, UserExModel, DeviceDomainRegionModel, VodHlsTag, VodHlsTagType, \
- Store_Meal, Lang, VodBucketModel, UnicomComboOrderInfo, UnicomDeviceInfo, AbnormalOrder, DailyReconciliation, \
- CustomizedPush, UIDCompanySerialModel, UIDModel, LogModel, OperatingCosts, UidBucketStatistics, AppScannedSerial, \
- SerialUnbindUID, UidUserModel, UidPushModel, iotdeviceInfoModel, ExperienceAiModel
- from Object.AWS.AmazonS3Util import AmazonS3Util
- from Object.AWS.S3Email import S3Email
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from Object.utils import LocalDateTimeUtil
- from Service.CommonService import CommonService
- from Service.EquipmentInfoService import EQUIPMENT_INFO_MODEL_LIST, EquipmentInfoService
- from Service.VodHlsService import SplitVodHlsObject
- from Object.UnicomObject import UnicomObjeect
- from Object.WechatPayObject import WechatPayObject
- from Object.AliPayObject import AliPayObject
- from dateutil.relativedelta import relativedelta
- class CronDelDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'delAccessLog': # 定时删除访问接口数据
- return self.delAccessLog(response)
- elif operation == 'delPushInfo': # 定时删除推送数据
- return self.delPushInfo(response)
- elif operation == 'delPushInfoV2': # 定时删除推送数据V2
- return self.delPushInfoV2(response)
- elif operation == 'delSysMsg': # 定时删除系统消息数据
- return self.delSysMsg(response)
- elif operation == 'delVodHls': # 定时删除云存播放列表
- return self.delVodHls(response)
- elif operation == 'delCloudLog': # 定时删除云存接口数据
- return self.delCloudLog(response)
- elif operation == 'delTesterDevice': # 定时删除测试账号下的设备数据
- return self.delTesterDevice(response)
- elif operation == 'delAppLog': # 定时删除app日志
- return self.delAppLog(response)
- elif operation == 'delDeviceLog': # 定时删除设备日志
- return self.delDeviceLog(response)
- elif operation == 'UpdateConfiguration': # 定时更新配置
- return self.UpdateConfiguration(response)
- elif operation == 'cloud-log':
- return self.uid_cloud_storage_upload_count(response)
- elif operation == 'delDeviceLog': # 定时删除设备日志
- return self.del_device_log(response)
- else:
- return response.json(404)
- @staticmethod
- def UpdateConfiguration(response):
- """
- 定时更新配置
- @param response: 响应对象
- @return:
- """
- try:
- ucode_list = ['823C01552AA',
- '823C01550AA',
- '823C01550XA',
- '823C01850XA',
- '730201350AA',
- '730201350AA',
- '730201450AA',
- '730201450MA',
- '72V201252AA',
- '72V201253AA',
- '72V201353AA',
- '72V201354AA',
- '72V201355AA',
- '72V201254AA',
- 'V82301850AA',
- 'V82301850XA',
- '72V201257AA', '72V201256AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_human=0).update(is_human=1)
- ucode_list = ['72V201257AA', '72V201254AA'] # 4G规格码
- UidSetModel.objects.filter(ucode__in=ucode_list, mobile_4g=0).update(mobile_4g=1)
- # 根据设备规格码定时更新默认算法类型类型
- ucode_list = ['823C01552AA', '823C01550AA', '823C01550XA', 'C18201550KA',
- '823C01550TA', '823C01550VA', '823C01850XA', 'C18201850KA',
- '823C01850TA', '823C01850VA', 'C22501850VA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=47)
- ucode_list = ['730201350AA', '730201450AA', '730201450MA', '730201450NA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=7)
- ucode_list = ['V82301850AA', 'V82301850XA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=2031)
- # 根据设备规格码更新默认个性化语音值
- ucode_list = ['823C01552AA', '823C01550XA', 'C18201550KA', '823C01550TA',
- '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA',
- '730201450AA', '730201450MA', '730201450NA', '72V201252AA', '72V201253AA',
- '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA', 'C22501850VA',
- 'V82301850AA', 'V82301850XA', '72V201257AA', '72V201256AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_custom_voice=0).update(is_custom_voice=1)
- # 根据设备规格码更新is_ai
- ucode_list = ['823C01552AA', '823C01550XA', 'C18201550KA', '823C01550TA',
- '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA',
- '730201450AA', '730201450MA', '730201450NA', '72V201252AA', '72V201253AA',
- '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA', 'C22501850VA',
- 'V82301850AA', 'V82301850XA', '72V201257AA', '72V201256AA', '730201350AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_ai=2).update(is_ai=1)
- # 根据设备规格码更新alexa
- ucode_list = ['823C01552AA', '823C01550AA', '823C01550XA', '522001352AA',
- '823C01550TA', '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA',
- 'C22501850VA', 'V82301850AA', 'V82301850XA', '730201350AA', '72V201252AA',
- '72V201253AA', '72V201353AA', '72V201354AA', '72V201355AA', '72V201256AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_alexa=0).update(is_alexa=1)
- # 根据ai类型和设备类型修改516 type
- uid_list = UidSetModel.objects.filter(ai_type=18563).values_list('uid')
- Device_Info.objects.filter(Type=10, UID__in=uid_list).update(Type=24)
- return response.json(0)
- except Exception as e:
- LOGGER.info('UpdateConfiguration异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delAppLog(response):
- """
- 定时删除app日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据
- sql = 'DELETE FROM `app_log` WHERE add_time<{}'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delDeviceLog(response):
- """
- 定时删除设备日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- size = 5000
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据
- month_ago_time_str = CommonService.timestamp_to_str(month_ago_time)
- sql = "DELETE FROM `device_log` WHERE add_time<'{}' LIMIT {}".format(month_ago_time_str, size)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def uid_cloud_storage_upload_count(response):
- try:
- now_time = int(time.time())
- local_time = LocalDateTimeUtil.get_before_days_timestamp(now_time)
- format_str = '%Y-%m-%d'
- date_str = LocalDateTimeUtil.time_stamp_to_time(local_time, format_str)
- start_time, end_time = LocalDateTimeUtil.get_start_and_end_time(date_str, format_str)
- cs_uid_qs = UID_Bucket.objects.filter(addTime__gte=int(1669824000)).values('uid')
- if not cs_uid_qs.exists():
- return response.json(0)
- for item in cs_uid_qs:
- uid = item['uid']
- cloud_log_qs = CloudLogModel.objects.filter(uid=uid, operation=r'cloudstorage/storeplaylist',
- time__gte=start_time, time__lte=end_time)
- cloud_log_qs = cloud_log_qs.values('uid')[0:1]
- if not cloud_log_qs.exists():
- continue
- count_data = {'uid': uid, 'count': cloud_log_qs.count(), 'created_time': end_time,
- 'updated_time': end_time}
- UidCloudStorageCount.objects.create(**count_data)
- return response.json(0)
- except Exception as e:
- LOGGER.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def delAccessLog(response):
- try:
- cursor = connection.cursor()
- # 删除7天前的数据
- last_week = LocalDateTimeUtil.get_last_week()
- sql = 'DELETE FROM access_log WHERE time < %s limit %s'
- cursor.execute(sql, [last_week, 10000])
- # 关闭游标
- cursor.close()
- connection.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @classmethod
- def delPushInfo(cls, response):
- now_time = int(time.time())
- try:
- # 当前时间转日期
- local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date())
- # 根据日期获取周几
- week_val = LocalDateTimeUtil.date_to_week(local_date_now)
- # 根据当前时间获取7天前时间戳
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
- # 异步删除推送消息
- kwargs = {
- 'week_val': week_val,
- 'expiration_time': expiration_time
- }
- del_push_info_thread = threading.Thread(
- target=cls.del_push_info_data,
- kwargs=kwargs)
- del_push_info_thread.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_push_info_data(**kwargs):
- cursor = connections['mysql02'].cursor()
- # 获取删除星期列表
- week_val = kwargs['week_val']
- del_week_val_list = [i for i in range(1, 8)]
- # 移除当天和前后两天
- del_week_val_list.remove(week_val)
- if week_val == 1:
- pre_week_val = 7
- else:
- pre_week_val = week_val - 1
- del_week_val_list.remove(pre_week_val)
- if week_val == 7:
- nex_week_val = 1
- else:
- nex_week_val = week_val + 1
- del_week_val_list.remove(nex_week_val)
- expiration_time = kwargs['expiration_time']
- # 每次删除条数
- size = 5000
- # 删除7天前的数据
- sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- for week_val in del_week_val_list:
- if week_val == 1:
- sql = "DELETE FROM equipment_info_monday WHERE add_time<= %s LIMIT %s "
- if week_val == 2:
- sql = "DELETE FROM equipment_info_tuesday WHERE add_time<= %s LIMIT %s "
- if week_val == 3:
- sql = "DELETE FROM equipment_info_wednesday WHERE add_time<= %s LIMIT %s "
- if week_val == 4:
- sql = "DELETE FROM equipment_info_thursday WHERE add_time<= %s LIMIT %s "
- if week_val == 5:
- sql = "DELETE FROM equipment_info_friday WHERE add_time<= %s LIMIT %s "
- if week_val == 6:
- sql = "DELETE FROM equipment_info_saturday WHERE add_time<= %s LIMIT %s "
- if week_val == 7:
- sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- @classmethod
- def delPushInfoV2(cls, response):
- try:
- del_push_info_thread = threading.Thread(
- target=cls.del_push_info_data_v2)
- del_push_info_thread.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delSysMsg(response):
- try:
- cursor = connections['mysql02'].cursor()
- # 获取90天前时间戳
- now_time = int(time.time())
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 90)
- # 每次删除条数
- size = 5000
- sql = "DELETE FROM sys_msg WHERE addTime< %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_push_info_data_v2():
- cursor = connections['mysql02'].cursor()
- # 获取7天前时间戳
- now_time = int(time.time())
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
- # 每次删除条数
- size = 5000
- for i in range(1, len(EQUIPMENT_INFO_MODEL_LIST) + 1):
- sql = "DELETE FROM equipment_info_{} WHERE add_time< %s LIMIT %s ".format(i)
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- @staticmethod
- def delVodHls(response):
- nowTime = int(time.time())
- try:
- CronDelDataView.del_vod_hls_tag()
- cursor = connection.cursor()
- month_ago_time = nowTime - 3 * 30 * 24 * 60 * 60 # 删除3个月前的数据
- sql = 'DELETE FROM `vod_hls` WHERE endTime<{} LIMIT 50000'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(end_time__lt=month_ago_time)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_vod_hls_tag():
- """
- 删除AI标签记录
- """
- e_time = LocalDateTimeUtil.get_before_days_timestamp(int(time.time()), 30)
- VodHlsTagType.objects.filter(created_time__lt=e_time).delete()
- VodHlsTag.objects.filter(created_time__lt=e_time).delete()
- @staticmethod
- def delCloudLog(response):
- nowTime = int(time.time())
- cursor = connection.cursor()
- try:
- # 删除3个月前的数据
- sql = "DELETE FROM `cloud_log` WHERE time<={} LIMIT 50000".format(
- nowTime - 3 * 30 * 24 * 60 * 60)
- cursor.execute(sql)
- # 关闭游标
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delTesterDevice(response):
- try:
- userID_list = [
- 'tech01@ansjer.com',
- 'tech02@ansjer.com',
- 'tech03@ansjer.com',
- 'tech04@ansjer.com',
- 'tech05@ansjer.com',
- 'tech06@ansjer.com',
- 'tech07@ansjer.com',
- 'tech08@ansjer.com',
- 'tech09@ansjer.com',
- 'tech10@ansjer.com',
- 'fix01@ansjer.com',
- 'fix02@ansjer.com',
- 'fix03@ansjer.com',
- 'fix04@ansjer.com',
- 'fix05@ansjer.com']
- device_user = Device_User.objects.filter(username__in=userID_list)
- device_info_qs = Device_Info.objects.filter(
- userID__in=device_user).values('UID')
- uid_list = []
- for device_info in device_info_qs:
- uid_list.append(device_info['UID'])
- with transaction.atomic():
- # 删除设备云存相关数据
- UidSetModel.objects.filter(uid__in=uid_list).delete()
- UID_Bucket.objects.filter(uid__in=uid_list).delete()
- Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete()
- # Order_Model.objects.filter(UID__in=uid_list).delete()
- StsCrdModel.objects.filter(uid__in=uid_list).delete()
- VodHlsModel.objects.filter(uid__in=uid_list).delete()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(uid__in=uid_list)
- ExperienceContextModel.objects.filter(uid__in=uid_list).delete()
- Device_Info.objects.filter(userID__in=device_user).delete()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_device_log(response):
- """
- 定时删除设备日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据
- sql = 'DELETE FROM `device_log` WHERE unix_timestamp(add_time)<{}'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- class CronUpdateDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'updateUnusedUidBucket': # 定时更新过期云存关联的未使用套餐状态
- return self.updateUnusedUidBucket(response)
- elif operation == 'updateUnusedAiService': # 定时更新过期ai关联的未使用套餐状态
- return self.updateUnusedAiService(response)
- elif operation == 'reqUpdateSerialStatus': # 定时请求更新序列号状态
- return self.reqUpdateSerialStatus(response)
- elif operation == 'updateSerialStatus': # 更新序列号状态
- return self.updateSerialStatus(request_dict, response)
- elif operation == 'deleteUidData': # 清除uid数据
- return self.deleteUidData(request_dict, response)
- elif operation == 'reset-region-id': # 重置地区id
- return self.reset_region_id(request_dict, response)
- elif operation == 'updateVodMeal': # 定时修改体验套餐有效期为1个月
- return self.update_vod_meal(request_dict, response)
- elif operation == 'checkCustomizedPush': # 定时检查定制化推送,重新执行没有推送成功的请求
- return self.check_customized_push(response)
- else:
- return response.json(404)
- @staticmethod
- def updateUnusedUidBucket(response):
- """
- 监控云存套餐过期修改状态
- @param response:
- @return:
- """
- # 定时更新已过期套餐修改状态为2
- now_time = int(time.time())
- expired_uid_bucket = UID_Bucket.objects.filter(endTime__lte=now_time)
- expired_uid_bucket = expired_uid_bucket.filter(~Q(use_status=2)).values('id')
- if expired_uid_bucket.exists():
- # 如果没有未使用套餐,或下个未使用套餐不是AI云存套餐,mqtt下发停用AI指令
- # 下个未使用套餐是AI云存套餐,mqtt下发启用AI指令
- for bucket in expired_uid_bucket:
- uid_bucket = UID_Bucket.objects.get(id=bucket['id'])
- uid = uid_bucket.uid
- # 存在序列号则为使用序列号作为物品名
- thing_name = CommonService.query_serial_with_uid(uid)
- topic_name = 'ansjer/generic/{}'.format(thing_name)
- msg = {'commandType': 'AIDisable'}
- if not uid_bucket.has_unused:
- CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
- else:
- next_unused = Unused_Uid_Meal.objects.filter(uid=uid).order_by('addTime').first()
- if next_unused is None or not getattr(next_unused, 'is_ai', 0):
- CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
- else:
- msg = {'commandType': 'AIEnable'}
- CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
- expired_uid_bucket.update(use_status=2)
- # 监控有未使用套餐则自动生效
- expired_uid_buckets = UID_Bucket.objects.filter(endTime__lte=now_time, has_unused=1).values("id", "uid")[0:1000]
- for expired_uid_bucket in expired_uid_buckets:
- unuseds = Unused_Uid_Meal.objects.filter(
- uid=expired_uid_bucket['uid']).values(
- "id",
- "uid",
- "channel",
- "addTime",
- "expire",
- "is_ai",
- "bucket_id",
- "order_id").order_by('addTime')
- if not unuseds.exists():
- continue
- unused = unuseds[0]
- try:
- with transaction.atomic():
- count_unused = Unused_Uid_Meal.objects.filter(uid=expired_uid_bucket['uid']).count()
- has_unused = 1 if count_unused > 1 else 0
- end_time = CommonService.calcMonthLater(unused['expire'])
- UID_Bucket.objects.filter(
- uid=expired_uid_bucket['uid']).update(
- channel=unused['channel'],
- endTime=end_time,
- bucket_id=unused['bucket_id'],
- updateTime=now_time,
- use_status=1,
- has_unused=has_unused,
- orderId=unused['order_id'])
- if unused['is_ai']:
- ai_service = AiService.objects.filter(uid=expired_uid_bucket['uid'], channel=unused['channel'])
- if ai_service.exists():
- ai_service.update(updTime=now_time, use_status=1, orders_id=unused['order_id'],
- endTime=end_time)
- else:
- AiService.objects.create(uid=expired_uid_bucket['uid'], channel=unused['channel'],
- detect_status=1, addTime=now_time, orders_id=unused['order_id'],
- updTime=now_time, endTime=end_time, use_status=1)
- Unused_Uid_Meal.objects.filter(id=unused['id']).delete()
- StsCrdModel.objects.filter(uid=expired_uid_bucket['uid']).delete() # 删除sts记录
- except Exception as e:
- print(repr(e))
- continue
- return response.json(0)
- @staticmethod
- def updateUnusedAiService(response):
- now_time = int(time.time())
- ai_service_qs = AiService.objects.filter(
- endTime__lte=now_time,
- use_status=1).values(
- 'id',
- 'uid')[
- 0:200]
- for ai_service in ai_service_qs:
- try:
- with transaction.atomic():
- AiService.objects.filter(
- id=ai_service['id']).update(
- use_status=2) # 更新过期ai订单状态
- # 如果存在未使用套餐,更新为使用
- unused_ai_service = AiService.objects.filter(
- uid=ai_service['uid'],
- use_status=0).order_by('addTime')[
- :1].values(
- 'id',
- 'endTime')
- if unused_ai_service.exists():
- # 未使用套餐的endTime在购买的时候保存为有效时间
- effective_day = unused_ai_service[0]['endTime']
- endTime = now_time + effective_day
- AiService.objects.filter(
- id=unused_ai_service[0]['id']).update(
- use_status=1, endTime=endTime, updTime=now_time)
- except Exception:
- continue
- return response.json(0)
- @classmethod
- def reqUpdateSerialStatus(cls, response):
- redis_obj = RedisObject()
- # 更新已使用序列号其他服务器的状态
- used_serial_redis_list = redis_obj.lrange(USED_SERIAL_REDIS_LIST, 0, -1) # 读取redis已使用序列号
- if used_serial_redis_list:
- LOGGER.info('---请求更新已使用序列号列表---used_serial_redis_list:{}'.format(used_serial_redis_list))
- used_serial_redis_list = [str(i, 'utf-8') for i in used_serial_redis_list]
- cls.do_request_function(used_serial_redis_list, 3)
- # 更新未使用序列号其他服务器的状态
- unused_serial_redis_list = redis_obj.lrange(UNUSED_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号
- if unused_serial_redis_list:
- LOGGER.info('---请求更新未使用序列号列表---unused_serial_redis_list:{}'.format(unused_serial_redis_list))
- unused_serial_redis_list = [str(i, 'utf-8') for i in unused_serial_redis_list]
- cls.do_request_function(unused_serial_redis_list, 1)
- # 重置地区id
- reset_region_id_serial_redis_list = redis_obj.lrange(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号
- if reset_region_id_serial_redis_list:
- LOGGER.info('---请求重置地区id的序列号列表---:{}'.format(reset_region_id_serial_redis_list))
- reset_region_id_serial_redis_list = [str(i, 'utf-8') for i in reset_region_id_serial_redis_list]
- cls.do_request_reset_region_id(reset_region_id_serial_redis_list)
- return response.json(0)
- @staticmethod
- def do_request_function(serial_redis_list, status):
- """
- 请求更新序列号状态
- @param serial_redis_list: 序列号redis列表
- @param status: 状态, 1: 未使用, 3: 已占用
- """
- data = {
- 'serial_redis_list': str(serial_redis_list),
- 'status': status
- }
- # 确认域名列表
- orders_domain_name_list = CommonService.get_orders_domain_name_list()
- redis_obj = RedisObject()
- LOGGER.info('---请求更新序列号线程---data:{},orders_domain_name_list:{}'.format(data, orders_domain_name_list))
- try:
- requests_failed_flag = False # 请求失败标志位
- for domain_name in orders_domain_name_list:
- url = '{}cron/update/updateSerialStatus'.format(domain_name)
- response = requests.post(url=url, data=data, timeout=5)
- LOGGER.info('---请求更新序列号响应时间---:{}'.format(response.elapsed.total_seconds()))
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- break
- # 状态为未使用,重置扫码记录和美洲服的地区id
- if status == 1:
- # 扫码记录
- AppScannedSerial.objects.filter(serial__in=serial_redis_list).delete()
- # 地区id
- # 美洲服直接更新
- if CONFIG_INFO == CONFIG_US:
- DeviceDomainRegionModel.objects.filter(~Q(region_id=0), serial_number__in=serial_redis_list). \
- update(region_id=0)
- # 其他服请求到美洲服更新
- else:
- req_url = 'https://www.dvema.com/cron/update/reset-region-id'
- req_data = {
- 'serial_redis_list': str(serial_redis_list)
- }
- response = requests.post(url=req_url, data=req_data, timeout=5)
- LOGGER.info('---请求重置地区id响应时间---:{}'.format(response.elapsed.total_seconds()))
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- break
- if not requests_failed_flag: # 请求成功删除redis序列号
- if status == 1:
- for i in serial_redis_list:
- redis_obj.lrem(UNUSED_SERIAL_REDIS_LIST, 0, i)
- elif status == 3:
- for i in serial_redis_list:
- redis_obj.lrem(USED_SERIAL_REDIS_LIST, 0, i)
- except Exception as e:
- LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
- @staticmethod
- def do_request_reset_region_id(reset_region_id_serial_redis_list):
- """
- 请求重置地区id
- @param reset_region_id_serial_redis_list: 序列号redis列表
- """
- redis_obj = RedisObject()
- requests_failed_flag = False # 请求失败标志位
- data = {
- 'serial_redis_list': str(reset_region_id_serial_redis_list),
- }
- url = 'https://www.dvema.com/cron/update/reset-region-id'
- try:
- response = requests.post(url=url, data=data, timeout=5)
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- if not requests_failed_flag: # 请求成功删除redis序列号
- for serial in reset_region_id_serial_redis_list:
- redis_obj.lrem(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, serial)
- except Exception as e:
- LOGGER.info('---请求重置地区id异常---:{}'.format(repr(e)))
- @staticmethod
- def updateSerialStatus(request_dict, response):
- """
- 更新序列号状态
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @request_dict status: 状态, 1: 未使用, 3: 已占用
- @param response: 响应对象
- """
- serial_redis_list = request_dict.get('serial_redis_list', None)
- status = request_dict.get('status', None)
- LOGGER.info('---更新序列号状态参数---serial_redis_list:{},status:{}'.format(serial_redis_list, status))
- if not all([serial_redis_list, status]):
- return response.json(444)
- now_time = int(time.time())
- try:
- serial_redis_list = eval(serial_redis_list)
- CompanySerialModel.objects.filter(serial_number__in=serial_redis_list).update(status=int(status),
- update_time=now_time)
- uid_serial_qs = UIDCompanySerialModel.objects.filter(company_serial__serial_number__in=serial_redis_list)
- if uid_serial_qs:
- uid_list = list(uid_serial_qs.values_list('uid__uid', flat=True))
- serial_list = list(uid_serial_qs.values_list('company_serial__serial_number', flat=True))
- UIDModel.objects.filter(uid__in=uid_list).update(status=3, mac='', update_time=now_time)
- uid_serial_qs.delete()
- # 记录操作日志
- content = json.loads(json.dumps(request_dict))
- log = {
- 'ip': '127.0.0.1',
- 'user_id': 1,
- 'status': 200,
- 'time': now_time,
- 'content': json.dumps(content),
- 'url': 'cron/update/updateSerialStatus',
- 'operation': '序列号{}解绑uid: {}'.format(serial_list, uid_list),
- }
- LogModel.objects.create(**log)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def deleteUidData(request_dict, response):
- """
- 清除uid数据
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @param response: 响应对象
- """
- serial_unbind_uid_qs = SerialUnbindUID.objects.filter(status=0).values('serial', 'uid')
- # 没有需要清除的数据直接返回
- if not serial_unbind_uid_qs.exists():
- return response.json(0)
- # 获取序列号,uid列表
- serial_list, uid_list = [], []
- for serial_unbind_uid in serial_unbind_uid_qs:
- serial_list.append(serial_unbind_uid['serial'])
- uid_list.append(serial_unbind_uid['uid'])
- now_time = int(time.time())
- redis_obj = RedisObject()
- try:
- with transaction.atomic():
- # 更新序列号解绑uid表状态
- serial_unbind_uid_qs.update(status=1, updated_time=now_time)
- # 更新序列号状态
- CompanySerialModel.objects.filter(serial_number__in=serial_list).update(status=1, update_time=now_time)
- UIDCompanySerialModel.objects.filter(company_serial__serial_number__in=serial_list).delete()
- # 删除设备相关数据,参考后台的设备重置删除的数据
- Device_Info.objects.filter(UID__in=uid_list).delete()
- UidSetModel.objects.filter(uid__in=uid_list).delete()
- UidUserModel.objects.filter(UID__in=uid_list).delete()
- UidPushModel.objects.filter(uid_set__uid__in=uid_list).delete()
- iotdeviceInfoModel.objects.filter(serial_number__in=serial_list).delete()
- # 删除推送消息
- EquipmentInfoService.delete_all_equipment_info(device_uid__in=uid_list)
- # 重置设备云存
- UID_Bucket.objects.filter(uid__in=uid_list).delete()
- Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete()
- # Order_Model.objects.filter(UID__in=uid_list).delete()
- StsCrdModel.objects.filter(uid__in=uid_list).delete()
- VodHlsModel.objects.filter(uid__in=uid_list).delete()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(uid__in=uid_list)
- ExperienceContextModel.objects.filter(uid__in=uid_list).delete()
- # 重置AI
- ExperienceAiModel.objects.filter(uid__in=uid_list).delete()
- AiService.objects.filter(uid__in=uid_list).delete()
- # 写入未使用序列号redis列表
- redis_obj.rpush_list(UNUSED_SERIAL_REDIS_LIST, serial_list)
- # 重置region_id,不为美洲服,则写入redis列表
- if CONFIG_INFO == CONFIG_US:
- DeviceDomainRegionModel.objects.filter(serial_number__in=serial_list).update(region_id=0)
- else:
- redis_obj.rpush_list(RESET_REGION_ID_SERIAL_REDIS_LIST, serial_list)
- # 重置已使用的uid的使用状态为未使用,更新时间
- UIDModel.objects.filter(uid__in=uid_list, status=2).update(status=0, mac='', update_time=now_time)
- # 重置扫码记录
- AppScannedSerial.objects.filter(serial__in=serial_list).delete()
- # 记录操作日志
- end_time = int(time.time())
- log = {
- 'user_id': 1,
- 'status': 200,
- 'time': now_time,
- 'url': 'cron/update/deleteUidData',
- 'operation': '已解绑序列号{}清除uid{}成功,执行时间{}秒'.format(serial_list, uid_list, end_time-now_time)
- }
- LogModel.objects.create(**log)
- return response.json(0)
- except Exception as e:
- # 记录操作日志
- log = {
- 'user_id': 1,
- 'status': 200,
- 'time': now_time,
- 'url': 'cron/update/deleteUidData',
- 'operation': '已解绑序列号{}清除uid{}异常:{}'.format(serial_list, uid_list, repr(e))
- }
- LogModel.objects.create(**log)
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def reset_region_id(request_dict, response):
- """
- 重置地区id
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @param response: 响应对象
- """
- serial_redis_list = request_dict.get('serial_redis_list', None)
- LOGGER.info('---重置地区id参数---serial_redis_list:{}'.format(serial_redis_list))
- if not serial_redis_list:
- return response.json(444)
- try:
- serial_redis_list = eval(serial_redis_list)
- DeviceDomainRegionModel.objects.filter(serial_number__in=serial_redis_list).update(region_id=0)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---重置地区id异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def update_vod_meal(request_dict, response):
- """
- 定时修改体验套餐有效期为1个月
- @param request_dict: 请求参数
- @param response: 响应对象
- """
- try:
- Store_Meal.objects.filter(is_show=0, expire=12, pixel_level=0).update(price='39.99',
- virtual_price='56.6',
- sort=1)
- Store_Meal.objects.filter(is_show=0, cycle_config_id=1, pixel_level=0).update(price='3.65',
- virtual_price='5.66',
- sort=2)
- Store_Meal.objects.filter(id=12).update(price='3.99', virtual_price='5.66', sort=3)
- Store_Meal.objects.filter(id__in=(16, 17, 18)).update(is_show=0)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---修改云存套餐内容异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def check_customized_push(response):
- try:
- now_time = int(time.time())
- # 查询推送时间小于当前时间且推送状态为待推送的数据
- customized_push_qs = CustomizedPush.objects.filter(push_timestamp__lt=now_time, push_satus=0).values('id')
- if customized_push_qs.exists():
- for customized_push in customized_push_qs:
- customized_push_id = customized_push['id']
- data = {'customized_push_id': customized_push_id}
- url = DETECT_PUSH_DOMAINS + 'customized_push/start'
- req = requests.post(url=url, data=data, timeout=8)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- class CronCollectDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'collectPlayBack': # 定时保存云存视频回放
- return self.collect_play_back(response)
- elif operation == 'collectDeviceUser': # 定时保存用户数据
- return self.collect_device_user(response)
- elif operation == 'collectActivityUser': # 定时保存用户数据
- return self.collect_activity_user(response)
- elif operation == 'collectOrder': # 定时保存订单数据
- return self.collect_order(response)
- elif operation == 'collectDeviceInfo': # 定时保存设备数据
- return self.collect_device_info(response)
- elif operation == 'collectFlowInfo': # 定时保存设备数据
- return self.collect_flow_info(response)
- elif operation == 'collectOperatingCosts': # 定时运营成本
- return self.collect_operating_costs(response)
- elif operation == 'collectObjSize': # 定时设备s3存储量
- return self.collect_obj_size(response)
- elif operation == 'checkRequest': # 定时检查各个服的https请求
- return self.check_request(response)
- else:
- return response.json(404)
- @staticmethod
- def collect_play_back(response):
- try:
- now_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- this_month_str = datetime.datetime(today.year, today.month, 1)
- this_month_stamp = CommonService.str_to_timestamp(this_month_str.strftime('%Y-%m-%d %H:%M:%S'))
- video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time,
- startTime__lt=end_time,
- playMode='cloud').values('uid').annotate(
- play_duration=Sum('duration'), play_frequency=Count('uid'))
- with transaction.atomic():
- for item in video_play_back_time_qs:
- vod_hls_summary_qs = VodHlsSummary.objects.filter(uid=item['uid'], time=this_month_stamp)
- if vod_hls_summary_qs.exists():
- vod_hls_summary = vod_hls_summary_qs.first()
- vod_hls_summary.play_duration += item['play_duration']
- vod_hls_summary.play_frequency += 1
- vod_hls_summary.updated_time = now_time
- vod_hls_summary.save()
- else:
- VodHlsSummary.objects.create(uid=item['uid'], time=this_month_stamp, created_time=now_time,
- play_duration=item['play_duration'], play_frequency=1,
- updated_time=now_time)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_device_user(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- increase_user_qs = Device_User.objects.filter(data_joined__year=today.year, data_joined__month=today.month,
- data_joined__day=today.day).values('region_country')
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- active_user_qs = UserExModel.objects.filter(updTime__gte=start_time, updTime__lt=end_time).values(
- 'userID__region_country')
- country_qs = CountryModel.objects.all().values('id', 'region__name', 'country_name')
- country_dict = {}
- continent_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- continent_dict[item['country_name']] = item['region__name']
- with transaction.atomic():
- if increase_user_qs.exists():
- increase_user_count = increase_user_qs.count()
- increase_user_country_list = increase_user_qs.values('region_country').annotate(
- count=Count('region_country')).order_by('count')
- increase_user_country_dict = {}
- increase_user_continent_dict = {}
- for item in increase_user_country_list:
- country_name = country_dict.get(item['region_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- increase_user_country_dict[country_name] = item['count']
- if continent_name not in increase_user_continent_dict:
- increase_user_continent_dict[continent_name] = 0
- increase_user_continent_dict[continent_name] += item['count']
- DeviceUserSummary.objects.create(time=start_time, count=increase_user_count,
- country=increase_user_country_dict, created_time=created_time,
- continent=increase_user_continent_dict)
- if active_user_qs.exists():
- active_user_count = active_user_qs.count()
- active_user_country_list = active_user_qs.values('userID__region_country').annotate(
- count=Count('userID__region_country')).order_by('count')
- active_user_country_dict = {}
- active_user_continent_dict = {}
- for item in active_user_country_list:
- country_name = country_dict.get(item['userID__region_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- active_user_country_dict[country_name] = item['count']
- if continent_name not in active_user_continent_dict:
- active_user_continent_dict[continent_name] = 0
- active_user_continent_dict[continent_name] += item['count']
- user_qs = DeviceUserSummary.objects.filter(time=start_time, query_type=1)
- if user_qs.exists():
- user_qs.update(count=active_user_count, country=active_user_country_dict,
- continent=active_user_continent_dict)
- else:
- DeviceUserSummary.objects.create(time=start_time, query_type=1, count=active_user_count,
- country=active_user_country_dict, created_time=created_time,
- continent=active_user_continent_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_order(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- order_qs = Order_Model.objects.filter(addTime__gte=start_time, addTime__lt=end_time,
- status=1).values('UID', 'order_type',
- 'store_meal_name', 'price',
- 'addTime', 'currency').order_by(
- 'addTime')
- uid_list = []
- all_order_qs = Order_Model.objects.filter(addTime__lt=start_time, status=1).values('UID')
- for item in all_order_qs:
- if item['UID'] not in uid_list:
- uid_list.append(item['UID'])
- # 国家表数据
- country_qs = CountryModel.objects.values('id', 'country_name')
- country_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- for item in order_qs:
- is_pay = 0
- price = float(item['price'])
- currency = item['currency']
- uid_set_qs = UidSetModel.objects.filter(uid=item['UID']).values('tb_country')
- country_id = uid_set_qs[0]['tb_country'] if uid_set_qs.exists() else 0
- country_name = country_dict.get(country_id, '未知国家')
- order_type = item['order_type']
- device_info_qs = Device_Info.objects.filter(UID=item['UID']).values('Type')
- device_type_id = device_info_qs[0]['Type'] if device_info_qs.exists() else 0
- device_type_name = device_type_dict.get(device_type_id, '未知设备')
- store_meal_name = item['store_meal_name']
- add_time_stamp = item['addTime']
- add_time_str = datetime.datetime.fromtimestamp(int(add_time_stamp))
- add_time_str = datetime.datetime(add_time_str.year, add_time_str.month, add_time_str.day)
- add_time_stamp = CommonService.str_to_timestamp(add_time_str.strftime('%Y-%m-%d %H:%M:%S'))
- if price == 0:
- is_pay = 1
- order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=1,
- service_type=order_type)
- else:
- order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=0,
- service_type=order_type)
- if item['UID'] not in uid_list:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=2,
- service_type=order_type)
- query_type = 2
- else:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=3,
- service_type=order_type)
- query_type = 3
- if pay_order_summary_qs.exists():
- pay_order_summary = pay_order_summary_qs.first()
- pay_order_summary.count += 1
- temp_total = eval(pay_order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- pay_order_summary.total = temp_total
- country_temp_dict = eval(pay_order_summary.country)
- if country_name in country_temp_dict:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- pay_order_summary.country = country_temp_dict
- device_type_temp_dict = eval(pay_order_summary.device_type)
- if device_type_name in device_type_temp_dict:
- device_type_temp_dict[device_type_name]['数量'] += 1
- if currency not in device_type_temp_dict[device_type_name]:
- device_type_temp_dict[device_type_name][currency] = price
- else:
- device_type_temp_dict[device_type_name][currency] = round(
- device_type_temp_dict[device_type_name][currency] + price, 2)
- else:
- device_type_temp_dict[device_type_name] = {'数量': 1, currency: price}
- pay_order_summary.device_type = device_type_temp_dict
- store_meal_temp_dict = eval(pay_order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- pay_order_summary.store_meal = store_meal_temp_dict
- pay_order_summary.save()
- else:
- final_total = {currency: price}
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- device_type_temp_dict = {
- device_type_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=query_type,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type=device_type_temp_dict,
- store_meal=store_meal_temp_dict)
- if order_summary_qs.exists():
- order_summary = order_summary_qs.first()
- order_summary.count += 1
- temp_total = eval(order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- order_summary.total = temp_total
- country_temp_dict = eval(order_summary.country)
- if country_name in country_temp_dict:
- if is_pay == 0:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] += 1
- else:
- if is_pay == 0:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- else:
- country_temp_dict[country_name] = 1
- order_summary.country = country_temp_dict
- device_type_temp_dict = eval(order_summary.device_type)
- if device_type_name in device_type_temp_dict:
- if is_pay == 0:
- device_type_temp_dict[device_type_name]['数量'] += 1
- if currency not in device_type_temp_dict[device_type_name]:
- device_type_temp_dict[device_type_name][currency] = price
- else:
- device_type_temp_dict[device_type_name][currency] = round(
- device_type_temp_dict[device_type_name][currency] + price, 2)
- else:
- device_type_temp_dict[device_type_name] += 1
- else:
- if is_pay == 0:
- device_type_temp_dict[device_type_name] = {'数量': 1, currency: price}
- else:
- device_type_temp_dict[device_type_name] = 1
- order_summary.device_type = device_type_temp_dict
- store_meal_temp_dict = eval(order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- if is_pay == 0:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] += 1
- else:
- if is_pay == 0:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- else:
- store_meal_temp_dict[store_meal_name] = 1
- order_summary.store_meal = store_meal_temp_dict
- order_summary.save()
- else:
- final_total = {currency: price}
- if is_pay == 0:
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- device_type_temp_dict = {
- device_type_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- else:
- device_type_temp_dict = {
- device_type_name: 1
- }
- store_meal_temp_dict = {
- store_meal_name: 1
- }
- country_temp_dict = {
- country_name: 1
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=is_pay,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type=device_type_temp_dict, store_meal=store_meal_temp_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_operating_costs(response):
- try:
- today = datetime.datetime.today()
- end_time = datetime.datetime(today.year, today.month, today.day)
- yesterday = end_time - datetime.timedelta(days=1)
- start_time = datetime.datetime(yesterday.year, yesterday.month, 1)
- start_time_stamp = int(start_time.timestamp())
- end_time_stamp = int(end_time.timestamp())
- thread = threading.Thread(target=CronCollectDataView.thread_collect_operating_costs,
- args=(start_time_stamp, end_time_stamp, start_time, end_time))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def thread_collect_operating_costs(start_time_stamp, end_time_stamp, start_time, end_time):
- try:
- create_time = int(time.time())
- today_end_time = end_time_stamp + 86400
- operating_costs_qs_1 = OperatingCosts.objects.filter(time=start_time_stamp).exclude(
- created_time__gte=end_time_stamp, created_time__lt=today_end_time).values('order_id', 'end_time', 'uid')
- operating_costs_qs_2 = OperatingCosts.objects.filter(time=start_time_stamp,
- created_time__gte=end_time_stamp,
- created_time__lt=today_end_time, start_time=0).values(
- 'order_id', 'end_time', 'uid')
- operating_costs_qs = operating_costs_qs_1.union(operating_costs_qs_2)
- storage_univalence = 0.023 / 30
- api_univalence = 0.005 / 1000
- region = '国内' if CONFIG_INFO == CONFIG_CN else '国外'
- country_qs = CountryModel.objects.values('id', 'country_name')
- country_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- for item in operating_costs_qs:
- order_qs = Order_Model.objects.filter(orderID=item['order_id'], order_type__in=[0, 1]).values('price',
- 'payTime',
- 'rank__expire',
- 'fee',
- 'payType',
- 'userID__region_country')
- if order_qs.exists():
- order = order_qs[0]
- country_name = country_dict.get(order['userID__region_country'], '未知国家')
- order_type = '云存'
- expire = str(order_qs[0]['rank__expire']) + '个月'
- price = float(order['price'])
- if order['payType'] in [2, 3]:
- fee = round(0.0054 * price, 2)
- else:
- fee = float(order['fee']) if order['fee'] else 0
- order_start_time = int((datetime.datetime.fromtimestamp(item['end_time']) - relativedelta(
- months=order['rank__expire'])).timestamp())
- order_days = math.ceil((item['end_time'] - order_start_time) / 86400)
- if item['end_time'] > end_time_stamp: # 订单结束时间大于统计时间
- if order_start_time <= start_time_stamp: # 订单月初之前开始
- settlement_days = (end_time - start_time).days # 当月结算天数=月初-月底
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
- time__lte=end_time_stamp,
- uid=item['uid'])
- elif order_start_time >= end_time_stamp: # 订单在统计时间之后开始
- settlement_days = 1
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=end_time_stamp,
- time__lt=order_start_time,
- uid=item['uid'])
- else: # 订单月初和统计时间之间开始
- settlement_days = math.ceil((end_time_stamp - order_start_time) / 86400)
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=order_start_time,
- time__lte=end_time_stamp,
- uid=item['uid'])
- remaining_usage_time = math.ceil((item['end_time'] - end_time_stamp) / 86400) # 剩余使用时间
- else: # 订单结束时间小于统计时间
- if order_start_time <= start_time_stamp:
- settlement_days = math.ceil((item['end_time'] - start_time_stamp) / 86400)
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
- time__lt=item['end_time'],
- uid=item['uid'])
- else:
- settlement_days = math.ceil((item['end_time'] - order_start_time) / 86400)
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=order_start_time,
- time__lt=item['end_time'],
- uid=item['uid'])
- remaining_usage_time = 0
- day_average_price = round(price / order_days, 2) # 收入分摊/天
- month_average_price = round(day_average_price * settlement_days, 2) # 收入分摊/月
- monthly_income = round((price - fee) / order_days * settlement_days, 2) # 当月结算收入
- real_income = round(price - fee, 2)
- result = uid_bucket_statistics.aggregate(size=Sum('storage_size'), api_count=Sum('api_count'))
- actual_storage = round(result['size'], 2) if result['size'] else 0
- actual_api = result['api_count'] if result['api_count'] else 0
- storage_cost = actual_storage / 1024 * storage_univalence * settlement_days
- api_cost = actual_api * api_univalence
- if CONFIG_INFO == CONFIG_CN: # 国内要换算汇率
- storage_cost = storage_cost * 7
- api_cost = api_cost * 7
- profit = round(monthly_income - storage_cost - api_cost, 2) # 利润=月结算金额-月成本
- storage_cost = round(storage_cost, 2)
- api_cost = round(api_cost, 2)
- if monthly_income == 0.0:
- profit_margin = 0
- else:
- profit_margin = round(profit / month_average_price, 2) # 利润率=利润/每月收入分摊
- OperatingCosts.objects.filter(time=start_time_stamp, order_id=item['order_id'],
- uid=item['uid']).update(day_average_price=day_average_price,
- month_average_price=month_average_price,
- monthly_income=monthly_income,
- actual_storage=actual_storage,
- settlement_days=settlement_days,
- actual_api=actual_api, fee=fee,
- created_time=create_time, region=region,
- start_time=order_start_time,
- remaining_usage_time=remaining_usage_time,
- storage_cost=storage_cost, api_cost=api_cost,
- profit=profit, profit_margin=profit_margin,
- real_income=real_income, price=price,
- country_name=country_name,
- order_type=order_type, expire=expire)
- print('结束')
- except Exception as e:
- LOGGER.info(
- 'thread_collect_operating_costs接口异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_obj_size(response):
- try:
- today = datetime.datetime.today()
- end_time = datetime.datetime(today.year, today.month, today.day)
- start_time = end_time - datetime.timedelta(days=1)
- first_date = datetime.datetime(start_time.year, start_time.month, 1)
- start_time_stamp = int(start_time.timestamp())
- end_time_stamp = int(end_time.timestamp())
- first_date_stamp = int(first_date.timestamp())
- thread = threading.Thread(target=CronCollectDataView.thread_collect_obj_size,
- args=(start_time_stamp, end_time_stamp, first_date_stamp))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def thread_collect_obj_size(start_time, end_time, first_date):
- try:
- creat_time = int(time.time())
- uid_list = UidBucketStatistics.objects.filter(time=start_time).values_list('uid', flat=True)
- uid_vod = UID_Bucket.objects.filter(Q(endTime__gte=start_time), ~Q(uid__in=uid_list)).values('uid',
- 'bucket__bucket',
- 'orderId',
- 'channel',
- 'endTime')
- s3_obj = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
- for item in uid_vod:
- path = '{uid}/vod{channel}'.format(uid=item['uid'], channel=item['channel'])
- s3_result = s3_obj.get_object_list(item['bucket__bucket'], path,
- path + '/{}'.format(start_time), end_time)
- actual_storage = 0
- actual_api = 0
- for obj in s3_result:
- temp_time = int(obj['Key'].split('/')[2])
- if temp_time < end_time:
- actual_storage += obj['Size']
- actual_api += 1
- actual_storage = round(actual_storage / 1024 / 1024, 2)
- with transaction.atomic():
- if actual_api:
- UidBucketStatistics.objects.create(uid=item['uid'], storage_size=actual_storage,
- api_count=actual_api,
- created_time=creat_time,
- time=start_time)
- operating_costs_qs = OperatingCosts.objects.filter(order_id=item['orderId'], uid=item['uid'],
- time=first_date)
- if not operating_costs_qs.exists():
- OperatingCosts.objects.create(order_id=item['orderId'], uid=item['uid'],
- created_time=creat_time, time=first_date,
- end_time=item['endTime'])
- print(actual_storage, actual_api)
- print('结束')
- except Exception as e:
- LOGGER.info('统计s3信息异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @classmethod
- def check_request(cls, response):
- domain_name_list = [
- 'test.zositechc.cn', 'test.push.zositechc.cn',
- 'www.zositechc.cn', 'api.loocam2.cn', 'common.neutral2.cn', 'api.aiotserver.cn', 'push.zositechc.cn',
- 'www.dvema.com', 'api.zositech2.com', 'api.loocam2.com', 'common.neutral2.com', '149.neutral2.com',
- '365.neutral2.com', 'push.dvema.com',
- 'www.zositeche.com', 'api.zositeche.com', 'api.loocam3.com', 'common.neutral3.com',
- 'push.zositeche.com',
- 'www.zositech.xyz', 'smart.loocam2.com'
- ]
- for domain_name in domain_name_list:
- thread = threading.Thread(target=cls.initial_request, args=(domain_name,))
- thread.start()
- return response.json(0)
- @staticmethod
- def initial_request(domain_name):
- url = 'https://{}/init/health-check'.format(domain_name)
- try:
- requests.post(url=url, timeout=30)
- except Exception as e:
- email_content = 'https请求域名{}出现异常!error_msg:{}'.format(domain_name, repr(e))
- S3Email().faEmail(email_content, 'servers@ansjer.com')
- @staticmethod
- def collect_device_info(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- increase_device_qs = UidSetModel.objects.filter(addTime__gte=start_time, addTime__lt=end_time).values(
- 'tb_country',
- 'uid',
- 'device_type',
- 'cloud_vod',
- 'is_ai',
- 'mobile_4g',
- 'addTime')
- video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time,
- startTime__lt=end_time).values('uid')
- active_device_qs = UidSetModel.objects.filter(uid__in=video_play_back_time_qs).values('tb_country',
- 'addTime',
- 'device_type',
- 'cloud_vod',
- 'is_ai',
- 'mobile_4g',
- 'uid')
- increase_device_count = increase_device_qs.count()
- active_device_count = active_device_qs.count()
- # 国家表数据
- country_qs = CountryModel.objects.values('id', 'country_name', 'region__name')
- country_dict = {}
- continent_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- continent_dict[item['country_name']] = item['region__name']
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- if increase_device_qs.exists():
- # 国家大洲设备数据
- increase_device_country_list = increase_device_qs.values('tb_country').annotate(
- count=Count('tb_country')).order_by('count')
- increase_device_country_dict = {}
- increase_device_continent_dict = {}
- for item in increase_device_country_list:
- country_name = country_dict.get(item['tb_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- increase_device_country_dict[country_name] = item['count']
- if continent_name not in increase_device_continent_dict:
- increase_device_continent_dict[continent_name] = 0
- increase_device_continent_dict[continent_name] += item['count']
- # 设备类型数据
- increase_device_type_list = increase_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_type_dict = {}
- for item in increase_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_type_dict[type_name] = item['count']
- # 云存设备类型数据
- increase_device_vod_list = increase_device_qs.filter(~Q(cloud_vod=2)).values(
- 'device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_vod_dict = {}
- for item in increase_device_vod_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_vod_dict[type_name] = item['count']
- # AI设备类型数据
- increase_device_ai_list = increase_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_ai_dict = {}
- for item in increase_device_ai_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_ai_dict[type_name] = item['count']
- # 联通设备类型数据
- increase_device_unicom_list = increase_device_qs.filter(~Q(mobile_4g=2)).values(
- 'device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_unicom_dict = {}
- for item in increase_device_unicom_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_unicom_dict[type_name] = item['count']
- DeviceInfoSummary.objects.create(time=start_time, count=increase_device_count,
- query_type=0, created_time=created_time,
- country=increase_device_country_dict,
- continent=increase_device_continent_dict,
- vod_service=increase_device_vod_dict,
- ai_service=increase_device_ai_dict,
- unicom_service=increase_device_unicom_dict,
- device_type=increase_device_type_dict)
- if active_device_qs.exists():
- # 国家大洲设备数据
- active_device_country_list = active_device_qs.values('tb_country').annotate(
- count=Count('tb_country')).order_by('count')
- active_device_country_dict = {}
- active_device_continent_dict = {}
- for item in active_device_country_list:
- country_name = country_dict.get(item['tb_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- active_device_country_dict[country_name] = item['count']
- if continent_name not in active_device_continent_dict:
- active_device_continent_dict[continent_name] = 0
- active_device_continent_dict[continent_name] += item['count']
- # 设备类型数据
- active_device_type_list = active_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_type_dict = {}
- for item in active_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_type_dict[type_name] = item['count']
- # 云存设备类型数据
- active_device_vod_list = active_device_qs.filter(~Q(cloud_vod=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_vod_dict = {}
- for item in active_device_vod_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_vod_dict[type_name] = item['count']
- # AI设备类型数据
- active_device_ai_list = active_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_ai_dict = {}
- for item in active_device_ai_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_ai_dict[type_name] = item['count']
- # 联通设备类型数据
- active_device_unicom_list = active_device_qs.filter(~Q(mobile_4g=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_unicom_dict = {}
- for item in active_device_unicom_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_unicom_dict[type_name] = item['count']
- DeviceInfoSummary.objects.create(time=start_time, count=active_device_count,
- query_type=1, created_time=created_time,
- country=active_device_country_dict,
- continent=active_device_continent_dict,
- vod_service=active_device_vod_dict,
- ai_service=active_device_ai_dict,
- unicom_service=active_device_unicom_dict,
- device_type=active_device_type_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_activity_user(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- thread = threading.Thread(target=CronCollectDataView.thread_collect_activity_user,
- args=(start_time, end_time, created_time))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def thread_collect_activity_user(start_time, end_time, created_time):
- try:
- active_uid = UserExModel.objects.filter(updTime__gte=start_time, updTime__lt=end_time).values_list(
- 'userID__device_info__UID', flat=True).distinct()
- active_device_qs = UidSetModel.objects.filter(uid__in=active_uid).values('addTime', 'device_type', 'ucode')
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- if active_device_qs.exists():
- # 按设备设备类型
- active_device_type_list = active_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_type_dict = {}
- for item in active_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_type_dict[type_name] = item['count']
- # 按ucode分类
- active_ucode_list = active_device_qs.values('ucode').annotate(count=Count('ucode')).order_by('count')
- active_ucode_dict = {}
- for item in active_ucode_list:
- ucode = item['ucode']
- if item['ucode'] == '':
- ucode = '未知ucode'
- active_ucode_dict[ucode] = item['count']
- user_qs = DeviceUserSummary.objects.filter(time=start_time, query_type=1)
- if user_qs.exists():
- user_qs.update(device_type=active_device_type_dict, ucode=active_ucode_dict)
- else:
- DeviceUserSummary.objects.create(time=start_time, query_type=1, created_time=created_time,
- device_type=active_device_type_dict, ucode=active_ucode_dict)
- except Exception as e:
- LOGGER.info(
- 'thread_collect_activity_user接口异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno,
- repr(e)))
- @staticmethod
- def collect_flow_info(response):
- try:
- unicom_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').distinct().order_by('iccid')
- asy = threading.Thread(target=CronCollectDataView.thread_collect_flow, args=(unicom_qs,))
- asy.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, repr(e))
- @staticmethod
- def thread_collect_flow(qs):
- try:
- unicom_api = UnicomObjeect()
- redis_obj = RedisObject()
- for item in qs:
- res = unicom_api.query_device_usage_history(**item)
- if res.status_code == 200:
- res_json = res.json()
- if res_json['code'] == 0:
- redis_dict = {}
- for data in res_json['data']['deviceUsageHistory']:
- year = data.get('year', None)
- month = data.get('month', None)
- flow = data.get('flowTotalUsage', None)
- if not all([year, month, flow]):
- continue
- file = str(year) + '-' + str(month)
- redis_dict[file] = flow
- key = 'monthly_flow_' + item['iccid']
- if redis_dict:
- redis_obj.set_hash_data(key, redis_dict)
- except Exception as e:
- LOGGER.info('统计联通流量失败,时间为:{}'.format(int(time.time())))
- class CronComparedDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'PaypalOrder': # 定时对比paypal订单
- return self.compared_paypal_order(request_dict, response)
- elif operation == 'WechatOrder': # 定时对比微信订单
- return self.compared_wechat_order(response)
- elif operation == 'AlipayOrder': # 定时对比阿里订单
- return self.compared_alipay_order(response)
- elif operation == 'AnsjerOrder': # 定时对比后台订单
- return self.compared_ansjer_order(request_dict, response)
- else:
- return response.json(404)
- @staticmethod
- def compared_paypal_order(request_dict, response):
- time_stamp = request_dict.get('time', None)
- if time_stamp:
- end_date = datetime.datetime.fromtimestamp(int(time_stamp))
- start_date = end_date - datetime.timedelta(days=1)
- else:
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=2)
- start_date = datetime.datetime(start_date.year, start_date.month, start_date.day)
- end_date = start_date + datetime.timedelta(days=1)
- try:
- zosi_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Zosi'])
- vsees_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Vsees'])
- paypal_url = 'v1/reporting/transactions?start_date={}-{}-{}T08:00:00-0800&end_date={}-{}-{}T08:00:00-0800&fields=all&page_size=500&page=1&transaction_status=S'.format(
- start_date.year, start_date.month, start_date.day, end_date.year, end_date.month, end_date.day)
- zosi_order_list = zosi_paypal_api.get(paypal_url)
- vsees_order_list = vsees_paypal_api.get(paypal_url)
- order_list = zosi_order_list['transaction_details'] + vsees_order_list['transaction_details']
- # data = (
- # ('start_date', '{}-{}-{}T08:00:00-0800'.format(start_date.year, start_date.month, start_date.day)),
- # ('end_date', '{}-{}-{}T08:00:00-0800'.format(end_date.year, end_date.month, end_date.day)),
- # ('fields', 'all'),
- # ('page_size', '500'),
- # ('page', '1'),
- # ('transaction_status', 'S')
- # )
- # order_list = PayPalService(PAYPAL_CRD['client_id'], PAYPAL_CRD['client_secret']).get_transactions(data)
- thread = threading.Thread(target=CronComparedDataView.thread_compared_paypal_order,
- args=(order_list, end_date))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_paypal_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_paypal_order(order_list, start_time):
- try:
- now_time = int(time.time())
- timestamp = int(start_time.timestamp())
- count = 0
- total = 0
- more_order_list = []
- for item in order_list:
- if 'custom_field' in item['transaction_info'] and item['transaction_info']['custom_field'] == 'Shopify':
- continue
- count += 1
- total += float(item['transaction_info']['transaction_amount']['value'])
- trade_no = item['transaction_info']['transaction_id']
- if item['transaction_info']['transaction_event_code'] in ['T1106', 'T1107', 'T1202']: # 付款退款
- trade_no = item['transaction_info']['paypal_reference_id']
- transaction_subject = item['transaction_info'].get('transaction_subject', '')
- agreement_id = item['transaction_info'].get('paypal_reference_id', '')
- refund_order = False
- if item['transaction_info']['transaction_event_code'] in ['T1106', 'T1107', 'T1201', 'T0114', 'T1108']:
- agreement_id = ''
- if item['transaction_info']['transaction_event_code'] in ['T0114']:
- transaction_subject = '争议费'
- elif item['transaction_info']['transaction_event_code'] in ['T1108']:
- transaction_subject = 'Fee reversal'
- else:
- refund_order = True
- transaction_subject = '退款费'
- more_order_list.append(trade_no)
- pay_time = int(datetime.datetime.strptime(item['transaction_info']['transaction_updated_date'],
- "%Y-%m-%dT%H:%M:%S%z").timestamp())
- order_qs = Order_Model.objects.filter(trade_no=trade_no, payType=1)
- if not order_qs.exists():
- order_dict = {
- 'trade_no': trade_no,
- 'agreement_id': agreement_id,
- 'pay_time': pay_time,
- 'username': item['payer_info'].get('email_address', ''),
- 'price': item['transaction_info']['transaction_amount']['value'],
- 'pay_type': 1,
- 'upd_time': now_time,
- 'status': 0,
- 'meal_name': transaction_subject
- }
- if agreement_id:
- order_dict['pay_type'] = 0
- order_dict['meal_name'] = 'paypal_cycle'
- order_dict['order_id'] = transaction_subject
- params = {'trade_no': trade_no, 'pay_time': pay_time, 'refund_order': refund_order}
- response = requests.get('https://www.zositeche.com/testApi/checkOrderExist', params=params)
- if response.status_code != 200:
- # 如果响应失败,记录在数据库
- abnormal_qs = AbnormalOrder.objects.filter(trade_no=trade_no)
- if not abnormal_qs.exists():
- AbnormalOrder.objects.create(**order_dict)
- continue
- result = response.json()
- if result['result_code'] != 0 or not result['result']['is_exist']:
- # 如果响应结果为空,记录在数据库
- abnormal_qs = AbnormalOrder.objects.filter(trade_no=trade_no)
- if not abnormal_qs.exists():
- AbnormalOrder.objects.create(**order_dict)
- more_order_list.append(trade_no)
- else:
- if not refund_order:
- order_qs.update(payTime=pay_time)
- total = round(total, 2)
- daily_reconciliation = DailyReconciliation.objects.filter(time=timestamp)
- if daily_reconciliation.exists():
- if daily_reconciliation.first().order_ids:
- old_order_list = daily_reconciliation.first().order_ids.split(',')
- more_order_list = list(set(old_order_list) | set(more_order_list))
- order_ids = ','.join(set(more_order_list))
- daily_reconciliation.update(paypal_num=count, paypal_total=total, upd_time=now_time,
- order_ids=order_ids)
- else:
- order_ids = ','.join(set(more_order_list))
- DailyReconciliation.objects.create(paypal_num=count, paypal_total=total, time=timestamp,
- order_ids=order_ids, creat_time=now_time, upd_time=now_time)
- except Exception as e:
- LOGGER.info('paypal每日对账异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def compared_wechat_order(response):
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=1)
- start_date = start_date.strftime("%Y%m%d")
- try:
- order_list = WechatPayObject().download_bill(start_date)
- thread = threading.Thread(target=CronComparedDataView.thread_compared_wechat_order,
- args=(order_list,))
- thread.start()
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_wechat_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_wechat_order(order_list):
- now_time = int(time.time())
- for order in order_list:
- if order['交易类型'] != '`APP':
- continue
- order_id = order['商户订单号'].replace('`', '')
- order_qs = Order_Model.objects.filter(orderID=order_id)
- if not order_qs.exists():
- order_dict = {
- 'trade_no': order['微信订单号'].replace('`', ''),
- 'order_id': order_id,
- 'pay_type': 3,
- 'price': order['订单金额'].replace('`', ''),
- 'pay_time': int(datetime.datetime.strptime(order['\ufeff交易时间'], "`%Y-%m-%d %H:%M:%S").timestamp()),
- 'upd_time': now_time,
- 'meal_name': order['商品名称'].replace('`', ''),
- }
- AbnormalOrder.objects.create(**order_dict)
- @staticmethod
- def compared_alipay_order(response):
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=1)
- start_date = start_date.strftime("%Y-%m-%d")
- try:
- ali_pay_obj = AliPayObject()
- alipay = ali_pay_obj.conf()
- result = alipay.server_api(
- api_name='alipay.data.dataservice.bill.downloadurl.query',
- biz_content={'bill_type': 'trade',
- 'bill_date': start_date,
- }
- )
- res = requests.get(result['bill_download_url'])
- zip_file = res.content
- zip_data = io.BytesIO(zip_file)
- data = []
- with zipfile.ZipFile(zip_data, 'r') as zip_ref:
- for file in zip_ref.namelist():
- if '汇总' not in file.encode('cp437').decode('gbk'):
- with zip_ref.open(file) as f:
- reader = csv.reader(io.TextIOWrapper(f, 'gbk'))
- for row in reader:
- data.append(row)
- key_list = data[4]
- orders = data[5:-4]
- order_list = []
- for item in orders:
- order_list.append(dict(zip(key_list, item)))
- thread = threading.Thread(target=CronComparedDataView.thread_compared_alipay_order,
- args=(order_list,))
- thread.start()
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_alipay_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_alipay_order(order_list):
- now_time = int(time.time())
- for order in order_list:
- order_id = order['商户订单号'].replace('\t', '')
- if len(order_id) != 20:
- continue
- order_qs = Order_Model.objects.filter(orderID=order_id)
- if not order_qs.exists():
- order_dict = {
- 'trade_no': order['支付宝交易号'].replace('\t', ''),
- 'order_id': order_id,
- 'pay_type': 2,
- 'price': order['订单金额(元)'].replace('\t', ''),
- 'pay_time': int(datetime.datetime.strptime(order['完成时间'], "%Y-%m-%d %H:%M:%S").timestamp()),
- 'upd_time': now_time,
- 'meal_name': order['商品名称'].replace('\t', ''),
- 'username': order['对方账户'].replace('\t', ''),
- }
- AbnormalOrder.objects.create(**order_dict)
- @staticmethod
- def compared_ansjer_order(request_dict, response):
- start_date_stamp = request_dict.get('time', None)
- end_time = request_dict.get('end_time', None)
- if start_date_stamp:
- start_date = datetime.datetime.fromtimestamp(int(start_date_stamp))
- end_date = start_date + datetime.timedelta(days=1)
- end_date_stamp = int(end_date.timestamp())
- else:
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=1)
- start_date = datetime.datetime(start_date.year, start_date.month, start_date.day)
- end_date = datetime.datetime(today.year, today.month, today.day)
- start_date_stamp = int(start_date.timestamp())
- end_date_stamp = int(end_date.timestamp())
- try:
- if end_time:
- end_date_stamp = end_time
- order_qs = Order_Model.objects.filter(status__in=[1, 5, 6], payType=1, payTime__gte=start_date_stamp,
- payTime__lt=end_date_stamp).values('trade_no', 'orderID', 'UID',
- 'userID__username',
- 'userID__NickName', 'channel',
- 'desc', 'payType',
- 'price', 'status',
- 'refunded_amount', 'addTime',
- 'updTime', 'app_type')
- if CONFIG_INFO == CONFIG_EUR:
- return response.json(0, list(order_qs))
- thread = threading.Thread(target=CronComparedDataView.thread_compared_ansjer_order,
- args=(list(order_qs), start_date))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_ansjer_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_ansjer_order(order_list, start_time):
- while True:
- response = requests.get('https://www.zositeche.com/cron/compared/AnsjerOrder',
- params={'time': int(start_time.timestamp())})
- if response.status_code == 200:
- result = response.json()
- if result['result_code'] == 0:
- eur_order_list = result['result']
- break
- try:
- begin_date = start_time - datetime.timedelta(days=15)
- end_date = start_time + datetime.timedelta(days=15)
- start_timestamp = int(start_time.timestamp())
- now_time = int(time.time())
- more_order_list = []
- total = 0
- all_order_list = order_list + eur_order_list
- count = len(all_order_list)
- zosi_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Zosi'])
- vsees_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Vsees'])
- for index, order in enumerate(all_order_list):
- total += float(order['price'])
- if not order['trade_no']:
- more_order_list.append(order['orderID'])
- continue
- if all_order_list.index(order) != index:
- more_order_list.append(order['orderID'])
- continue
- paypal_url = 'v1/reporting/transactions?start_date={}-{}-{}T00:00:00-0000&end_date={}-{}-{}T00:00:00-0000&transaction_id={}&fields=all&page_size=100&page=1'.format(
- begin_date.year, begin_date.month, begin_date.day, end_date.year, end_date.month, end_date.day,
- order['trade_no'])
- if order['app_type'] == 1:
- paypal_order_list = zosi_paypal_api.get(paypal_url)
- elif order['app_type'] == 2:
- paypal_order_list = vsees_paypal_api.get(paypal_url)
- else:
- continue
- if not paypal_order_list['transaction_details']:
- more_order_list.append(order['orderID'])
- total = round(total, 2)
- daily_reconciliation = DailyReconciliation.objects.filter(time=start_timestamp)
- if daily_reconciliation.exists():
- if daily_reconciliation.first().order_ids:
- old_order_list = daily_reconciliation.first().order_ids.split(',')
- more_order_list = list(set(old_order_list) | set(more_order_list))
- order_ids = ','.join(set(more_order_list))
- daily_reconciliation.update(ansjer_total=total, ansjer_num=count, order_ids=order_ids,
- upd_time=now_time)
- else:
- order_ids = ','.join(more_order_list)
- DailyReconciliation.objects.create(order_ids=order_ids, ansjer_total=total, ansjer_num=count,
- time=start_timestamp, creat_time=now_time, upd_time=now_time)
- except Exception as e:
- LOGGER.info('后台每日对账异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
|