123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235 |
- #!/usr/bin/python3.6
- # -*- coding: utf-8 -*-
- #
- # Copyright (C) 2022 #
- # @Time : 2022/4/1 11:27
- # @Author : ming
- # @Email : zhangdongming@asj6.wecom.work
- # @File : CronTaskController.py
- # @Software: PyCharm
- import datetime
- import io
- import json
- import threading
- import time
- import zipfile
- import calendar
- import paypalrestsdk
- import requests
- import csv
- import math
- from django.db import connection, connections, transaction
- from django.db.models import Q, Sum, Count
- from django.views import View
- from Ansjer.config import USED_SERIAL_REDIS_LIST, UNUSED_SERIAL_REDIS_LIST, CONFIG_INFO, CONFIG_US, \
- RESET_REGION_ID_SERIAL_REDIS_LIST, LOGGER, PAYPAL_CRD, CONFIG_EUR, DETECT_PUSH_DOMAINS, ACCESS_KEY_ID, \
- SECRET_ACCESS_KEY, REGION_NAME, CONFIG_CN
- from Model.models import Device_User, Device_Info, UidSetModel, UID_Bucket, Unused_Uid_Meal, Order_Model, StsCrdModel, \
- VodHlsModel, ExperienceContextModel, AiService, VodHlsSummary, VideoPlaybackTimeModel, DeviceUserSummary, \
- CountryModel, DeviceTypeModel, OrdersSummary, DeviceInfoSummary, CompanySerialModel, \
- CloudLogModel, UidCloudStorageCount, UserExModel, DeviceDomainRegionModel, VodHlsTag, VodHlsTagType, IcloudService, \
- Store_Meal, Lang, VodBucketModel, UnicomComboOrderInfo, UnicomDeviceInfo, AbnormalOrder, DailyReconciliation, \
- CustomizedPush, UIDCompanySerialModel, UIDModel, LogModel, OperatingCosts, UidBucketStatistics, AppScannedSerial, \
- SerialUnbindUID, UidUserModel, UidPushModel, iotdeviceInfoModel, ExperienceAiModel
- from Object.AWS.AmazonS3Util import AmazonS3Util
- from Object.AWS.S3Email import S3Email
- from Object.RedisObject import RedisObject
- from Object.ResponseObject import ResponseObject
- from Object.utils import LocalDateTimeUtil
- from Service.CommonService import CommonService
- from Service.EquipmentInfoService import EQUIPMENT_INFO_MODEL_LIST, EquipmentInfoService
- from Service.VodHlsService import SplitVodHlsObject
- from Object.UnicomObject import UnicomObjeect
- from Object.WechatPayObject import WechatPayObject
- from Object.AliPayObject import AliPayObject
- from dateutil.relativedelta import relativedelta
- class CronDelDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'delAccessLog': # 定时删除访问接口数据
- return self.delAccessLog(response)
- elif operation == 'delPushInfo': # 定时删除推送数据
- return self.delPushInfo(response)
- elif operation == 'delPushInfoV2': # 定时删除推送数据V2
- return self.delPushInfoV2(response)
- elif operation == 'delSysMsg': # 定时删除系统消息数据
- return self.delSysMsg(response)
- elif operation == 'delVodHls': # 定时删除云存播放列表
- return self.delVodHls(response)
- elif operation == 'delCloudLog': # 定时删除云存接口数据
- return self.delCloudLog(response)
- elif operation == 'delTesterDevice': # 定时删除测试账号下的设备数据
- return self.delTesterDevice(response)
- elif operation == 'delAppLog': # 定时删除app日志
- return self.delAppLog(response)
- elif operation == 'delDeviceLog': # 定时删除设备日志
- return self.delDeviceLog(response)
- elif operation == 'UpdateConfiguration': # 定时更新配置
- return self.UpdateConfiguration(response)
- elif operation == 'cloud-log':
- return self.uid_cloud_storage_upload_count(response)
- elif operation == 'delDeviceLog': # 定时删除设备日志
- return self.del_device_log(response)
- elif operation == 'delCampaignsLogs':
- return self.del_campaigns_log(response)
- else:
- return response.json(404)
- @staticmethod
- def UpdateConfiguration(response):
- """
- 定时更新配置
- @param response: 响应对象
- @return:
- """
- try:
- ucode_list = ['823C01552AA',
- '823C01550AA',
- '823C01550XA',
- '823C01850XA',
- '730201350AA',
- '730201350AA',
- '730201450AA',
- '730201450MA',
- '72V201252AA',
- '72V201253AA',
- '72V201353AA',
- '72V201354AA',
- '72V201355AA',
- '72V201254AA',
- 'V82301850AA',
- 'V82301850XA',
- '72V201257AA', '72V201256AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_human=0).update(is_human=1)
- ucode_list = ['72V201257AA', '72V201254AA'] # 4G规格码
- UidSetModel.objects.filter(ucode__in=ucode_list, mobile_4g=0).update(mobile_4g=1)
- # 根据设备规格码定时更新默认算法类型类型
- ucode_list = ['823C01552AA', '823C01550AA', '823C01550XA', 'C18201550KA',
- '823C01550TA', '823C01550VA', '823C01850XA', 'C18201850KA',
- '823C01850TA', '823C01850VA', 'C22501850VA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=47)
- ucode_list = ['730201350AA', '730201450AA', '730201450MA', '730201450NA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=7)
- ucode_list = ['V82301850AA', 'V82301850XA']
- UidSetModel.objects.filter(ucode__in=ucode_list, ai_type=0).update(ai_type=2031)
- # 根据设备规格码更新默认个性化语音值
- ucode_list = ['823C01552AA', '823C01550XA', 'C18201550KA', '823C01550TA',
- '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA',
- '730201450AA', '730201450MA', '730201450NA', '72V201252AA', '72V201253AA',
- '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA', 'C22501850VA',
- 'V82301850AA', 'V82301850XA', '72V201257AA', '72V201256AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_custom_voice=0).update(is_custom_voice=1)
- # 根据设备规格码更新is_ai
- ucode_list = ['823C01552AA', '823C01550XA', 'C18201550KA', '823C01550TA',
- '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA', '823C01850VA',
- '730201450AA', '730201450MA', '730201450NA', '72V201252AA', '72V201253AA',
- '72V201353AA', '72V201354AA', '72V201355AA', '72V201254AA', 'C22501850VA',
- 'V82301850AA', 'V82301850XA', '72V201257AA', '72V201256AA', '730201350AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_ai=2).update(is_ai=1)
- # 根据设备规格码更新alexa
- ucode_list = ['823C01552AA', '823C01550AA', '823C01550XA', '522001352AA',
- '823C01550TA', '823C01550VA', '823C01850XA', 'C18201850KA', '823C01850TA',
- 'C22501850VA', 'V82301850AA', 'V82301850XA', '730201350AA', '72V201252AA',
- '72V201253AA', '72V201353AA', '72V201354AA', '72V201355AA', '72V201256AA']
- UidSetModel.objects.filter(ucode__in=ucode_list, is_alexa=0).update(is_alexa=1)
- # 根据ai类型和设备类型修改516 type
- uid_list = UidSetModel.objects.filter(ai_type=18563).values_list('uid')
- Device_Info.objects.filter(Type=10, UID__in=uid_list).update(Type=24)
- return response.json(0)
- except Exception as e:
- LOGGER.info('UpdateConfiguration异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delAppLog(response):
- """
- 定时删除app日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据
- sql = 'DELETE FROM `app_log` WHERE add_time<{}'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delDeviceLog(response):
- """
- 定时删除设备日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- size = 5000
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 30 * 24 * 60 * 60 # 保留近30天的数据
- month_ago_time_str = CommonService.timestamp_to_str(month_ago_time)
- sql = "DELETE FROM `device_log` WHERE add_time<'{}' LIMIT {}".format(month_ago_time_str, size)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def uid_cloud_storage_upload_count(response):
- try:
- now_time = int(time.time())
- local_time = LocalDateTimeUtil.get_before_days_timestamp(now_time)
- format_str = '%Y-%m-%d'
- date_str = LocalDateTimeUtil.time_stamp_to_time(local_time, format_str)
- start_time, end_time = LocalDateTimeUtil.get_start_and_end_time(date_str, format_str)
- cs_uid_qs = UID_Bucket.objects.filter(addTime__gte=int(1669824000)).values('uid')
- if not cs_uid_qs.exists():
- return response.json(0)
- for item in cs_uid_qs:
- uid = item['uid']
- cloud_log_qs = CloudLogModel.objects.filter(uid=uid, operation=r'cloudstorage/storeplaylist',
- time__gte=start_time, time__lte=end_time)
- cloud_log_qs = cloud_log_qs.values('uid')[0:1]
- if not cloud_log_qs.exists():
- continue
- count_data = {'uid': uid, 'count': cloud_log_qs.count(), 'created_time': end_time,
- 'updated_time': end_time}
- UidCloudStorageCount.objects.create(**count_data)
- return response.json(0)
- except Exception as e:
- LOGGER.info('异常详情,errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def delAccessLog(response):
- try:
- cursor = connection.cursor()
- # 删除7天前的数据
- last_week = LocalDateTimeUtil.get_last_week()
- sql = 'DELETE FROM access_log WHERE time < %s limit %s'
- cursor.execute(sql, [last_week, 10000])
- # 关闭游标
- cursor.close()
- connection.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @classmethod
- def delPushInfo(cls, response):
- now_time = int(time.time())
- try:
- # 当前时间转日期
- local_date_now = str(datetime.datetime.fromtimestamp(int(now_time)).date())
- # 根据日期获取周几
- week_val = LocalDateTimeUtil.date_to_week(local_date_now)
- # 根据当前时间获取7天前时间戳
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
- # 异步删除推送消息
- kwargs = {
- 'week_val': week_val,
- 'expiration_time': expiration_time
- }
- del_push_info_thread = threading.Thread(
- target=cls.del_push_info_data,
- kwargs=kwargs)
- del_push_info_thread.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_push_info_data(**kwargs):
- cursor = connections['mysql02'].cursor()
- # 获取删除星期列表
- week_val = kwargs['week_val']
- del_week_val_list = [i for i in range(1, 8)]
- # 移除当天和前后两天
- del_week_val_list.remove(week_val)
- if week_val == 1:
- pre_week_val = 7
- else:
- pre_week_val = week_val - 1
- del_week_val_list.remove(pre_week_val)
- if week_val == 7:
- nex_week_val = 1
- else:
- nex_week_val = week_val + 1
- del_week_val_list.remove(nex_week_val)
- expiration_time = kwargs['expiration_time']
- # 每次删除条数
- size = 5000
- # 删除7天前的数据
- sql = "DELETE FROM equipment_info WHERE addTime<= %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- for week_val in del_week_val_list:
- if week_val == 1:
- sql = "DELETE FROM equipment_info_monday WHERE add_time<= %s LIMIT %s "
- if week_val == 2:
- sql = "DELETE FROM equipment_info_tuesday WHERE add_time<= %s LIMIT %s "
- if week_val == 3:
- sql = "DELETE FROM equipment_info_wednesday WHERE add_time<= %s LIMIT %s "
- if week_val == 4:
- sql = "DELETE FROM equipment_info_thursday WHERE add_time<= %s LIMIT %s "
- if week_val == 5:
- sql = "DELETE FROM equipment_info_friday WHERE add_time<= %s LIMIT %s "
- if week_val == 6:
- sql = "DELETE FROM equipment_info_saturday WHERE add_time<= %s LIMIT %s "
- if week_val == 7:
- sql = "DELETE FROM equipment_info_sunday WHERE add_time<= %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- @classmethod
- def delPushInfoV2(cls, response):
- try:
- del_push_info_thread = threading.Thread(
- target=cls.del_push_info_data_v2)
- del_push_info_thread.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delSysMsg(response):
- try:
- cursor = connections['mysql02'].cursor()
- # 获取90天前时间戳
- now_time = int(time.time())
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 90)
- # 每次删除条数
- size = 5000
- sql = "DELETE FROM sys_msg WHERE addTime< %s LIMIT %s "
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_push_info_data_v2():
- cursor = connections['mysql02'].cursor()
- # 获取7天前时间戳
- now_time = int(time.time())
- expiration_time = LocalDateTimeUtil.get_before_days_timestamp(now_time, 7)
- # 每次删除条数
- size = 5000
- for i in range(1, len(EQUIPMENT_INFO_MODEL_LIST) + 1):
- sql = "DELETE FROM equipment_info_{} WHERE add_time< %s LIMIT %s ".format(i)
- cursor.execute(sql, [expiration_time, size])
- # 关闭游标
- cursor.close()
- @staticmethod
- def delVodHls(response):
- nowTime = int(time.time())
- try:
- CronDelDataView.del_vod_hls_tag()
- cursor = connection.cursor()
- month_ago_time = nowTime - 3 * 30 * 24 * 60 * 60 # 删除3个月前的数据
- sql = 'DELETE FROM `vod_hls` WHERE endTime<{} LIMIT 50000'.format(month_ago_time)
- cursor.execute(sql)
- cursor.close()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(end_time__lt=month_ago_time)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_vod_hls_tag():
- """
- 删除AI标签记录
- """
- e_time = LocalDateTimeUtil.get_before_days_timestamp(int(time.time()), 30)
- VodHlsTagType.objects.filter(created_time__lt=e_time).delete()
- VodHlsTag.objects.filter(created_time__lt=e_time).delete()
- @staticmethod
- def delCloudLog(response):
- nowTime = int(time.time())
- cursor = connection.cursor()
- try:
- # 删除3个月前的数据
- sql = "DELETE FROM `cloud_log` WHERE time<={} LIMIT 50000".format(
- nowTime - 3 * 30 * 24 * 60 * 60)
- cursor.execute(sql)
- # 关闭游标
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def delTesterDevice(response):
- try:
- userID_list = [
- 'tech01@ansjer.com',
- 'tech02@ansjer.com',
- 'tech03@ansjer.com',
- 'tech04@ansjer.com',
- 'tech05@ansjer.com',
- 'tech06@ansjer.com',
- 'tech07@ansjer.com',
- 'tech08@ansjer.com',
- 'tech09@ansjer.com',
- 'tech10@ansjer.com',
- 'fix01@ansjer.com',
- 'fix02@ansjer.com',
- 'fix03@ansjer.com',
- 'fix04@ansjer.com',
- 'fix05@ansjer.com']
- device_user = Device_User.objects.filter(username__in=userID_list)
- device_info_qs = Device_Info.objects.filter(
- userID__in=device_user).values('UID')
- uid_list = []
- for device_info in device_info_qs:
- uid_list.append(device_info['UID'])
- with transaction.atomic():
- # 删除设备云存相关数据
- UidSetModel.objects.filter(uid__in=uid_list).delete()
- UID_Bucket.objects.filter(uid__in=uid_list).delete()
- Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete()
- # Order_Model.objects.filter(UID__in=uid_list).delete()
- StsCrdModel.objects.filter(uid__in=uid_list).delete()
- VodHlsModel.objects.filter(uid__in=uid_list).delete()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(uid__in=uid_list)
- ExperienceContextModel.objects.filter(uid__in=uid_list).delete()
- Device_Info.objects.filter(userID__in=device_user).delete()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_device_log(response):
- """
- 定时删除设备日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- try:
- cursor = connection.cursor()
- expired_time = nowTime - 7 * 24 * 60 * 60 # 保留近7天的数据
- sql = 'DELETE FROM `device_log` WHERE unix_timestamp(add_time)<{}'.format(expired_time)
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def del_campaigns_log(response):
- """
- 定时删除广告日志
- @param response: 响应对象
- @return:
- """
- nowTime = int(time.time())
- try:
- cursor = connection.cursor()
- month_ago_time = nowTime - 90 * 24 * 60 * 60 # 保留近90天的数据
- sql = f'DELETE FROM `open_screen_campaign` WHERE update_time<{month_ago_time}'
- cursor.execute(sql)
- cursor.close()
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- class CronUpdateDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'updateUnusedUidBucket': # 定时更新过期云存关联的未使用套餐状态
- return self.updateUnusedUidBucket(response)
- elif operation == 'updateUnusedAiService': # 定时更新过期ai关联的未使用套餐状态
- return self.updateUnusedAiService(response)
- elif operation == 'updateIcloudService': # 定时更新过期云盘套餐使用状态
- return self.updateIcloudService(response)
- elif operation == 'reqUpdateSerialStatus': # 定时请求更新序列号状态
- return self.reqUpdateSerialStatus(response)
- elif operation == 'updateSerialStatus': # 更新序列号状态
- return self.updateSerialStatus(request_dict, response)
- elif operation == 'deleteUidData': # 清除uid数据
- return self.deleteUidData(request_dict, response)
- elif operation == 'reset-region-id': # 重置地区id
- return self.reset_region_id(request_dict, response)
- elif operation == 'updateVodMeal': # 定时修改体验套餐有效期为1个月
- return self.update_vod_meal(request_dict, response)
- elif operation == 'checkCustomizedPush': # 定时检查定制化推送,重新执行没有推送成功的请求
- return self.check_customized_push(response)
- else:
- return response.json(404)
- @staticmethod
- def updateUnusedUidBucket(response):
- """
- 监控云存套餐过期修改状态
- @param response:
- @return:
- """
- # 定时更新已过期套餐修改状态为2
- now_time = int(time.time())
- expired_uid_bucket = UID_Bucket.objects.filter(endTime__lte=now_time)
- expired_uid_bucket = expired_uid_bucket.filter(~Q(use_status=2)).values('id')
- if expired_uid_bucket.exists():
- # 如果没有未使用套餐,或下个未使用套餐不是AI云存套餐,mqtt下发停用AI指令
- # 下个未使用套餐是AI云存套餐,mqtt下发启用AI指令
- for bucket in expired_uid_bucket:
- uid_bucket = UID_Bucket.objects.get(id=bucket['id'])
- uid = uid_bucket.uid
- # 存在序列号则为使用序列号作为物品名
- thing_name = CommonService.query_serial_with_uid(uid)
- topic_name = 'ansjer/generic/{}'.format(thing_name)
- msg = {'commandType': 'AIDisable'}
- if not uid_bucket.has_unused:
- CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
- else:
- next_unused = Unused_Uid_Meal.objects.filter(uid=uid).order_by('addTime').first()
- if next_unused is None or not getattr(next_unused, 'is_ai', 0):
- CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
- else:
- msg = {'commandType': 'AIEnable'}
- CommonService.req_publish_mqtt_msg(thing_name, topic_name, msg)
- expired_uid_bucket.update(use_status=2)
- # 监控有未使用套餐则自动生效
- expired_uid_buckets = UID_Bucket.objects.filter(endTime__lte=now_time, has_unused=1).values("id", "uid")[0:1000]
- for expired_uid_bucket in expired_uid_buckets:
- unuseds = Unused_Uid_Meal.objects.filter(
- uid=expired_uid_bucket['uid']).values(
- "id",
- "uid",
- "channel",
- "addTime",
- "expire",
- "is_ai",
- "bucket_id",
- "order_id").order_by('addTime')
- if not unuseds.exists():
- continue
- unused = unuseds[0]
- try:
- with transaction.atomic():
- count_unused = Unused_Uid_Meal.objects.filter(uid=expired_uid_bucket['uid']).count()
- has_unused = 1 if count_unused > 1 else 0
- end_time = CommonService.calcMonthLater(unused['expire'])
- UID_Bucket.objects.filter(
- uid=expired_uid_bucket['uid']).update(
- channel=unused['channel'],
- endTime=end_time,
- bucket_id=unused['bucket_id'],
- updateTime=now_time,
- use_status=1,
- has_unused=has_unused,
- orderId=unused['order_id'])
- if unused['is_ai']:
- ai_service = AiService.objects.filter(uid=expired_uid_bucket['uid'], channel=unused['channel'])
- if ai_service.exists():
- ai_service.update(updTime=now_time, use_status=1, orders_id=unused['order_id'],
- endTime=end_time)
- else:
- AiService.objects.create(uid=expired_uid_bucket['uid'], channel=unused['channel'],
- detect_status=1, addTime=now_time, orders_id=unused['order_id'],
- updTime=now_time, endTime=end_time, use_status=1)
- Unused_Uid_Meal.objects.filter(id=unused['id']).delete()
- StsCrdModel.objects.filter(uid=expired_uid_bucket['uid']).delete() # 删除sts记录
- except Exception as e:
- print(repr(e))
- continue
- return response.json(0)
- @staticmethod
- def updateUnusedAiService(response):
- now_time = int(time.time())
- ai_service_qs = AiService.objects.filter(
- endTime__lte=now_time,
- use_status=1).values(
- 'id',
- 'uid')[
- 0:200]
- for ai_service in ai_service_qs:
- try:
- with transaction.atomic():
- AiService.objects.filter(
- id=ai_service['id']).update(
- use_status=2) # 更新过期ai订单状态
- # 如果存在未使用套餐,更新为使用
- unused_ai_service = AiService.objects.filter(
- uid=ai_service['uid'],
- use_status=0).order_by('addTime')[
- :1].values(
- 'id',
- 'endTime')
- if unused_ai_service.exists():
- # 未使用套餐的endTime在购买的时候保存为有效时间
- effective_day = unused_ai_service[0]['endTime']
- endTime = now_time + effective_day
- AiService.objects.filter(
- id=unused_ai_service[0]['id']).update(
- use_status=1, endTime=endTime, updTime=now_time)
- except Exception:
- continue
- return response.json(0)
- @staticmethod
- def updateIcloudService(response):
- """
- 监控云盘套餐过期修改状态
- @param response:
- @return:
- """
- # 定时更新已过期套餐修改状态为2
- now_time = int(time.time())
- try:
- IcloudService.objects.filter(Q(end_time__lte=now_time), ~Q(end_time=0),
- ~Q(use_status=1)).update(use_status=1)
- return response.json(0)
- except Exception as e:
- return response.json(500)
- @classmethod
- def reqUpdateSerialStatus(cls, response):
- redis_obj = RedisObject()
- # 更新已使用序列号其他服务器的状态
- used_serial_redis_list = redis_obj.lrange(USED_SERIAL_REDIS_LIST, 0, -1) # 读取redis已使用序列号
- if used_serial_redis_list:
- LOGGER.info('---请求更新已使用序列号列表---used_serial_redis_list:{}'.format(used_serial_redis_list))
- used_serial_redis_list = [str(i, 'utf-8') for i in used_serial_redis_list]
- cls.do_request_function(used_serial_redis_list, 3)
- # 更新未使用序列号其他服务器的状态
- unused_serial_redis_list = redis_obj.lrange(UNUSED_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号
- if unused_serial_redis_list:
- LOGGER.info('---请求更新未使用序列号列表---unused_serial_redis_list:{}'.format(unused_serial_redis_list))
- unused_serial_redis_list = [str(i, 'utf-8') for i in unused_serial_redis_list]
- cls.do_request_function(unused_serial_redis_list, 1)
- # 重置地区id
- reset_region_id_serial_redis_list = redis_obj.lrange(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, -1) # 读取redis未使用序列号
- if reset_region_id_serial_redis_list:
- LOGGER.info('---请求重置地区id的序列号列表---:{}'.format(reset_region_id_serial_redis_list))
- reset_region_id_serial_redis_list = [str(i, 'utf-8') for i in reset_region_id_serial_redis_list]
- cls.do_request_reset_region_id(reset_region_id_serial_redis_list)
- return response.json(0)
- @staticmethod
- def do_request_function(serial_redis_list, status):
- """
- 请求更新序列号状态
- @param serial_redis_list: 序列号redis列表
- @param status: 状态, 1: 未使用, 3: 已占用
- """
- data = {
- 'serial_redis_list': str(serial_redis_list),
- 'status': status
- }
- # 确认域名列表
- orders_domain_name_list = CommonService.get_orders_domain_name_list()
- redis_obj = RedisObject()
- LOGGER.info('---请求更新序列号线程---data:{},orders_domain_name_list:{}'.format(data, orders_domain_name_list))
- try:
- requests_failed_flag = False # 请求失败标志位
- for domain_name in orders_domain_name_list:
- url = '{}cron/update/updateSerialStatus'.format(domain_name)
- response = requests.post(url=url, data=data, timeout=5)
- LOGGER.info('---请求更新序列号响应时间---:{}'.format(response.elapsed.total_seconds()))
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- break
- # 状态为未使用,重置扫码记录和美洲服的地区id
- if status == 1:
- # 扫码记录
- AppScannedSerial.objects.filter(serial__in=serial_redis_list).delete()
- # 地区id
- # 美洲服直接更新
- if CONFIG_INFO == CONFIG_US:
- DeviceDomainRegionModel.objects.filter(~Q(region_id=0), serial_number__in=serial_redis_list). \
- update(region_id=0)
- # 其他服请求到美洲服更新
- else:
- req_url = 'https://www.dvema.com/cron/update/reset-region-id'
- req_data = {
- 'serial_redis_list': str(serial_redis_list)
- }
- response = requests.post(url=req_url, data=req_data, timeout=5)
- LOGGER.info('---请求重置地区id响应时间---:{}'.format(response.elapsed.total_seconds()))
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- break
- if not requests_failed_flag: # 请求成功删除redis序列号
- if status == 1:
- for i in serial_redis_list:
- redis_obj.lrem(UNUSED_SERIAL_REDIS_LIST, 0, i)
- elif status == 3:
- for i in serial_redis_list:
- redis_obj.lrem(USED_SERIAL_REDIS_LIST, 0, i)
- except Exception as e:
- LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
- @staticmethod
- def do_request_reset_region_id(reset_region_id_serial_redis_list):
- """
- 请求重置地区id
- @param reset_region_id_serial_redis_list: 序列号redis列表
- """
- redis_obj = RedisObject()
- requests_failed_flag = False # 请求失败标志位
- data = {
- 'serial_redis_list': str(reset_region_id_serial_redis_list),
- }
- url = 'https://www.dvema.com/cron/update/reset-region-id'
- try:
- response = requests.post(url=url, data=data, timeout=5)
- result = response.json()
- if result['result_code'] != 0: # 请求失败标志位置位
- requests_failed_flag = True
- if not requests_failed_flag: # 请求成功删除redis序列号
- for serial in reset_region_id_serial_redis_list:
- redis_obj.lrem(RESET_REGION_ID_SERIAL_REDIS_LIST, 0, serial)
- except Exception as e:
- LOGGER.info('---请求重置地区id异常---:{}'.format(repr(e)))
- @staticmethod
- def updateSerialStatus(request_dict, response):
- """
- 更新序列号状态
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @request_dict status: 状态, 1: 未使用, 3: 已占用
- @param response: 响应对象
- """
- serial_redis_list = request_dict.get('serial_redis_list', None)
- status = request_dict.get('status', None)
- LOGGER.info('---更新序列号状态参数---serial_redis_list:{},status:{}'.format(serial_redis_list, status))
- if not all([serial_redis_list, status]):
- return response.json(444)
- now_time = int(time.time())
- try:
- serial_redis_list = eval(serial_redis_list)
- CompanySerialModel.objects.filter(serial_number__in=serial_redis_list).update(status=int(status),
- update_time=now_time)
- uid_serial_qs = UIDCompanySerialModel.objects.filter(company_serial__serial_number__in=serial_redis_list)
- if uid_serial_qs:
- uid_list = list(uid_serial_qs.values_list('uid__uid', flat=True))
- serial_list = list(uid_serial_qs.values_list('company_serial__serial_number', flat=True))
- UIDModel.objects.filter(uid__in=uid_list).update(status=3, mac='', update_time=now_time)
- uid_serial_qs.delete()
- # 记录操作日志
- content = json.loads(json.dumps(request_dict))
- log = {
- 'ip': '127.0.0.1',
- 'user_id': 1,
- 'status': 200,
- 'time': now_time,
- 'content': json.dumps(content),
- 'url': 'cron/update/updateSerialStatus',
- 'operation': '序列号{}解绑uid: {}'.format(serial_list, uid_list),
- }
- LogModel.objects.create(**log)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---更新序列号状态异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def deleteUidData(request_dict, response):
- """
- 清除uid数据
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @param response: 响应对象
- """
- serial_unbind_uid_qs = SerialUnbindUID.objects.filter(status=0).values('serial', 'uid')
- # 没有需要清除的数据直接返回
- if not serial_unbind_uid_qs.exists():
- return response.json(0)
- # 获取序列号,uid列表
- serial_list, uid_list = [], []
- for serial_unbind_uid in serial_unbind_uid_qs:
- serial_list.append(serial_unbind_uid['serial'])
- uid_list.append(serial_unbind_uid['uid'])
- now_time = int(time.time())
- redis_obj = RedisObject()
- try:
- with transaction.atomic():
- # 更新序列号解绑uid表状态
- serial_unbind_uid_qs.update(status=1, updated_time=now_time)
- # 更新序列号状态
- CompanySerialModel.objects.filter(serial_number__in=serial_list).update(status=1, update_time=now_time)
- UIDCompanySerialModel.objects.filter(company_serial__serial_number__in=serial_list).delete()
- # 删除设备相关数据,参考后台的设备重置删除的数据
- Device_Info.objects.filter(UID__in=uid_list).delete()
- UidSetModel.objects.filter(uid__in=uid_list).delete()
- UidUserModel.objects.filter(UID__in=uid_list).delete()
- UidPushModel.objects.filter(uid_set__uid__in=uid_list).delete()
- iotdeviceInfoModel.objects.filter(serial_number__in=serial_list).delete()
- # 删除推送消息
- EquipmentInfoService.delete_all_equipment_info(device_uid__in=uid_list)
- # 重置设备云存
- UID_Bucket.objects.filter(uid__in=uid_list).delete()
- Unused_Uid_Meal.objects.filter(uid__in=uid_list).delete()
- # Order_Model.objects.filter(UID__in=uid_list).delete()
- StsCrdModel.objects.filter(uid__in=uid_list).delete()
- VodHlsModel.objects.filter(uid__in=uid_list).delete()
- # 删除vod_hls分表数据
- split_vod_hls_obj = SplitVodHlsObject()
- split_vod_hls_obj.del_vod_hls_data(uid__in=uid_list)
- ExperienceContextModel.objects.filter(uid__in=uid_list).delete()
- # 重置AI
- ExperienceAiModel.objects.filter(uid__in=uid_list).delete()
- AiService.objects.filter(uid__in=uid_list).delete()
- # 写入未使用序列号redis列表
- redis_obj.rpush_list(UNUSED_SERIAL_REDIS_LIST, serial_list)
- # 重置region_id,不为美洲服,则写入redis列表
- if CONFIG_INFO == CONFIG_US:
- DeviceDomainRegionModel.objects.filter(serial_number__in=serial_list).update(region_id=0)
- else:
- redis_obj.rpush_list(RESET_REGION_ID_SERIAL_REDIS_LIST, serial_list)
- # 重置已使用的uid的使用状态为未使用,更新时间
- UIDModel.objects.filter(uid__in=uid_list, status=2).update(status=0, mac='', update_time=now_time)
- # 重置扫码记录
- AppScannedSerial.objects.filter(serial__in=serial_list).delete()
- # 记录操作日志
- end_time = int(time.time())
- log = {
- 'user_id': 1,
- 'status': 200,
- 'time': now_time,
- 'url': 'cron/update/deleteUidData',
- 'operation': '已解绑序列号{}清除uid{}成功,执行时间{}秒'.format(serial_list, uid_list, end_time-now_time)
- }
- LogModel.objects.create(**log)
- return response.json(0)
- except Exception as e:
- # 记录操作日志
- log = {
- 'user_id': 1,
- 'status': 200,
- 'time': now_time,
- 'url': 'cron/update/deleteUidData',
- 'operation': '已解绑序列号{}清除uid{}异常:{}'.format(serial_list, uid_list, repr(e))
- }
- LogModel.objects.create(**log)
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def reset_region_id(request_dict, response):
- """
- 重置地区id
- @param request_dict: 请求参数
- @request_dict serial_redis_list: 序列号redis列表
- @param response: 响应对象
- """
- serial_redis_list = request_dict.get('serial_redis_list', None)
- LOGGER.info('---重置地区id参数---serial_redis_list:{}'.format(serial_redis_list))
- if not serial_redis_list:
- return response.json(444)
- try:
- serial_redis_list = eval(serial_redis_list)
- DeviceDomainRegionModel.objects.filter(serial_number__in=serial_redis_list).update(region_id=0)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---重置地区id异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def update_vod_meal(request_dict, response):
- """
- 定时修改体验套餐有效期为1个月
- @param request_dict: 请求参数
- @param response: 响应对象
- """
- try:
- Store_Meal.objects.filter(is_show=0, expire=12, pixel_level=0).update(price='39.99',
- virtual_price='56.6',
- sort=1)
- Store_Meal.objects.filter(is_show=0, cycle_config_id=1, pixel_level=0).update(price='3.65',
- virtual_price='5.66',
- sort=2)
- Store_Meal.objects.filter(id=12).update(price='3.99', virtual_price='5.66', sort=3)
- Store_Meal.objects.filter(id__in=(16, 17, 18)).update(is_show=0)
- return response.json(0)
- except Exception as e:
- LOGGER.info('---修改云存套餐内容异常---:{}'.format(repr(e)))
- return response.json(500)
- @staticmethod
- def check_customized_push(response):
- try:
- now_time = int(time.time())
- # 查询推送时间小于当前时间且推送状态为待推送的数据
- customized_push_qs = CustomizedPush.objects.filter(push_timestamp__lt=now_time, push_satus=0).values('id')
- if customized_push_qs.exists():
- for customized_push in customized_push_qs:
- customized_push_id = customized_push['id']
- data = {'customized_push_id': customized_push_id}
- url = DETECT_PUSH_DOMAINS + 'customized_push/start'
- req = requests.post(url=url, data=data, timeout=8)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- class CronCollectDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'collectPlayBack': # 定时保存云存视频回放
- return self.collect_play_back(response)
- elif operation == 'collectDeviceUser': # 定时保存用户数据
- return self.collect_device_user(response)
- elif operation == 'collectActivityUser': # 定时保存用户数据
- return self.collect_activity_user(response)
- elif operation == 'collectOrder': # 定时保存订单数据
- return self.collect_order(response)
- elif operation == 'collectIcloudOrder': # 定时保存云盘订单数据
- return self.collect_icloud_order(response)
- elif operation == 'collectDeviceInfo': # 定时保存设备数据
- return self.collect_device_info(response)
- elif operation == 'collectFlowInfo': # 定时保存设备数据
- return self.collect_flow_info(response)
- elif operation == 'collectOperatingCosts': # 定时运营成本
- return self.collect_operating_costs(response)
- elif operation == 'collectObjSize': # 定时设备s3存储量
- return self.collect_obj_size(response)
- elif operation == 'checkRequest': # 定时检查各个服的https请求
- return self.check_request(response)
- else:
- return response.json(404)
- @staticmethod
- def collect_play_back(response):
- try:
- now_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- this_month_str = datetime.datetime(today.year, today.month, 1)
- this_month_stamp = CommonService.str_to_timestamp(this_month_str.strftime('%Y-%m-%d %H:%M:%S'))
- video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time,
- startTime__lt=end_time,
- playMode='cloud').values('uid').annotate(
- play_duration=Sum('duration'), play_frequency=Count('uid'))
- with transaction.atomic():
- for item in video_play_back_time_qs:
- vod_hls_summary_qs = VodHlsSummary.objects.filter(uid=item['uid'], time=this_month_stamp)
- if vod_hls_summary_qs.exists():
- vod_hls_summary = vod_hls_summary_qs.first()
- vod_hls_summary.play_duration += item['play_duration']
- vod_hls_summary.play_frequency += 1
- vod_hls_summary.updated_time = now_time
- vod_hls_summary.save()
- else:
- VodHlsSummary.objects.create(uid=item['uid'], time=this_month_stamp, created_time=now_time,
- play_duration=item['play_duration'], play_frequency=1,
- updated_time=now_time)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_device_user(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- increase_user_qs = Device_User.objects.filter(data_joined__year=today.year, data_joined__month=today.month,
- data_joined__day=today.day).values('region_country')
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- active_user_qs = UserExModel.objects.filter(updTime__gte=start_time, updTime__lt=end_time).values(
- 'userID__region_country')
- country_qs = CountryModel.objects.all().values('id', 'region__name', 'country_name')
- country_dict = {}
- continent_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- continent_dict[item['country_name']] = item['region__name']
- with transaction.atomic():
- if increase_user_qs.exists():
- increase_user_count = increase_user_qs.count()
- increase_user_country_list = increase_user_qs.values('region_country').annotate(
- count=Count('region_country')).order_by('count')
- increase_user_country_dict = {}
- increase_user_continent_dict = {}
- for item in increase_user_country_list:
- country_name = country_dict.get(item['region_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- increase_user_country_dict[country_name] = item['count']
- if continent_name not in increase_user_continent_dict:
- increase_user_continent_dict[continent_name] = 0
- increase_user_continent_dict[continent_name] += item['count']
- DeviceUserSummary.objects.create(time=start_time, count=increase_user_count,
- country=increase_user_country_dict, created_time=created_time,
- continent=increase_user_continent_dict)
- if active_user_qs.exists():
- active_user_count = active_user_qs.count()
- active_user_country_list = active_user_qs.values('userID__region_country').annotate(
- count=Count('userID__region_country')).order_by('count')
- active_user_country_dict = {}
- active_user_continent_dict = {}
- for item in active_user_country_list:
- country_name = country_dict.get(item['userID__region_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- active_user_country_dict[country_name] = item['count']
- if continent_name not in active_user_continent_dict:
- active_user_continent_dict[continent_name] = 0
- active_user_continent_dict[continent_name] += item['count']
- user_qs = DeviceUserSummary.objects.filter(time=start_time, query_type=1)
- if user_qs.exists():
- user_qs.update(count=active_user_count, country=active_user_country_dict,
- continent=active_user_continent_dict)
- else:
- DeviceUserSummary.objects.create(time=start_time, query_type=1, count=active_user_count,
- country=active_user_country_dict, created_time=created_time,
- continent=active_user_continent_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_order(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- order_qs = Order_Model.objects.filter(addTime__gte=start_time, addTime__lt=end_time,
- status=1).values('UID', 'order_type',
- 'store_meal_name', 'price',
- 'addTime', 'currency').order_by(
- 'addTime')
- uid_list = []
- all_order_qs = Order_Model.objects.filter(addTime__lt=start_time, status=1).values('UID')
- for item in all_order_qs:
- if item['UID'] not in uid_list:
- uid_list.append(item['UID'])
- # 国家表数据
- country_qs = CountryModel.objects.values('id', 'country_name')
- country_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- for item in order_qs:
- is_pay = 0
- price = float(item['price'])
- currency = item['currency']
- uid_set_qs = UidSetModel.objects.filter(uid=item['UID']).values('tb_country')
- country_id = uid_set_qs[0]['tb_country'] if uid_set_qs.exists() else 0
- country_name = country_dict.get(country_id, '未知国家')
- order_type = item['order_type']
- if order_type == '4' or order_type == 4:
- continue
- device_info_qs = Device_Info.objects.filter(UID=item['UID']).values('Type')
- device_type_id = device_info_qs[0]['Type'] if device_info_qs.exists() else 0
- device_type_name = device_type_dict.get(device_type_id, '未知设备')
- store_meal_name = item['store_meal_name']
- add_time_stamp = item['addTime']
- add_time_str = datetime.datetime.fromtimestamp(int(add_time_stamp))
- add_time_str = datetime.datetime(add_time_str.year, add_time_str.month, add_time_str.day)
- add_time_stamp = CommonService.str_to_timestamp(add_time_str.strftime('%Y-%m-%d %H:%M:%S'))
- if price == 0:
- is_pay = 1
- order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=1,
- service_type=order_type)
- else:
- order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=0,
- service_type=order_type)
- if item['UID'] not in uid_list:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=2,
- service_type=order_type)
- query_type = 2
- else:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=3,
- service_type=order_type)
- query_type = 3
- if pay_order_summary_qs.exists():
- pay_order_summary = pay_order_summary_qs.first()
- pay_order_summary.count += 1
- temp_total = eval(pay_order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- pay_order_summary.total = temp_total
- country_temp_dict = eval(pay_order_summary.country)
- if country_name in country_temp_dict:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- pay_order_summary.country = country_temp_dict
- device_type_temp_dict = eval(pay_order_summary.device_type)
- if device_type_name in device_type_temp_dict:
- device_type_temp_dict[device_type_name]['数量'] += 1
- if currency not in device_type_temp_dict[device_type_name]:
- device_type_temp_dict[device_type_name][currency] = price
- else:
- device_type_temp_dict[device_type_name][currency] = round(
- device_type_temp_dict[device_type_name][currency] + price, 2)
- else:
- device_type_temp_dict[device_type_name] = {'数量': 1, currency: price}
- pay_order_summary.device_type = device_type_temp_dict
- store_meal_temp_dict = eval(pay_order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- pay_order_summary.store_meal = store_meal_temp_dict
- pay_order_summary.save()
- else:
- final_total = {currency: price}
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- device_type_temp_dict = {
- device_type_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=query_type,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type=device_type_temp_dict,
- store_meal=store_meal_temp_dict)
- if order_summary_qs.exists():
- order_summary = order_summary_qs.first()
- order_summary.count += 1
- temp_total = eval(order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- order_summary.total = temp_total
- country_temp_dict = eval(order_summary.country)
- if country_name in country_temp_dict:
- if is_pay == 0:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] += 1
- else:
- if is_pay == 0:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- else:
- country_temp_dict[country_name] = 1
- order_summary.country = country_temp_dict
- device_type_temp_dict = eval(order_summary.device_type)
- if device_type_name in device_type_temp_dict:
- if is_pay == 0:
- device_type_temp_dict[device_type_name]['数量'] += 1
- if currency not in device_type_temp_dict[device_type_name]:
- device_type_temp_dict[device_type_name][currency] = price
- else:
- device_type_temp_dict[device_type_name][currency] = round(
- device_type_temp_dict[device_type_name][currency] + price, 2)
- else:
- device_type_temp_dict[device_type_name] += 1
- else:
- if is_pay == 0:
- device_type_temp_dict[device_type_name] = {'数量': 1, currency: price}
- else:
- device_type_temp_dict[device_type_name] = 1
- order_summary.device_type = device_type_temp_dict
- store_meal_temp_dict = eval(order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- if is_pay == 0:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] += 1
- else:
- if is_pay == 0:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- else:
- store_meal_temp_dict[store_meal_name] = 1
- order_summary.store_meal = store_meal_temp_dict
- order_summary.save()
- else:
- final_total = {currency: price}
- if is_pay == 0:
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- device_type_temp_dict = {
- device_type_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- else:
- device_type_temp_dict = {
- device_type_name: 1
- }
- store_meal_temp_dict = {
- store_meal_name: 1
- }
- country_temp_dict = {
- country_name: 1
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=is_pay,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type=device_type_temp_dict, store_meal=store_meal_temp_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_operating_costs(response):
- try:
- today = datetime.datetime.today()
- end_time = datetime.datetime(today.year, today.month, today.day)
- yesterday = end_time - datetime.timedelta(days=1)
- start_time = datetime.datetime(yesterday.year, yesterday.month, 1)
- start_time_stamp = int(start_time.timestamp())
- end_time_stamp = int(end_time.timestamp())
- thread = threading.Thread(target=CronCollectDataView.thread_collect_operating_costs,
- args=(start_time_stamp, end_time_stamp, start_time, end_time))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def thread_collect_operating_costs(start_time_stamp, end_time_stamp, start_time, end_time):
- try:
- create_time = int(time.time())
- today_end_time = end_time_stamp + 86400
- operating_costs_qs_1 = OperatingCosts.objects.filter(time=start_time_stamp).exclude(
- created_time__gte=end_time_stamp, created_time__lt=today_end_time).values('order_id', 'end_time', 'uid')
- operating_costs_qs_2 = OperatingCosts.objects.filter(time=start_time_stamp,
- created_time__gte=end_time_stamp,
- created_time__lt=today_end_time, start_time=0).values(
- 'order_id', 'end_time', 'uid')
- operating_costs_qs = operating_costs_qs_1.union(operating_costs_qs_2)
- storage_univalence = 0.023 / 30
- api_univalence = 0.005 / 1000
- region = '国内' if CONFIG_INFO == CONFIG_CN else '国外'
- country_qs = CountryModel.objects.values('id', 'country_name')
- country_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- for item in operating_costs_qs:
- order_qs = Order_Model.objects.filter(orderID=item['order_id'], order_type__in=[0, 1]).values('price',
- 'payTime',
- 'rank__expire',
- 'fee',
- 'payType',
- 'userID__region_country')
- if order_qs.exists():
- order = order_qs[0]
- country_name = country_dict.get(order['userID__region_country'], '未知国家')
- order_type = '云存'
- expire = str(order_qs[0]['rank__expire']) + '个月'
- price = float(order['price'])
- if order['payType'] in [2, 3]:
- fee = round(0.0054 * price, 2)
- else:
- fee = float(order['fee']) if order['fee'] else 0
- order_start_time = int((datetime.datetime.fromtimestamp(item['end_time']) - relativedelta(
- months=order['rank__expire'])).timestamp())
- order_days = math.ceil((item['end_time'] - order_start_time) / 86400)
- if item['end_time'] > end_time_stamp: # 订单结束时间大于统计时间
- if order_start_time <= start_time_stamp: # 订单月初之前开始
- settlement_days = (end_time - start_time).days # 当月结算天数=月初-月底
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
- time__lte=end_time_stamp,
- uid=item['uid'])
- elif order_start_time >= end_time_stamp: # 订单在统计时间之后开始
- settlement_days = 1
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=end_time_stamp,
- time__lt=order_start_time,
- uid=item['uid'])
- else: # 订单月初和统计时间之间开始
- settlement_days = math.ceil((end_time_stamp - order_start_time) / 86400)
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=order_start_time,
- time__lte=end_time_stamp,
- uid=item['uid'])
- remaining_usage_time = math.ceil((item['end_time'] - end_time_stamp) / 86400) # 剩余使用时间
- else: # 订单结束时间小于统计时间
- if order_start_time <= start_time_stamp:
- settlement_days = math.ceil((item['end_time'] - start_time_stamp) / 86400)
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=start_time_stamp,
- time__lt=item['end_time'],
- uid=item['uid'])
- else:
- settlement_days = math.ceil((item['end_time'] - order_start_time) / 86400)
- uid_bucket_statistics = UidBucketStatistics.objects.filter(time__gte=order_start_time,
- time__lt=item['end_time'],
- uid=item['uid'])
- remaining_usage_time = 0
- day_average_price = round(price / order_days, 2) # 收入分摊/天
- month_average_price = round(day_average_price * settlement_days, 2) # 收入分摊/月
- monthly_income = round((price - fee) / order_days * settlement_days, 2) # 当月结算收入
- real_income = round(price - fee, 2)
- result = uid_bucket_statistics.aggregate(size=Sum('storage_size'), api_count=Sum('api_count'))
- actual_storage = round(result['size'], 2) if result['size'] else 0
- actual_api = result['api_count'] if result['api_count'] else 0
- storage_cost = actual_storage / 1024 * storage_univalence * settlement_days
- api_cost = actual_api * api_univalence
- if CONFIG_INFO == CONFIG_CN: # 国内要换算汇率
- storage_cost = storage_cost * 7
- api_cost = api_cost * 7
- profit = round(monthly_income - storage_cost - api_cost, 2) # 利润=月结算金额-月成本
- storage_cost = round(storage_cost, 2)
- api_cost = round(api_cost, 2)
- if monthly_income == 0.0:
- profit_margin = 0
- else:
- profit_margin = round(profit / month_average_price, 2) # 利润率=利润/每月收入分摊
- OperatingCosts.objects.filter(time=start_time_stamp, order_id=item['order_id'],
- uid=item['uid']).update(day_average_price=day_average_price,
- month_average_price=month_average_price,
- monthly_income=monthly_income,
- actual_storage=actual_storage,
- settlement_days=settlement_days,
- actual_api=actual_api, fee=fee,
- created_time=create_time, region=region,
- start_time=order_start_time,
- remaining_usage_time=remaining_usage_time,
- storage_cost=storage_cost, api_cost=api_cost,
- profit=profit, profit_margin=profit_margin,
- real_income=real_income, price=price,
- country_name=country_name,
- order_type=order_type, expire=expire)
- print('结束')
- except Exception as e:
- LOGGER.info(
- 'thread_collect_operating_costs接口异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno,
- repr(e)))
- @staticmethod
- def collect_obj_size(response):
- try:
- today = datetime.datetime.today()
- end_time = datetime.datetime(today.year, today.month, today.day)
- start_time = end_time - datetime.timedelta(days=1)
- first_date = datetime.datetime(start_time.year, start_time.month, 1)
- start_time_stamp = int(start_time.timestamp())
- end_time_stamp = int(end_time.timestamp())
- first_date_stamp = int(first_date.timestamp())
- thread = threading.Thread(target=CronCollectDataView.thread_collect_obj_size,
- args=(start_time_stamp, end_time_stamp, first_date_stamp))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def thread_collect_obj_size(start_time, end_time, first_date):
- try:
- creat_time = int(time.time())
- uid_list = UidBucketStatistics.objects.filter(time=start_time).values_list('uid', flat=True)
- uid_vod = UID_Bucket.objects.filter(Q(endTime__gte=start_time), ~Q(uid__in=uid_list)).values('uid',
- 'bucket__bucket',
- 'orderId',
- 'channel',
- 'endTime')
- s3_obj = AmazonS3Util(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION_NAME)
- for item in uid_vod:
- path = '{uid}/vod{channel}'.format(uid=item['uid'], channel=item['channel'])
- s3_result = s3_obj.get_object_list(item['bucket__bucket'], path,
- path + '/{}'.format(start_time), end_time)
- actual_storage = 0
- actual_api = 0
- for obj in s3_result:
- temp_time = int(obj['Key'].split('/')[2])
- if temp_time < end_time:
- actual_storage += obj['Size']
- actual_api += 1
- actual_storage = round(actual_storage / 1024 / 1024, 2)
- with transaction.atomic():
- if actual_api:
- UidBucketStatistics.objects.create(uid=item['uid'], storage_size=actual_storage,
- api_count=actual_api,
- created_time=creat_time,
- time=start_time)
- operating_costs_qs = OperatingCosts.objects.filter(order_id=item['orderId'], uid=item['uid'],
- time=first_date)
- if not operating_costs_qs.exists():
- OperatingCosts.objects.create(order_id=item['orderId'], uid=item['uid'],
- created_time=creat_time, time=first_date,
- end_time=item['endTime'])
- print(actual_storage, actual_api)
- print('结束')
- except Exception as e:
- print('error')
- LOGGER.info('统计s3信息异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @classmethod
- def check_request(cls, response):
- domain_name_list = [
- 'test.zositechc.cn', 'test.push.zositechc.cn',
- 'www.zositechc.cn', 'api.loocam2.cn', 'common.neutral2.cn', 'api.aiotserver.cn', 'push.zositechc.cn',
- 'www.dvema.com', 'api.zositech2.com', 'api.loocam2.com', 'common.neutral2.com', '149.neutral2.com',
- '365.neutral2.com', 'push.dvema.com',
- 'www.zositeche.com', 'api.zositeche.com', 'api.loocam3.com', 'common.neutral3.com',
- 'push.zositeche.com',
- 'www.zositech.xyz', 'smart.loocam2.com'
- ]
- for domain_name in domain_name_list:
- thread = threading.Thread(target=cls.initial_request, args=(domain_name,))
- thread.start()
- return response.json(0)
- @staticmethod
- def initial_request(domain_name):
- url = 'https://{}/init/health-check'.format(domain_name)
- try:
- requests.post(url=url, timeout=30)
- except Exception as e:
- email_content = 'https请求域名{}出现异常!error_msg:{}'.format(domain_name, repr(e))
- S3Email().faEmail(email_content, 'servers@ansjer.com')
- @staticmethod
- def collect_icloud_order(response):
- try:
- order_type = 4
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- order_qs = Order_Model.objects.filter(addTime__gte=start_time, addTime__lt=end_time, order_type=order_type,
- status=1).values('userID',
- 'store_meal_name', 'price',
- 'addTime', 'currency').order_by(
- 'addTime')
- user_list = []
- all_order_qs = Order_Model.objects.filter(addTime__lt=start_time, status=1, order_type=order_type).values(
- 'userID')
- for item in all_order_qs:
- if item['userID'] not in user_list:
- user_list.append(item['userID'])
- # 国家表数据
- country_qs = CountryModel.objects.values('id', 'country_name')
- country_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- with transaction.atomic():
- for item in order_qs:
- price = float(item['price'])
- currency = item['currency']
- user_qs = Device_User.objects.filter(userID=item['userID']).values('region_country')
- country_id = user_qs[0]['region_country'] if user_qs.exists() else 0
- country_name = country_dict.get(country_id, '未知国家')
- store_meal_name = item['store_meal_name']
- add_time_stamp = item['addTime']
- add_time_str = datetime.datetime.fromtimestamp(int(add_time_stamp))
- add_time_str = datetime.datetime(add_time_str.year, add_time_str.month, add_time_str.day)
- add_time_stamp = CommonService.str_to_timestamp(add_time_str.strftime('%Y-%m-%d %H:%M:%S'))
- order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=0,
- service_type=order_type)
- if item['userID'] not in user_list:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=2,
- service_type=order_type)
- query_type = 2
- else:
- pay_order_summary_qs = OrdersSummary.objects.filter(time=add_time_stamp, query_type=3,
- service_type=order_type)
- query_type = 3
- if pay_order_summary_qs.exists():
- pay_order_summary = pay_order_summary_qs.first()
- pay_order_summary.count += 1
- temp_total = eval(pay_order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- pay_order_summary.total = temp_total
- country_temp_dict = eval(pay_order_summary.country)
- if country_name in country_temp_dict:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- pay_order_summary.country = country_temp_dict
- store_meal_temp_dict = eval(pay_order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- pay_order_summary.store_meal = store_meal_temp_dict
- pay_order_summary.save()
- else:
- final_total = {currency: price}
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=query_type,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type={},
- store_meal=store_meal_temp_dict)
- if order_summary_qs.exists():
- order_summary = order_summary_qs.first()
- order_summary.count += 1
- temp_total = eval(order_summary.total)
- if currency not in temp_total:
- temp_total[currency] = price
- else:
- temp_total[currency] = round(temp_total[currency] + price, 2)
- order_summary.total = temp_total
- country_temp_dict = eval(order_summary.country)
- if country_name in country_temp_dict:
- country_temp_dict[country_name]['数量'] += 1
- if currency not in country_temp_dict[country_name]:
- country_temp_dict[country_name][currency] = price
- else:
- country_temp_dict[country_name][currency] = round(
- country_temp_dict[country_name][currency] + price, 2)
- else:
- country_temp_dict[country_name] = {'数量': 1, currency: price}
- order_summary.country = country_temp_dict
- store_meal_temp_dict = eval(order_summary.store_meal)
- if store_meal_name in store_meal_temp_dict:
- store_meal_temp_dict[store_meal_name]['数量'] += 1
- if currency not in store_meal_temp_dict[store_meal_name]:
- store_meal_temp_dict[store_meal_name][currency] = price
- else:
- store_meal_temp_dict[store_meal_name][currency] = round(
- store_meal_temp_dict[store_meal_name][currency] + price, 2)
- else:
- store_meal_temp_dict[store_meal_name] = {'数量': 1, currency: price}
- order_summary.store_meal = store_meal_temp_dict
- order_summary.save()
- else:
- final_total = {currency: price}
- country_temp_dict = {
- country_name: {
- '数量': 1,
- currency: price
- }
- }
- store_meal_temp_dict = {
- store_meal_name: {
- '数量': 1,
- currency: price
- }
- }
- OrdersSummary.objects.create(time=add_time_stamp, count=1, query_type=0,
- service_type=order_type, total=final_total,
- country=country_temp_dict, created_time=created_time,
- device_type={}, store_meal=store_meal_temp_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_device_info(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- increase_device_qs = UidSetModel.objects.filter(addTime__gte=start_time, addTime__lt=end_time).values(
- 'tb_country',
- 'uid',
- 'device_type',
- 'cloud_vod',
- 'is_ai',
- 'mobile_4g',
- 'addTime')
- video_play_back_time_qs = VideoPlaybackTimeModel.objects.filter(startTime__gte=start_time,
- startTime__lt=end_time).values('uid')
- active_device_qs = UidSetModel.objects.filter(uid__in=video_play_back_time_qs).values('tb_country',
- 'addTime',
- 'device_type',
- 'cloud_vod',
- 'is_ai',
- 'mobile_4g',
- 'uid')
- increase_device_count = increase_device_qs.count()
- active_device_count = active_device_qs.count()
- # 国家表数据
- country_qs = CountryModel.objects.values('id', 'country_name', 'region__name')
- country_dict = {}
- continent_dict = {}
- for item in country_qs:
- country_dict[item['id']] = item['country_name']
- continent_dict[item['country_name']] = item['region__name']
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- if increase_device_qs.exists():
- # 国家大洲设备数据
- increase_device_country_list = increase_device_qs.values('tb_country').annotate(
- count=Count('tb_country')).order_by('count')
- increase_device_country_dict = {}
- increase_device_continent_dict = {}
- for item in increase_device_country_list:
- country_name = country_dict.get(item['tb_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- increase_device_country_dict[country_name] = item['count']
- if continent_name not in increase_device_continent_dict:
- increase_device_continent_dict[continent_name] = 0
- increase_device_continent_dict[continent_name] += item['count']
- # 设备类型数据
- increase_device_type_list = increase_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_type_dict = {}
- for item in increase_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_type_dict[type_name] = item['count']
- # 云存设备类型数据
- increase_device_vod_list = increase_device_qs.filter(~Q(cloud_vod=2)).values(
- 'device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_vod_dict = {}
- for item in increase_device_vod_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_vod_dict[type_name] = item['count']
- # AI设备类型数据
- increase_device_ai_list = increase_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_ai_dict = {}
- for item in increase_device_ai_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_ai_dict[type_name] = item['count']
- # 联通设备类型数据
- increase_device_unicom_list = increase_device_qs.filter(~Q(mobile_4g=2)).values(
- 'device_type').annotate(
- count=Count('device_type')).order_by('count')
- increase_device_unicom_dict = {}
- for item in increase_device_unicom_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- increase_device_unicom_dict[type_name] = item['count']
- DeviceInfoSummary.objects.create(time=start_time, count=increase_device_count,
- query_type=0, created_time=created_time,
- country=increase_device_country_dict,
- continent=increase_device_continent_dict,
- vod_service=increase_device_vod_dict,
- ai_service=increase_device_ai_dict,
- unicom_service=increase_device_unicom_dict,
- device_type=increase_device_type_dict)
- if active_device_qs.exists():
- # 国家大洲设备数据
- active_device_country_list = active_device_qs.values('tb_country').annotate(
- count=Count('tb_country')).order_by('count')
- active_device_country_dict = {}
- active_device_continent_dict = {}
- for item in active_device_country_list:
- country_name = country_dict.get(item['tb_country'], '未知国家')
- continent_name = continent_dict.get(country_name, '未知大洲')
- active_device_country_dict[country_name] = item['count']
- if continent_name not in active_device_continent_dict:
- active_device_continent_dict[continent_name] = 0
- active_device_continent_dict[continent_name] += item['count']
- # 设备类型数据
- active_device_type_list = active_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_type_dict = {}
- for item in active_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_type_dict[type_name] = item['count']
- # 云存设备类型数据
- active_device_vod_list = active_device_qs.filter(~Q(cloud_vod=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_vod_dict = {}
- for item in active_device_vod_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_vod_dict[type_name] = item['count']
- # AI设备类型数据
- active_device_ai_list = active_device_qs.filter(~Q(is_ai=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_ai_dict = {}
- for item in active_device_ai_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_ai_dict[type_name] = item['count']
- # 联通设备类型数据
- active_device_unicom_list = active_device_qs.filter(~Q(mobile_4g=2)).values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_unicom_dict = {}
- for item in active_device_unicom_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_unicom_dict[type_name] = item['count']
- DeviceInfoSummary.objects.create(time=start_time, count=active_device_count,
- query_type=1, created_time=created_time,
- country=active_device_country_dict,
- continent=active_device_continent_dict,
- vod_service=active_device_vod_dict,
- ai_service=active_device_ai_dict,
- unicom_service=active_device_unicom_dict,
- device_type=active_device_type_dict)
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def collect_activity_user(response):
- try:
- created_time = int(time.time())
- today = datetime.datetime.today()
- start_time = datetime.datetime(today.year, today.month, today.day)
- end_time = start_time + datetime.timedelta(days=1)
- start_time = CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S'))
- end_time = CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S'))
- thread = threading.Thread(target=CronCollectDataView.thread_collect_activity_user,
- args=(start_time, end_time, created_time))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def thread_collect_activity_user(start_time, end_time, created_time):
- try:
- active_uid = UserExModel.objects.filter(updTime__gte=start_time, updTime__lt=end_time).values_list(
- 'userID__device_info__UID', flat=True).distinct()
- active_device_qs = UidSetModel.objects.filter(uid__in=active_uid).values('addTime', 'device_type', 'ucode')
- # 设备类型数据
- device_type_qs = DeviceTypeModel.objects.values('name', 'type')
- device_type_dict = {}
- for item in device_type_qs:
- device_type_dict[item['type']] = item['name']
- with transaction.atomic():
- if active_device_qs.exists():
- # 按设备设备类型
- active_device_type_list = active_device_qs.values('device_type').annotate(
- count=Count('device_type')).order_by('count')
- active_device_type_dict = {}
- for item in active_device_type_list:
- type_name = device_type_dict.get(item['device_type'], '未知设备类型')
- active_device_type_dict[type_name] = item['count']
- # 按ucode分类
- active_ucode_list = active_device_qs.values('ucode').annotate(count=Count('ucode')).order_by('count')
- active_ucode_dict = {}
- for item in active_ucode_list:
- ucode = item['ucode']
- if item['ucode'] == '':
- ucode = '未知ucode'
- active_ucode_dict[ucode] = item['count']
- user_qs = DeviceUserSummary.objects.filter(time=start_time, query_type=1)
- if user_qs.exists():
- user_qs.update(device_type=active_device_type_dict, ucode=active_ucode_dict)
- else:
- DeviceUserSummary.objects.create(time=start_time, query_type=1, created_time=created_time,
- device_type=active_device_type_dict, ucode=active_ucode_dict)
- except Exception as e:
- LOGGER.info(
- 'thread_collect_activity_user接口异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno,
- repr(e)))
- @staticmethod
- def collect_flow_info(response):
- try:
- unicom_qs = UnicomDeviceInfo.objects.filter(card_type=0).values('iccid').distinct().order_by('iccid')
- asy = threading.Thread(target=CronCollectDataView.thread_collect_flow, args=(unicom_qs,))
- asy.start()
- return response.json(0)
- except Exception as e:
- return response.json(500, repr(e))
- @staticmethod
- def thread_collect_flow(qs):
- try:
- unicom_api = UnicomObjeect()
- redis_obj = RedisObject()
- for item in qs:
- res = unicom_api.query_device_usage_history(**item)
- if res.status_code == 200:
- res_json = res.json()
- if res_json['code'] == 0:
- redis_dict = {}
- for data in res_json['data']['deviceUsageHistory']:
- year = data.get('year', None)
- month = data.get('month', None)
- flow = data.get('flowTotalUsage', None)
- if not all([year, month, flow]):
- continue
- file = str(year) + '-' + str(month)
- redis_dict[file] = flow
- key = 'monthly_flow_' + item['iccid']
- if redis_dict:
- redis_obj.set_hash_data(key, redis_dict)
- except Exception as e:
- LOGGER.info('统计联通流量失败,时间为:{}'.format(int(time.time())))
- class CronComparedDataView(View):
- def get(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- request.encoding = 'utf-8'
- operation = kwargs.get('operation')
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if operation == 'PaypalOrder': # 定时对比paypal订单
- return self.compared_paypal_order(request_dict, response)
- elif operation == 'WechatOrder': # 定时对比微信订单
- return self.compared_wechat_order(response)
- elif operation == 'AlipayOrder': # 定时对比阿里订单
- return self.compared_alipay_order(response)
- elif operation == 'AnsjerOrder': # 定时对比后台订单
- return self.compared_ansjer_order(request_dict, response)
- else:
- return response.json(404)
- @staticmethod
- def compared_paypal_order(request_dict, response):
- time_stamp = request_dict.get('time', None)
- if time_stamp:
- end_date = datetime.datetime.fromtimestamp(int(time_stamp))
- start_date = end_date - datetime.timedelta(days=1)
- else:
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=2)
- start_date = datetime.datetime(start_date.year, start_date.month, start_date.day)
- end_date = start_date + datetime.timedelta(days=1)
- try:
- zosi_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Zosi'])
- vsees_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Vsees'])
- paypal_url = 'v1/reporting/transactions?start_date={}-{}-{}T08:00:00-0800&end_date={}-{}-{}T08:00:00-0800&fields=all&page_size=500&page=1&transaction_status=S'.format(
- start_date.year, start_date.month, start_date.day, end_date.year, end_date.month, end_date.day)
- zosi_order_list = zosi_paypal_api.get(paypal_url)
- vsees_order_list = vsees_paypal_api.get(paypal_url)
- order_list = zosi_order_list['transaction_details'] + vsees_order_list['transaction_details']
- # data = (
- # ('start_date', '{}-{}-{}T08:00:00-0800'.format(start_date.year, start_date.month, start_date.day)),
- # ('end_date', '{}-{}-{}T08:00:00-0800'.format(end_date.year, end_date.month, end_date.day)),
- # ('fields', 'all'),
- # ('page_size', '500'),
- # ('page', '1'),
- # ('transaction_status', 'S')
- # )
- # order_list = PayPalService(PAYPAL_CRD['client_id'], PAYPAL_CRD['client_secret']).get_transactions(data)
- thread = threading.Thread(target=CronComparedDataView.thread_compared_paypal_order,
- args=(order_list, end_date))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_paypal_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_paypal_order(order_list, start_time):
- try:
- now_time = int(time.time())
- timestamp = int(start_time.timestamp())
- count = 0
- total = 0
- more_order_list = []
- for item in order_list:
- if 'custom_field' in item['transaction_info'] and item['transaction_info']['custom_field'] == 'Shopify':
- continue
- count += 1
- total += float(item['transaction_info']['transaction_amount']['value'])
- trade_no = item['transaction_info']['transaction_id']
- if item['transaction_info']['transaction_event_code'] in ['T1106', 'T1107', 'T1202']: # 付款退款
- trade_no = item['transaction_info']['paypal_reference_id']
- transaction_subject = item['transaction_info'].get('transaction_subject', '')
- agreement_id = item['transaction_info'].get('paypal_reference_id', '')
- refund_order = False
- if item['transaction_info']['transaction_event_code'] in ['T1106', 'T1107', 'T1201', 'T0114', 'T1108']:
- agreement_id = ''
- if item['transaction_info']['transaction_event_code'] in ['T0114']:
- transaction_subject = '争议费'
- elif item['transaction_info']['transaction_event_code'] in ['T1108']:
- transaction_subject = 'Fee reversal'
- else:
- refund_order = True
- transaction_subject = '退款费'
- more_order_list.append(trade_no)
- pay_time = int(datetime.datetime.strptime(item['transaction_info']['transaction_updated_date'],
- "%Y-%m-%dT%H:%M:%S%z").timestamp())
- order_qs = Order_Model.objects.filter(trade_no=trade_no, payType=1)
- if not order_qs.exists():
- order_dict = {
- 'trade_no': trade_no,
- 'agreement_id': agreement_id,
- 'pay_time': pay_time,
- 'username': item['payer_info'].get('email_address', ''),
- 'price': item['transaction_info']['transaction_amount']['value'],
- 'pay_type': 1,
- 'upd_time': now_time,
- 'status': 0,
- 'meal_name': transaction_subject
- }
- if agreement_id:
- order_dict['pay_type'] = 0
- order_dict['meal_name'] = 'paypal_cycle'
- order_dict['order_id'] = transaction_subject
- params = {'trade_no': trade_no, 'pay_time': pay_time, 'refund_order': refund_order}
- response = requests.get('https://www.zositeche.com/testApi/checkOrderExist', params=params)
- if response.status_code != 200:
- # 如果响应失败,记录在数据库
- abnormal_qs = AbnormalOrder.objects.filter(trade_no=trade_no)
- if not abnormal_qs.exists():
- AbnormalOrder.objects.create(**order_dict)
- continue
- result = response.json()
- if result['result_code'] != 0 or not result['result']['is_exist']:
- # 如果响应结果为空,记录在数据库
- abnormal_qs = AbnormalOrder.objects.filter(trade_no=trade_no)
- if not abnormal_qs.exists():
- AbnormalOrder.objects.create(**order_dict)
- more_order_list.append(trade_no)
- else:
- if not refund_order:
- order_qs.update(payTime=pay_time)
- total = round(total, 2)
- daily_reconciliation = DailyReconciliation.objects.filter(time=timestamp)
- if daily_reconciliation.exists():
- if daily_reconciliation.first().order_ids:
- old_order_list = daily_reconciliation.first().order_ids.split(',')
- more_order_list = list(set(old_order_list) | set(more_order_list))
- order_ids = ','.join(set(more_order_list))
- daily_reconciliation.update(paypal_num=count, paypal_total=total, upd_time=now_time,
- order_ids=order_ids)
- else:
- order_ids = ','.join(set(more_order_list))
- DailyReconciliation.objects.create(paypal_num=count, paypal_total=total, time=timestamp,
- order_ids=order_ids, creat_time=now_time, upd_time=now_time)
- except Exception as e:
- LOGGER.info('paypal每日对账异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
- @staticmethod
- def compared_wechat_order(response):
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=1)
- start_date = start_date.strftime("%Y%m%d")
- try:
- order_list = WechatPayObject().download_bill(start_date)
- thread = threading.Thread(target=CronComparedDataView.thread_compared_wechat_order,
- args=(order_list,))
- thread.start()
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_wechat_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_wechat_order(order_list):
- now_time = int(time.time())
- for order in order_list:
- if order['交易类型'] != '`APP':
- continue
- order_id = order['商户订单号'].replace('`', '')
- order_qs = Order_Model.objects.filter(orderID=order_id)
- if not order_qs.exists():
- order_dict = {
- 'trade_no': order['微信订单号'].replace('`', ''),
- 'order_id': order_id,
- 'pay_type': 3,
- 'price': order['订单金额'].replace('`', ''),
- 'pay_time': int(
- datetime.datetime.strptime(order['\ufeff交易时间'], "`%Y-%m-%d %H:%M:%S").timestamp()),
- 'upd_time': now_time,
- 'meal_name': order['商品名称'].replace('`', ''),
- }
- AbnormalOrder.objects.create(**order_dict)
- @staticmethod
- def compared_alipay_order(response):
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=1)
- start_date = start_date.strftime("%Y-%m-%d")
- try:
- ali_pay_obj = AliPayObject()
- alipay = ali_pay_obj.conf()
- result = alipay.server_api(
- api_name='alipay.data.dataservice.bill.downloadurl.query',
- biz_content={'bill_type': 'trade',
- 'bill_date': start_date,
- }
- )
- res = requests.get(result['bill_download_url'])
- zip_file = res.content
- zip_data = io.BytesIO(zip_file)
- data = []
- with zipfile.ZipFile(zip_data, 'r') as zip_ref:
- for file in zip_ref.namelist():
- if '汇总' not in file.encode('cp437').decode('gbk'):
- with zip_ref.open(file) as f:
- reader = csv.reader(io.TextIOWrapper(f, 'gbk'))
- for row in reader:
- data.append(row)
- key_list = data[4]
- orders = data[5:-4]
- order_list = []
- for item in orders:
- order_list.append(dict(zip(key_list, item)))
- thread = threading.Thread(target=CronComparedDataView.thread_compared_alipay_order,
- args=(order_list,))
- thread.start()
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_alipay_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_alipay_order(order_list):
- now_time = int(time.time())
- for order in order_list:
- order_id = order['商户订单号'].replace('\t', '')
- if len(order_id) != 20:
- continue
- order_qs = Order_Model.objects.filter(orderID=order_id)
- if not order_qs.exists():
- order_dict = {
- 'trade_no': order['支付宝交易号'].replace('\t', ''),
- 'order_id': order_id,
- 'pay_type': 2,
- 'price': order['订单金额(元)'].replace('\t', ''),
- 'pay_time': int(datetime.datetime.strptime(order['完成时间'], "%Y-%m-%d %H:%M:%S").timestamp()),
- 'upd_time': now_time,
- 'meal_name': order['商品名称'].replace('\t', ''),
- 'username': order['对方账户'].replace('\t', ''),
- }
- AbnormalOrder.objects.create(**order_dict)
- @staticmethod
- def compared_ansjer_order(request_dict, response):
- start_date_stamp = request_dict.get('time', None)
- end_time = request_dict.get('end_time', None)
- if start_date_stamp:
- start_date = datetime.datetime.fromtimestamp(int(start_date_stamp))
- end_date = start_date + datetime.timedelta(days=1)
- end_date_stamp = int(end_date.timestamp())
- else:
- today = datetime.datetime.today()
- start_date = today - datetime.timedelta(days=1)
- start_date = datetime.datetime(start_date.year, start_date.month, start_date.day)
- end_date = datetime.datetime(today.year, today.month, today.day)
- start_date_stamp = int(start_date.timestamp())
- end_date_stamp = int(end_date.timestamp())
- try:
- if end_time:
- end_date_stamp = end_time
- order_qs = Order_Model.objects.filter(status__in=[1, 5, 6], payType=1, payTime__gte=start_date_stamp,
- payTime__lt=end_date_stamp).values('trade_no', 'orderID', 'UID',
- 'userID__username',
- 'userID__NickName', 'channel',
- 'desc', 'payType',
- 'price', 'status',
- 'refunded_amount', 'addTime',
- 'updTime', 'app_type')
- if CONFIG_INFO == CONFIG_EUR:
- return response.json(0, list(order_qs))
- thread = threading.Thread(target=CronComparedDataView.thread_compared_ansjer_order,
- args=(list(order_qs), start_date))
- thread.start() # 启动线程
- return response.json(0)
- except Exception as e:
- LOGGER.info('CronComparedDataView.compared_ansjer_order, errLine:{}, errMsg:{}'.format(
- e.__traceback__.tb_lineno, repr(e)))
- return response.json(500)
- @staticmethod
- def thread_compared_ansjer_order(order_list, start_time):
- while True:
- response = requests.get('https://www.zositeche.com/cron/compared/AnsjerOrder',
- params={'time': int(start_time.timestamp())})
- if response.status_code == 200:
- result = response.json()
- if result['result_code'] == 0:
- eur_order_list = result['result']
- break
- try:
- begin_date = start_time - datetime.timedelta(days=15)
- end_date = start_time + datetime.timedelta(days=15)
- start_timestamp = int(start_time.timestamp())
- now_time = int(time.time())
- more_order_list = []
- total = 0
- all_order_list = order_list + eur_order_list
- count = len(all_order_list)
- zosi_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Zosi'])
- vsees_paypal_api = paypalrestsdk.Api(PAYPAL_CRD['Vsees'])
- for index, order in enumerate(all_order_list):
- total += float(order['price'])
- if not order['trade_no']:
- more_order_list.append(order['orderID'])
- continue
- if all_order_list.index(order) != index:
- more_order_list.append(order['orderID'])
- continue
- paypal_url = 'v1/reporting/transactions?start_date={}-{}-{}T00:00:00-0000&end_date={}-{}-{}T00:00:00-0000&transaction_id={}&fields=all&page_size=100&page=1'.format(
- begin_date.year, begin_date.month, begin_date.day, end_date.year, end_date.month, end_date.day,
- order['trade_no'])
- if order['app_type'] == 1:
- paypal_order_list = zosi_paypal_api.get(paypal_url)
- elif order['app_type'] == 2:
- paypal_order_list = vsees_paypal_api.get(paypal_url)
- else:
- continue
- if not paypal_order_list['transaction_details']:
- more_order_list.append(order['orderID'])
- total = round(total, 2)
- daily_reconciliation = DailyReconciliation.objects.filter(time=start_timestamp)
- if daily_reconciliation.exists():
- if daily_reconciliation.first().order_ids:
- old_order_list = daily_reconciliation.first().order_ids.split(',')
- more_order_list = list(set(old_order_list) | set(more_order_list))
- order_ids = ','.join(set(more_order_list))
- daily_reconciliation.update(ansjer_total=total, ansjer_num=count, order_ids=order_ids,
- upd_time=now_time)
- else:
- order_ids = ','.join(more_order_list)
- DailyReconciliation.objects.create(order_ids=order_ids, ansjer_total=total, ansjer_num=count,
- time=start_timestamp, creat_time=now_time, upd_time=now_time)
- except Exception as e:
- LOGGER.info('后台每日对账异常:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
|