|
- import base64
- import calendar
- import datetime
- import re
- import time
- from base64 import encodebytes
- from distutils.version import LooseVersion
- from pathlib import Path
- from random import Random
- import OpenSSL.crypto as ct
- import ipdb
- import requests
- import simplejson as json
- from dateutil.relativedelta import relativedelta
- from django.core import serializers
- from django.db.models import F
- from django.utils import timezone
- from django.utils.crypto import constant_time_compare
- from pyipip import IPIPDatabase
- from Ansjer.config import BASE_DIR, SERVER_DOMAIN_SSL, CONFIG_INFO, CONFIG_TEST, CONFIG_CN, SERVER_DOMAIN_TEST, \
- SERVER_DOMAIN_CN, SERVER_DOMAIN_US, CONFIG_US, CONFIG_EUR, SERVER_DOMAIN_LIST, SERVER_DOMAIN_EUR, ALEXA_DOMAIN, \
- ME_COUNTRY_ID_LIST, EA_COUNTRY_ID_LIST
- from Controller.CheckUserData import RandomStr
- from Model.models import iotdeviceInfoModel, Device_Info, UIDModel, AppDeviceType, UIDCompanySerialModel, GatewayPush, \
- Device_User, UserExModel
- from Object.AWS.S3Email import S3Email
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- class CommonService:
- # 高复用性函数类
- @staticmethod
- def get_kwargs(data=None):
- # 添加模糊搜索
- if data is None:
- data = {}
- kwargs = {}
- for (k, v) in data.items():
- if v is not None and v != u'':
- kwargs[k + '__icontains'] = v
- return kwargs
- @staticmethod
- def qs_to_dict(query_set):
- # 格式化query_set转dict
- sqlJSON = serializers.serialize('json', query_set)
- sqlList = json.loads(sqlJSON)
- sqlDict = dict(zip(["datas"], [sqlList]))
- return sqlDict
- # 格式化query_set转dict
- @staticmethod
- def request_dict_to_dict(request_dict):
- # 传参格式转换,键包含meta获取meta[]中的值,值'true'/'false'转为True,False
- key_list = []
- value_list = []
- for k, v in request_dict.items():
- key_list.append(k[k.index('[') + 1:k.index(']')] if 'meta' in k else k)
- if v == 'true':
- v = True
- elif v == 'false':
- v = False
- value_list.append(v)
- data_dict = dict(zip(key_list, value_list))
- print(data_dict)
- return data_dict
- # 获取文件大小
- @staticmethod
- def get_file_size(file_path='', suffix_type='', decimal_point=0):
- # for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
- # path = Path() / 'D:/TestServer/123444.mp4'
- path = Path() / file_path
- size = path.stat().st_size
- mb_size = 0.0
- if suffix_type == 'MB':
- mb_size = size / 1024.0 / 1024.0
- if decimal_point != 0:
- mb_size = round(mb_size, decimal_point)
- return mb_size
- @staticmethod
- def get_param_flag(data=None):
- # print(data)
- if data is None:
- data = []
- flag = True
- for v in data:
- if v is None:
- flag = False
- break
- return flag
- @staticmethod
- def get_ip_address(request):
- """
- 获取ip地址
- :param request:
- :return:
- """
- try:
- real_ip = request.META['HTTP_X_FORWARDED_FOR']
- clientIP = real_ip.split(",")[0]
- except:
- try:
- clientIP = request.META['REMOTE_ADDR']
- except Exception as e:
- clientIP = ''
- return clientIP
- # @获取一天每个小时的datetime.datetime
- @staticmethod
- def getTimeDict(times):
- time_dict = {}
- t = 0
- for x in range(24):
- if x < 10:
- x = '0' + str(x)
- else:
- x = str(x)
- a = times.strftime("%Y-%m-%d") + " " + x + ":00:00"
- time_dict[t] = timezone.datetime.strptime(a, '%Y-%m-%d %H:%M:%S')
- t += 1
- return time_dict
- # 根据ip获取地址
- @staticmethod
- def getAddr(ip):
- print('start_time=' + str(time.time()))
- base_dir = BASE_DIR
- # ip数据库
- db = IPIPDatabase(base_dir + '/DB/17monipdb.dat')
- addr = db.lookup(ip)
- # ModelService.add_tmp_log(addr)
- ts = addr.split('\t')[0]
- print('end_time=' + str(time.time()))
- return ts
- # 通过ip检索ipip指定信息 lang为CN或EN
- @staticmethod
- def getIpIpInfo(ip, lang, update=False):
- ipbd_dir = BASE_DIR + "/DB/mydata4vipday2.ipdb"
- db = ipdb.City(ipbd_dir)
- if update:
- rr = db.reload(ipbd_dir)
- info = db.find_map(ip, lang)
- return info
- @staticmethod
- def getUserID(userPhone='13800138000', getUser=True, setOTAID=False, μs=True):
- if μs == True:
- if getUser == True:
- timeID = str(round(time.time() * 1000000))
- userID = timeID + userPhone
- return userID
- else:
- if setOTAID == False:
- timeID = str(round(time.time() * 1000000))
- ID = userPhone + timeID
- return ID
- else:
- timeID = str(round(time.time() * 1000000))
- eID = '13800' + timeID + '138000'
- return eID
- else:
- if getUser == True:
- timeID = str(round(time.time() * 1000))
- userID = timeID + userPhone
- return userID
- else:
- if setOTAID == False:
- timeID = str(round(time.time() * 1000))
- ID = userPhone + timeID
- return ID
- else:
- timeID = str(round(time.time() * 1000))
- eID = '13800' + timeID + '138000'
- return eID
- @staticmethod
- def get_username(userID):
- """
- 根据用户id获取用户名/邮箱/电话
- @param userID: 用户id
- @return:
- """
- if userID:
- device_user_qs = Device_User.objects.filter(userID=userID).values('username', 'userEmail', 'phone')
- if device_user_qs.exists():
- if device_user_qs[0]['username']:
- return device_user_qs[0]['username']
- elif device_user_qs[0]['userEmail']:
- return device_user_qs[0]['userEmail']
- elif device_user_qs[0]['phone']:
- return device_user_qs[0]['phone']
- return ''
- # 生成随机数
- @staticmethod
- def RandomStr(randomlength=8, number=True):
- str = ''
- if number == False:
- characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
- 'tUuVvWwXxYyZz0123456789'
- else:
- characterSet = '0123456789'
- length = len(characterSet) - 1
- random = Random()
- for index in range(randomlength):
- str += characterSet[random.randint(0, length)]
- return str
- # 生成订单好
- @staticmethod
- def createOrderID():
- random_id = CommonService.RandomStr(6, True)
- order_id = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + str(random_id)
- print('orderID:')
- print(order_id)
- return order_id
- # qs转换list datetime处理
- @staticmethod
- def qs_to_list(qs):
- res = []
- # print(qs)
- for ps in qs:
- try:
- if 'time' in ps:
- ps['time'] = ps['time'].strftime("%Y-%m-%d %H:%M:%S")
- if 'add_time' in ps:
- ps['add_time'] = ps['add_time'].strftime("%Y-%m-%d %H:%M:%S")
- if 'update_time' in ps:
- ps['update_time'] = ps['update_time'].strftime("%Y-%m-%d %H:%M:%S")
- if 'end_time' in ps:
- ps['end_time'] = ps['end_time'].strftime("%Y-%m-%d %H:%M:%S")
- if 'data_joined' in ps:
- if ps['data_joined']:
- ps['data_joined'] = ps['data_joined'].strftime("%Y-%m-%d %H:%M:%S")
- else:
- ps['data_joined'] = ''
- if 'userID__data_joined' in ps:
- if ps['userID__data_joined']:
- ps['userID__data_joined'] = ps['userID__data_joined'].strftime("%Y-%m-%d %H:%M:%S")
- else:
- ps['userID__data_joined'] = ''
- except Exception as e:
- pass
- res.append(ps)
- return res
- # 获取当前时间
- @staticmethod
- def get_now_time_str(n_time, tz, lang):
- print(n_time)
- print(tz)
- print(lang)
- n_time = int(n_time) + 3600 * float(tz)
- if lang == 'cn':
- return time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(n_time)))
- else:
- return time.strftime('%m-%d-%Y %H:%M:%S', time.gmtime(int(n_time)))
- # 生成随机数
- @staticmethod
- def encrypt_data(randomlength=8, number=False):
- str = ''
- if number == False:
- characterSet = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsT' \
- 'tUuVvWwXxYyZz0123456789'
- else:
- characterSet = '0123456789'
- length = len(characterSet) - 1
- random = Random()
- for index in range(randomlength):
- str += characterSet[random.randint(0, length)]
- return str
- @staticmethod
- def encode_data(content, start=1, end=4):
- """
- 数据加密
- @param content: 数据内容
- @param start: 起始长度
- @param end: 结束长度
- @return content: 加密的数据
- """
- if not content:
- return ''
- for i in range(start, end):
- length = end - i
- content = RandomStr(length, False) + content + RandomStr(length, False)
- content = base64.b64encode(str(content).encode('utf-8')).decode('utf8')
- return content
- @staticmethod
- def decode_data(content, start=1, end=4):
- """
- 数据解密
- @param content: 数据内容
- @param start: 起始长度
- @param end: 结束长度
- @return content: 解密的数据
- """
- if not content:
- return ''
- for i in range(start, end):
- content = base64.b64decode(content)
- content = content.decode('utf-8')
- content = content[i:-i]
- return content
- # 把格式化时间转换成时间戳
- @staticmethod
- def str_to_timestamp(str_time=None, format='%Y-%m-%d %H:%M:%S'):
- if str_time:
- time_tuple = time.strptime(str_time, format) # 把格式化好的时间转换成元祖
- result = time.mktime(time_tuple) # 把时间元祖转换成时间戳
- return int(result)
- return int(time.time())
- # 把时间戳转换成格式化
- @staticmethod
- def timestamp_to_str(timestamp=None, format='%Y-%m-%d %H:%M:%S'):
- if timestamp:
- time_tuple = time.localtime(timestamp) # 把时间戳转换成时间元祖
- result = time.strftime(format, time_tuple) # 把时间元祖转换成格式化好的时间
- return result
- else:
- return time.strptime(format)
- @staticmethod
- def get_date_from_timestamp(timestamp, timezone_offset):
- # 创建时区对象
- tz = datetime.timezone(datetime.timedelta(hours=timezone_offset))
- # 使用时间戳创建 datetime 对象
- dt = datetime.datetime.fromtimestamp(timestamp, tz)
- # 格式化成 '%Y-%m-%d'
- formatted_date = dt.strftime('%Y-%m-%d %H:%M:%S')
- return formatted_date
- # 计算N个月后的时间戳
- @staticmethod
- def calcMonthLater(addMonth, unix_timestamp=None):
- if unix_timestamp:
- now_year = time.localtime(unix_timestamp).tm_year
- now_month = time.localtime(unix_timestamp).tm_mon
- now_day = time.localtime(unix_timestamp).tm_mday
- now_hour = time.localtime(unix_timestamp).tm_hour
- now_min = time.localtime(unix_timestamp).tm_min
- now_second = time.localtime(unix_timestamp).tm_sec
- else:
- now_year = datetime.datetime.now().year
- now_month = datetime.datetime.now().month
- now_day = datetime.datetime.now().day
- now_hour = datetime.datetime.now().hour
- now_min = datetime.datetime.now().minute
- now_second = datetime.datetime.now().second
- for add in range(addMonth):
- if now_month == 12:
- now_year += 1
- now_month = 1
- else:
- now_month += 1
- timestamps = 0
- for is_format in range(4):
- try:
- date_format = '{now_year}-{now_month}-{now_day} {now_hour}:{now_min}:{now_second}' \
- .format(now_year=now_year, now_month=now_month, now_day=now_day, now_hour=now_hour,
- now_min=now_min, now_second=now_second)
- timestamps = CommonService.str_to_timestamp(date_format)
- except Exception as e:
- if str(e) == 'day is out of range for month':
- now_day = now_day - 1
- return timestamps
- @staticmethod
- def updateMac(mac: str):
- macArray = mac.split(':')
- macArray[0] = int(macArray[0], 16)
- macArray[1] = int(macArray[1], 16)
- macArray[2] = int(macArray[2], 16)
- first = int(macArray[5], 16)
- second = int(macArray[4], 16)
- three = int(macArray[3], 16)
- if first == 255 and second == 255 and three == 255:
- return None
- first += 1
- if first / 256 == 1:
- second += 1
- first = first % 256
- if second / 256 == 1:
- three += 1
- second = second % 256
- macArray[3] = three
- macArray[4] = second
- macArray[5] = first
- tmp = ':'.join(map(lambda x: "%02x" % x, macArray))
- return tmp.upper()
- @staticmethod
- def encode_data_without_salt(content):
- return base64.b64encode(str(content).encode("utf-8")).decode('utf8')
- @staticmethod
- def check_time_stamp_token(token, time_stamp):
- # 时间戳token校验
- if not all([token, time_stamp]):
- return False
- try:
- token = int(CommonService.decode_data(token))
- time_stamp = int(time_stamp)
- now_time = int(time.time())
- distance = now_time - time_stamp
- if token != time_stamp or distance > 60000 or distance < -60000: # 为了全球化时间控制在一天内
- return False
- return True
- except Exception as e:
- print(e)
- return False
- @staticmethod
- def check_time_stamp_token_without_distance(time_stamp_token, time_stamp):
- """
- 用于没有RTC设备的时间戳token校验
- @param time_stamp: 时间戳
- @param time_stamp_token: 时间戳token
- @return: boolean True/False
- """
- if not all([time_stamp_token, time_stamp]):
- return False
- try:
- token = CommonService.decode_data(time_stamp_token)
- if token != time_stamp:
- return False
- return True
- except Exception as e:
- print(e)
- return False
- @staticmethod
- def req_publish_mqtt_msg(identification_code, topic_name, msg, qos=1):
- """
- 通用发布MQTT消息函数
- @param identification_code: 标识码
- @param topic_name: 主题名
- @param msg: 消息内容
- @param qos: mqtt qos等级
- @return: boolean
- """
- if not all([identification_code, topic_name]):
- return False
- if identification_code.endswith('11L'):
- thing_name = 'LC_' + identification_code
- else:
- thing_name = 'Ansjer_Device_' + identification_code
- try:
- # 获取数据组织将要请求的url
- iot = iotdeviceInfoModel.objects.filter(
- thing_name=thing_name).values(
- 'endpoint', 'token_iot_number')
- if not iot.exists():
- return False
- endpoint = iot[0]['endpoint']
- Token = iot[0]['token_iot_number']
- # api doc: https://docs.aws.amazon.com/zh_cn/iot/latest/developerguide/http.html
- # url: https://IoT_data_endpoint/topics/url_encoded_topic_name?qos=1
- # post请求url发布MQTT消息
- url = 'https://{}/topics/{}?qos={}'.format(endpoint, topic_name, qos)
- authorizer_name = 'Ansjer_Iot_Auth'
- signature = CommonService.rsa_sign(Token) # Token签名
- headers = {
- 'x-amz-customauthorizer-name': authorizer_name,
- 'Token': Token,
- 'x-amz-customauthorizer-signature': signature}
- r = requests.post(url=url, headers=headers, json=msg, timeout=2)
- if r.status_code == 200:
- res = r.json()
- if res['message'] == 'OK':
- return True
- return False
- else:
- return False
- except Exception as e:
- return False
- @staticmethod
- def rsa_sign(Token):
- # 私钥签名Token
- if not Token:
- return ''
- private_key_file = '''-----BEGIN RSA PRIVATE KEY-----
- MIIEpQIBAAKCAQEA5iJzEDPqtGmFMggekVro6C0lrjuC2BjunGkrFNJWpDYzxCzE
- X5jf4/Fq7hcIaQd5sqHugDxPVollSLPe9zNilbrd0sZfU+Ed8gRVuKW9KwfE9XFr
- L0pt6bKRQ0IIRfiZ9TuR0tsQysvcO1GZSXcYfPue3tGM1zOnWFThWDqZ06+sOxzt
- RMRl4yNfbpCG4MfxG3itNXOfrjZv2OMLSXrxmzubSvRpUYSvQPs4fm9302SAnySY
- 0MKzx6H6528ZQm/IDDSZy6EmNBIyTRDfxC56vnYcXvqedAQh7jJnjdvt6Q4MhASH
- eIYi1FBSdu2NT6wgpnrqXzx5pq9kR/lnsLID0wIDAQABAoIBAQCiF4GT1/1oNSpr
- ouxk1PNXFPWFUsVGD8mAwVJmx//eiY7MjfuCmdqYYmI+cFqsH2fIOeYSzGfVO9Dq
- 9EYHN1oovAWhf7eFDPpajFMUSyiCNmazub8VAAeKowtNpCTPo9pMsDh1m3aoYA4u
- ebrN0+Sbo16y8kWRDgDAZoiR7DSMs8lczk16hwfv5mw8XpNDbaL3Coi4Koe2S1Yh
- 2SX3vWFlpd7qF1ZYXuZIp+b8JPrV7n9eUKoFgzj0gqgwQK80CoexIjiOrNMPvkQa
- q+8kCvFjAzKxOK7e8gjM8lMRiGodb61kmYZkkJzFwWO4EaGbl34lfVECd1Ixp3tF
- be0OWAGBAoGBAPSteXDzzToD8ovM7LL11x0jWwI6HOiHu89kZtW566rIezjWBuA2
- TxrcYKM3h9jQRXS3CsMdoIv6XGk5lqM8ADtjn23FBWe/THYLh8bm8JOgh5RRWQDg
- SvkLfi9Ih2mM4NJfmuuDOh3Nze2efLM7+kOZWUQwF2Zx9mL5jvRBk351AoGBAPDI
- sYmT2Li+i5+0vykA2m5uPF8ZOW8BGtAfCZv0suW7BNzSgin78g9WapRd/4p0NNiL
- /nVMqPPCpd1akCUpV+GDWQt0hV+HZjxANE0KWhciQRyo2qvo51j8SWILJSgh0tXC
- aTF8qt6oGw3VN3m57vKhbrlDaz0J/NDJFci6msAnAoGBAOuG6bXPGijUj+//DYKf
- n7jOxdZ49kboEePrtAncdHzri6IEdI3z+WXT6bpzw/LzWUimwldb96WHFNm9s8Hi
- Ch8hIODbnP5naUTgiIzw1XhmONyPCewL/F+LrqX5XVA/alNX8JrwsUrrR2WLAGLQ
- Q3I69XDsEjptTU2tCO0bCs3ZAoGBAJ2lCHfm0JHET230zONvp5N9oREyVqQSuRdh
- +syc3TQDyh85w/bw+X6JOaaCFHj1tFPC9Iqf8k4GNspCLPXnp54CfR4+38O3xnvU
- HWoDSRC0YKT++IxtJGriYrlKSr2Hx54kdvLriIPW1D+uRW/xCDza7L9nIKMKEvgv
- b4/IfOEpAoGAeKM9Te7T1VzlAkS0CJOwanzwYV/zrex84WuXxlsGgPQ871lTs5AP
- H1QLfLfFXH+UVrCEC2yv4eml/cqFkpB3gE5i4MQ8GPVIOSs5tsIyl8YUA03vdNdB
- GCqvlyw5dfxNA+EtxNE2wCW/LW7ENJlACgcfgPlBZtpLheWoZB/maw4=
- -----END RSA PRIVATE KEY-----'''
- # 使用密钥文件方式
- # private_key_file_path = os.path.join(BASE_DIR, 'static/iotCore/private.pem')#.replace('\\', '/')
- # private_key_file = open(private_key_file_path, 'r')
- private_key = ct.load_privatekey(ct.FILETYPE_PEM, private_key_file)
- signature = ct.sign(private_key, Token.encode('utf8'), 'sha256')
- signature = encodebytes(signature).decode('utf8').replace('\n', '')
- # print('signature:', signature)
- return signature
- @staticmethod
- def get_payment_status_url(lang, payment_status):
- # 返回相应的支付状态url
- if lang == 'cn':
- file_name = 'success.html' if payment_status == 'success' else 'fail.html'
- else:
- file_name = 'en_success.html' if payment_status == 'success' else 'en_fail.html'
- pay_failed_url = "{}web/paid2/{}".format(SERVER_DOMAIN_SSL, file_name)
- return pay_failed_url
- # 根据uid查询序列号,存在则返回序列号,否则返uid
- @staticmethod
- def query_serial_with_uid(uid):
- device_info_qs = Device_Info.objects.filter(UID=uid).values('serial_number')
- if device_info_qs.exists():
- serial_number = device_info_qs[0]['serial_number']
- if serial_number:
- return serial_number
- return uid
- # 根据序列号查询uid,存在则返回uid,否则返回序列号
- @staticmethod
- def query_uid_with_serial(serial_number):
- device_info_qs = Device_Info.objects.filter(serial_number=serial_number).values('UID')
- if device_info_qs.exists():
- uid = device_info_qs[0]['UID']
- if uid:
- return uid
- return serial_number
- @staticmethod
- def get_full_serial_number(uid, serial_number, device_type):
- """
- 根据uid查询返回完整序列号
- @param uid: uid
- @param serial_number: 9位序列号
- @param device_type: 设备类型
- @return: full_serial_number
- """
- p2p_type = str(UIDModel.objects.filter(uid=uid).values('p2p_type')[0]['p2p_type'])
- # 设备类型转为16进制并补齐4位
- device_type = hex(device_type)[2:]
- device_type = (4 - len(device_type)) * '0' + device_type
- full_serial_number = serial_number + p2p_type + device_type
- return full_serial_number
- # 根据企业标识返回物品名
- @staticmethod
- def get_thing_name(company_mark, thing_name_suffix):
- if company_mark == '11A':
- return 'Ansjer_Device_' + thing_name_suffix
- elif company_mark == '11L':
- return 'LC_' + thing_name_suffix
- else:
- return thing_name_suffix
- @staticmethod
- def confirm_region_id(region_country=0):
- """
- 根据配置信息和国家确定region_id
- @param region_country: 用户国家id
- @return: region_id
- """
- region_id = 3
- if CONFIG_INFO == CONFIG_US: # 美洲
- # 中东地区国家id
- if region_country in ME_COUNTRY_ID_LIST:
- region_id = 6
- # 东亚地区国家id
- elif region_country in EA_COUNTRY_ID_LIST:
- region_id = 2
- elif CONFIG_INFO == CONFIG_EUR: # 欧洲
- region_id = 4
- elif CONFIG_INFO == CONFIG_CN: # 中国
- region_id = 1
- elif CONFIG_INFO == CONFIG_TEST: # 测试
- region_id = 5
- return region_id
- @staticmethod
- def verify_token_get_user_id(request_dict, request):
- """
- 认证token,获取user id
- @param request_dict: 请求参数
- @param request: 请求体
- @return: token_obj.code, token_obj.userID, response
- """
- try:
- token_obj = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
- lang = request_dict.get('lang', None)
- response = ResponseObject(lang if lang else token_obj.lang)
- return token_obj.code, token_obj.userID, response
- except Exception as e:
- print(e)
- return 309, None, None
- @staticmethod
- def cutting_time(start_time, end_time, time_unit):
- """
- 按时间单位切割时间段
- @param start_time: 开始时间
- @param end_time: 结束时间
- @param time_unit: 时间单位
- @return: time_list 切割后的时间列表
- """
- time_list = []
- while True:
- if time_unit == 'day':
- temp_time = start_time + relativedelta(days=1)
- elif time_unit == 'week':
- temp_time = start_time + relativedelta(days=7)
- elif time_unit == 'month':
- temp_time = start_time + relativedelta(months=1)
- elif time_unit == 'quarter':
- temp_time = start_time + relativedelta(months=3)
- elif time_unit == 'year':
- temp_time = start_time + relativedelta(years=1)
- else:
- break
- if temp_time < end_time:
- time_tuple = (CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')),
- CommonService.str_to_timestamp(temp_time.strftime('%Y-%m-%d %H:%M:%S')))
- time_list.append(time_tuple)
- start_time = temp_time
- else:
- time_tuple = (CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')),
- CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')))
- if time_tuple not in time_list:
- time_list.append(time_tuple)
- break
- if not time_list:
- time_tuple = (CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')),
- CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')))
- time_list = [time_tuple]
- return time_list
- @staticmethod
- def cutting_time_stamp(start_time, end_time):
- """
- 按天切割时间段
- @param start_time: 开始时间
- @param end_time: 结束时间
- @return: time_list 切割后的时间列表
- """
- time_list = []
- while True:
- mid_time = datetime.datetime(start_time.year, start_time.month, start_time.day) + relativedelta(days=1)
- if mid_time < end_time:
- time_tuple = (CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')),
- CommonService.str_to_timestamp(mid_time.strftime('%Y-%m-%d %H:%M:%S')))
- time_list.append(time_tuple)
- start_time = mid_time
- else:
- time_tuple = (CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')),
- CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')))
- if time_tuple not in time_list:
- time_list.append(time_tuple)
- break
- if not time_list:
- time_tuple = (CommonService.str_to_timestamp(start_time.strftime('%Y-%m-%d %H:%M:%S')),
- CommonService.str_to_timestamp(end_time.strftime('%Y-%m-%d %H:%M:%S')))
- time_list = [time_tuple]
- return time_list
- @staticmethod
- def get_domain_name():
- """
- 获取域名
- @return: domain_name_list 域名列表
- """
- if CONFIG_INFO == CONFIG_TEST:
- domain_name_list = [SERVER_DOMAIN_TEST[:-1]]
- elif CONFIG_INFO == CONFIG_CN or CONFIG_INFO == CONFIG_US or CONFIG_INFO == CONFIG_EUR:
- domain_name_list = [SERVER_DOMAIN_US[:-1], SERVER_DOMAIN_CN[:-1], SERVER_DOMAIN_EUR[:-1]]
- else:
- domain_name_list = []
- return domain_name_list
- @staticmethod
- def get_orders_domain_name_list():
- """
- 获取其他服务器域名列表
- @return: orders_domain_name_list 其他服务器域名列表
- """
- orders_domain_name_list = SERVER_DOMAIN_LIST
- if CONFIG_INFO == CONFIG_TEST:
- orders_domain_name_list = [SERVER_DOMAIN_CN, SERVER_DOMAIN_US, SERVER_DOMAIN_EUR]
- elif CONFIG_INFO == CONFIG_CN:
- orders_domain_name_list = [SERVER_DOMAIN_TEST, SERVER_DOMAIN_US, SERVER_DOMAIN_EUR]
- elif CONFIG_INFO == CONFIG_US:
- orders_domain_name_list = [SERVER_DOMAIN_TEST, SERVER_DOMAIN_CN, SERVER_DOMAIN_EUR]
- elif CONFIG_INFO == CONFIG_EUR:
- orders_domain_name_list = [SERVER_DOMAIN_TEST, SERVER_DOMAIN_CN, SERVER_DOMAIN_US]
- return orders_domain_name_list
- @staticmethod
- def list_sort(e):
- """
- 列表排序
- @param e: 列表元素
- """
- return sorted(e, key=lambda item: -item['count'])
- @staticmethod
- def list_sort_v2(e, order_by):
- """
- 列表排序
- @param e: 列表元素
- @param order_by: 排序对象
- """
- return sorted(e, key=lambda item: item[order_by], reverse=True)
- @staticmethod
- def Package_Type(order_type, content):
- """
- 套餐类型
- """
- if order_type == 0:
- content = content + '(' + '云存' + ')'
- return content
- elif order_type == 1:
- content = content + '(' + 'AI' + ')'
- return content
- elif order_type == 2:
- pass
- elif order_type == 4:
- content = content + '(' + '云盘' + ')'
- return content
- @staticmethod
- def is_cloud_device(ucode, device_type):
- """
- 设备是否支持云存
- @param ucode: 设备版本
- @param device_type: 设备类型
- """
- if len(ucode) > 4:
- number = ucode[-4]
- else:
- return False
- device_type_qs = AppDeviceType.objects.filter(type=device_type).values('model')
- model = device_type_qs[0]['model'] if device_type_qs.exists() else ''
- # 判断设备是否为ipc设备和是否支持云存
- if model == 2 and number in ['4', '5']:
- return True
- return False
- @staticmethod
- def negative_number_judgment(number_list):
- """
- 判断正负数
- @param number_list: float或int类型列表
- """
- if any(i < 0 for i in number_list):
- return False
- else:
- return True
- @staticmethod
- def check_password(password1, password2):
- """
- 比较密码
- @param 返回True or False
- """
- return constant_time_compare(password1, password2)
- @staticmethod
- def compare_version_number(version_number, version_number_list):
- """
- 比对版本号大小
- @param version_number: 版本号
- @param version_number_list: 版本号列表
- """
- version_list = []
- input_version = LooseVersion(version_number)
- for version in version_number_list:
- version = LooseVersion(version)
- if input_version >= version:
- version_list.append(version)
- else:
- continue
- return version_list
- @staticmethod
- def convert_to_timestamp(timezone_offset, time_string):
- """
- 时间字符串转为时间戳
- @param timezone_offset: 时区
- @param time_string: 时间字符串
- @return: timestamp
- """
- datetime_obj = datetime.datetime.strptime(time_string, '%Y-%m-%d %H:%M:%S')
- # 创建一个表示指定时区的timedelta对象
- utc_offset = datetime.timedelta(hours=timezone_offset)
- # 调整时区
- datetime_obj = datetime_obj - utc_offset
- # datetime.datetime对象 -> str
- time_str_utc = datetime_obj.strftime("%Y-%m-%d %H:%M:%S")
- timestamp = calendar.timegm(time.strptime(time_str_utc, '%Y-%m-%d %H:%M:%S'))
- return timestamp
- @staticmethod
- def get_uid_by_serial_number(serial_number):
- """
- 根据序列号获取绑定uid
- @param serial_number: 9位序列号
- @return: uid信息
- """
- c_serial_qs = UIDCompanySerialModel.objects.filter(company_serial__serial_number=serial_number[0:6])
- if not c_serial_qs.exists():
- return serial_number
- c_serial_info = c_serial_qs.values('uid__uid')
- return c_serial_info[0]['uid__uid']
- @staticmethod
- def get_uids_by_serial_numbers(serial_numbers):
- """
- 根据多个序列号获取绑定uid列表
- @param serial_numbers: 多个6位序列号列表
- @return: uid列表
- """
- if not serial_numbers:
- return []
-
- # 确保所有序列号都是6位
- serial_list = [serial[:6] for serial in serial_numbers if serial]
- if not serial_list:
- return []
-
- # 查询所有匹配的序列号
- c_serial_qs = UIDCompanySerialModel.objects.filter(company_serial__serial_number__in=serial_list)
-
- # 如果没有匹配记录,返回原序列号列表
- if not c_serial_qs.exists():
- return serial_numbers
-
- # 获取所有匹配的uid
- uid_list = list(c_serial_qs.values_list('uid__uid', flat=True))
-
- return uid_list
- @staticmethod
- def get_serial_number_by_uid(uid):
- """
- 根据序列号获取绑定uid
- @param uid: uid
- @return: uid信息
- """
- c_serial_qs = UIDCompanySerialModel.objects.filter(uid__uid=uid)
- if not c_serial_qs.exists():
- return uid
- c_serial_qs = c_serial_qs.annotate(mark=F('company_serial__company__mark'),
- serial_number=F('company_serial__serial_number'))
- c_serial_info = c_serial_qs.values('mark', 'serial_number')
- return c_serial_info[0]['serial_number'] + c_serial_info[0]['mark']
- @staticmethod
- def get_user_tz(user_id):
- """
- 获取用户时区
- @param user_id: 用户id
- @return: tz
- """
- # 从gateway_push表查询时区
- gateway_push_qs = GatewayPush.objects.filter(user_id=user_id).order_by('-id').first()
- if gateway_push_qs is None:
- tz = 0.00
- else:
- # 截掉.00然后转为浮点型
- tz = float(gateway_push_qs.tz[:-3])
- return tz
- @staticmethod
- def update_alexa_events(data_list):
- """
- 请求Alexa服务器更新事件网关
- 邮件提醒捕获的异常
- @param data_list: 数据列表
- @return:
- """
- try:
- data_list = json.dumps(data_list)
- data = {'data_list': data_list}
- url = ALEXA_DOMAIN + 'deviceStatus/addOrUpdateV2'
- requests.post(url, data=data, timeout=30)
- except Exception as e:
- S3Email().faEmail(
- '请求Alexa服务器更新事件网关异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)),
- 'servers@ansjer.com')
- pass
- @classmethod
- def confirm_msg_sign_name(cls, sign_name, phone):
- """
- 确认短信签名
- @param sign_name: app签名标识
- @param phone: 手机号
- @return:
- """
- if sign_name == 'zosi':
- sign_name = '周视'
- elif sign_name == 'vsees':
- # 微瞳移动号码使用 Ansjer 签名
- is_china_mobile = cls.is_china_mobile(phone)
- if is_china_mobile:
- sign_name = 'Ansjer'
- else:
- sign_name = '微瞳'
- else:
- sign_name = 'Ansjer'
- return sign_name
- @staticmethod
- def is_china_mobile(phone):
- """
- 检查手机号码是否属于中国移动
- :param phone: 手机号码字符串(11位数字)
- :return: 如果是移动号码返回True,否则False
- """
- # 正则表达式匹配中国移动号段
- pattern = r'^1(3[4-9]|4[7]|5[0-27-9]|7[28]|8[2-47-8]|9[58])\d{8}$'
- return bool(re.fullmatch(pattern, phone))
- @staticmethod
- def confirm_msg_sign_name_with_phone(phone):
- """
- 根据手机号确认短信签名
- @param phone: 手机号码
- @return:
- """
- sign_name = '周视'
- # 根据用户APP包名确定短信签名
- device_user_qs = Device_User.objects.filter(username=phone).values('userID')
- if device_user_qs.exists():
- user_id = device_user_qs[0]['userID']
- user_ex_qs = UserExModel.objects.filter(userID=user_id).values('appBundleId')
- if user_ex_qs.exists():
- if user_ex_qs[0]['appBundleId'] in ['com.cloudlife.commissionf', 'com.cloudlife.commissionf_a']:
- sign_name = '微瞳'
- return sign_name
|