#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
@AUTHOR: ASJRD019
@NAME: AnsjerFormal
@software: PyCharm
@DATE: 2019/5/9 11:50
@Version: python3.6
@MODIFY DECORD:ansjer dev
@file: AliPayObject.py
@Contact: pzb3076@163.com
"""
import time
from collections import Counter

from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View

from Ansjer.config import AWS_DynamoDB_REGION, AWS_DynamoDB_ACCESS_KEY, AWS_DynamoDB_SECRET_KEY, \
    USER_BRAND, USER_BRAND_ALL
from Model.models import Device_User
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Object.TokenObject import TokenObject
from Service.CommonService import CommonService
from Service.ModelService import ModelService

'''
http://192.168.136.39:8000/userbrandinfo/queryByAdmin?token=test&page=1&line=10 记录
http://192.168.136.39:8000/userbrandinfo/queryArea?token=test 市的区域统计
http://192.168.136.39:8000/userbrandinfo/queryDeviceSupplier?token=test 品牌统计
http://192.168.136.39:8000/userbrandinfo/queryAllByAdmin?token=test&page=1&line=10 全部记录中每个用户的数据
http://192.168.136.39:8000/userbrandinfo/queryAll?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTQzOTA5MDUwNDEzMTM4MDAxMzgwMDAiLCJsYW5nIjoiY24iLCJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1NTk4OTY4NTd9.nhK3VSghSGjyXKjel4woz7R_3bhjgqQDlX-ypYsklNU&page=1&line=5
'''
# coding:utf-8
from boto3 import Session
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr
import logging
import json

logger = logging.getLogger(__name__)


class MyserviceDynamodb(object):
    """Helper around a boto3 ``Session`` for the user-brand DynamoDB tables.

    Two table layouts are created by this class:

    * the "user_brand" layout, keyed by ``userID`` (HASH) only;
    * the "user_brand_all" layout, keyed by ``userID`` (HASH) and
      ``addTime`` (RANGE, numeric).
    """

    # Throughput used for every table created by this class.
    _PROVISIONED_THROUGHPUT = {
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5,
    }

    # Attributes copied from each queued record into the user_brand_all table.
    _ALL_ITEM_FIELDS = (
        'userID', 'addTime', 'deviceSupplier', 'deviceModel', 'osType',
        'osVersion', 'country', 'province', 'city', 'area', 'street',
        'longitude', 'latitude', 'appId', 'status_all',
        'ExpirationTime_TTL', 'ip',
    )

    # Attributes whose empty-string value is replaced by the placeholder
    # before a batch write.
    _BLANKABLE_FIELDS = ('city', 'area', 'street', 'longitude', 'latitude')
    _BLANK_PLACEHOLDER = '无'

    # Attributes compared against the search term in get_table_info().
    _SEARCH_FIELDS = (
        'deviceSupplier', 'osType', 'deviceModel', 'userID', 'osVersion',
        'country', 'appId', 'province', 'city', 'area', 'street',
        'longitude', 'latitude', 'status_all', 'ip',
    )

    def __init__(self, **kwargs):
        """Read the AWS settings from the project config and open a session.

        ``kwargs`` is accepted for compatibility and is not used.
        """
        self.region = AWS_DynamoDB_REGION
        self.access_key = AWS_DynamoDB_ACCESS_KEY
        self.secret_key = AWS_DynamoDB_SECRET_KEY
        self.session = self.__session()

    def __session(self):
        """Return a boto3 ``Session``, or ``None`` if it cannot be built."""
        try:
            return Session(aws_access_key_id=self.access_key,
                           aws_secret_access_key=self.secret_key,
                           region_name=self.region)
        except Exception:
            # Best effort, as before: report the failure and leave
            # self.session set to None instead of raising.
            logger.exception("Failed to connect session in region{0}".format(self.region))
            return None

    def _create_table(self, table_name, key_schema, attribute_definitions):
        """Create ``table_name`` with the given schema unless it already exists."""
        dynamodb = self.session.resource('dynamodb')
        if table_name in self.tables_list(table_name):
            print('包含')
            return
        try:
            dynamodb.create_table(
                TableName=table_name,
                KeySchema=key_schema,
                AttributeDefinitions=attribute_definitions,
                ProvisionedThroughput=dict(self._PROVISIONED_THROUGHPUT),
            )
        except Exception:
            # Existence was already checked above, so log the real cause
            # rather than assuming the table exists. Still non-fatal.
            logger.exception("Failed to create table {0}".format(table_name))

    @staticmethod
    def _scan_all(table, **scan_kwargs):
        """Scan ``table`` page by page and return every item found.

        If a page request fails, the error is logged and the items gathered
        so far are returned (the original loops stopped silently).
        """
        items = []
        start_key = None
        while True:
            if start_key is not None:
                scan_kwargs['ExclusiveStartKey'] = start_key
            try:
                response = table.scan(**scan_kwargs)
            except Exception:
                logger.exception("Scan of table {0} failed".format(table))
                break
            items.extend(response.get('Items', []))
            # A LastEvaluatedKey in the response means more pages remain.
            start_key = response.get('LastEvaluatedKey')
            if not start_key:
                break
        return items

    # Create the user_brand_all table.
    def user_brand_all_table_create(self, table_name):
        """Create a table keyed by ``userID`` (HASH) + ``addTime`` (RANGE)."""
        self._create_table(
            table_name,
            key_schema=[
                {'AttributeName': 'userID', 'KeyType': 'HASH'},
                {'AttributeName': 'addTime', 'KeyType': 'RANGE'},
            ],
            attribute_definitions=[
                {'AttributeName': 'userID', 'AttributeType': 'S'},
                {'AttributeName': 'addTime', 'AttributeType': 'N'},
            ],
        )

    # List the tables that exist in the configured AWS account/region.
    def tables_list(self, table_name=None):
        """Return the names of all DynamoDB tables visible to the session.

        ``table_name`` is unused and kept only for call compatibility.
        All result pages are followed, not just the first one.
        """
        client = self.session.client('dynamodb')
        names = []
        request = {}
        while True:
            response = client.list_tables(**request)
            names.extend(response['TableNames'])
            last_name = response.get('LastEvaluatedTableName')
            if not last_name:
                return names
            request['ExclusiveStartTableName'] = last_name

    # Create the user_brand table.
    def table_create(self, table_name):
        """Create a table keyed by ``userID`` (HASH) only."""
        self._create_table(
            table_name,
            key_schema=[
                {'AttributeName': 'userID', 'KeyType': 'HASH'},
            ],
            attribute_definitions=[
                {'AttributeName': 'userID', 'AttributeType': 'S'},
            ],
        )

    # Add rows to the user_brand_all table.
    def user_brand_all_item_put(self, table_name, data_list):
        """Batch-write JSON-encoded records into ``table_name``.

        :param table_name: destination table.
        :param data_list: iterable of JSON documents (``bytes`` or ``str``),
            each containing every key in ``_ALL_ITEM_FIELDS``.
        :raises KeyError: if a record lacks one of the expected keys.
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        with table.batch_writer() as batch:
            for raw in data_list:
                if isinstance(raw, bytes):
                    raw = raw.decode('utf-8')
                data = json.loads(raw)
                for field in self._BLANKABLE_FIELDS:
                    if data[field] == '':
                        data[field] = self._BLANK_PLACEHOLDER
                batch.put_item(
                    Item={field: data[field] for field in self._ALL_ITEM_FIELDS}
                )
        print('批量添加数据成功!')

    # Add one row to the user_brand table.
    def user_brand_item_put(self, table_name, user_id, add_time, device_supplier, device_model, os_type,
                            os_version, country, province, city, area, street, longitude, latitude,
                            app_id, status_all, ip):
        """Write (or overwrite) a single item in ``table_name``."""
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        table.put_item(
            Item={
                'userID': user_id,
                'addTime': add_time,
                'deviceSupplier': device_supplier,
                'deviceModel': device_model,
                'osType': os_type,
                'osVersion': os_version,
                'country': country,
                'province': province,
                'city': city,
                'area': area,
                'street': street,
                'longitude': longitude,
                'latitude': latitude,
                'appId': app_id,
                'status_all': status_all,
                'ip': ip,
            }
        )
        print('添加表一条数据成功!')

    # page: page number, line: rows per page, table_name: table to scan,
    # last_evaluated: key to start from, page_value: list collecting page start keys
    def get_page_line(self, page, line, table_name, last_evaluated, page_value):
        """Collect the start key of every ``line``-sized scan page.

        Starting from ``last_evaluated`` (``None`` means the beginning, in
        which case ``None`` is recorded as the first page's start key), each
        ``LastEvaluatedKey`` returned by the scan is appended to
        ``page_value``. ``page`` is unused.

        :returns: ``page_value`` (also mutated in place). On a scan error the
            error is logged and the keys collected so far are returned.
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        start_key = last_evaluated
        try:
            # Iterative rather than one recursive call per page, so a large
            # table cannot exhaust the recursion limit.
            while True:
                if start_key is None:
                    response = table.scan(Limit=line)
                    page_value.append(start_key)
                else:
                    response = table.scan(Limit=line, ExclusiveStartKey=start_key)
                start_key = response.get('LastEvaluatedKey')
                if not start_key:
                    break
                page_value.append(start_key)
        except Exception as e:
            logger.error("Failed to get table {0}, error {1}".format(table_name, e))
        return page_value

    def item_get(self, line, table_name, last_evaluated_key):
        """Return the raw scan response for one page of at most ``line`` items.

        ``last_evaluated_key`` is the exclusive start key, or ``None`` for
        the first page.
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        scan_kwargs = {'Limit': line}
        if last_evaluated_key is not None:
            scan_kwargs['ExclusiveStartKey'] = last_evaluated_key
        return table.scan(**scan_kwargs)

    def item_get_count(self, table_name):
        """Return the number of items in ``table_name``.

        Uses ``Select='COUNT'`` and follows every page; the previous
        implementation counted only the items of the first scan page.
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        total = 0
        scan_kwargs = {'Select': 'COUNT'}
        while True:
            response = table.scan(**scan_kwargs)
            total += response['Count']
            start_key = response.get('LastEvaluatedKey')
            if not start_key:
                return total
            scan_kwargs['ExclusiveStartKey'] = start_key

    # Fetch every item of the table.
    def item_get_brand(self, table_name):
        """Return all items of ``table_name`` (full paginated scan)."""
        dynamodb = self.session.resource('dynamodb')
        return self._scan_all(dynamodb.Table(table_name))

    def put_item(self, table, item_dict=None):
        """Put ``item_dict`` into the boto3 ``table`` resource.

        :returns: the ``put_item`` response, or ``None`` if the call failed
            (the failure is logged). Previously a failure ended in
            ``UnboundLocalError``.
        """
        try:
            return table.put_item(Item=item_dict)
        except Exception as e:
            logger.error("Failed to put item in to {0}:error{1}".format(table, e))
            return None

    def update_table(self, table_name, user_id, username, add_time, deviceSupplier, deviceModel, osType,
                     osVersion, country, province, city, area, street, longitude, latitude, appId, ip,
                     status_all):
        """Update the brand attributes of the item identified by the key.

        ``ClientError`` is logged and swallowed; other exceptions propagate.
        Returns ``None``.
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        # (attribute name, placeholder name, value) in the original SET order.
        assignments = (
            ('addTime', 'add_time', add_time),
            ('deviceSupplier', 'deviceSupplier', deviceSupplier),
            ('deviceModel', 'deviceModel', deviceModel),
            ('osType', 'osType', osType),
            ('osVersion', 'osVersion', osVersion),
            ('country', 'country', country),
            ('province', 'province', province),
            ('city', 'city', city),
            ('area', 'area', area),
            ('street', 'street', street),
            ('longitude', 'longitude', longitude),
            ('latitude', 'latitude', latitude),
            ('appId', 'appId', appId),
            ('ip', 'ip', ip),
            ('status_all', 'status_all', status_all),
        )
        update_expression = "SET " + ", ".join(
            "{0} = :{1}".format(attr, placeholder) for attr, placeholder, _ in assignments
        )
        expression_values = {
            ":{0}".format(placeholder): value for _, placeholder, value in assignments
        }
        try:
            # NOTE(review): neither table schema created by this class has
            # 'username' as a key attribute, so this Key presumably targets a
            # table defined elsewhere - confirm before relying on this method.
            table.update_item(
                Key={
                    'userID': user_id,
                    'username': username
                },
                UpdateExpression=update_expression,
                ExpressionAttributeValues=expression_values,
                ReturnValues="UPDATED_NEW")
        except ClientError as e:
            error = e.response['Error']
            if error['Code'] == "ConditionalCheckFailedException":
                logger.error(error['Message'])
            else:
                logger.error("Failed to update table {0}: {1}: {2}".format(
                    table_name, error.get('Code'), error.get('Message')))
        else:
            print('修改成功')

    def table_delete(self, table_name):
        """Delete the table ``table_name``."""
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        table.delete()
        print('删除表成功')

    def item_delete(self, table_name, name, user_id, add_time):
        """Delete one item and report the outcome.

        :param table_name: table to delete from.
        :param name: ``'user_brand'`` selects the ``userID``-only key; any
            other value selects the ``userID`` + ``addTime`` key.
        :param add_time: converted with ``int()``; only used for the
            two-part key.
        :returns: ``'ok'`` on success, ``'no'`` on any failure (logged).
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        try:
            if name == 'user_brand':
                key = {'userID': user_id}
            else:
                key = {'userID': user_id, 'addTime': int(add_time)}
            table.delete_item(Key=key)
            return 'ok'
        except Exception as e:
            # The old message formatted two placeholders with one argument,
            # which raised IndexError here instead of returning 'no'.
            logger.error("Failed to delete item from {0}:error{1}".format(table_name, e))
            return 'no'

    # Search the whole table (paginated scan) for a value.
    def get_table_info(self, table_name, username):
        """Return every item having any searchable attribute equal to ``username``.

        The table is scanned page by page with a filter that ORs an equality
        test over ``_SEARCH_FIELDS``. Each returned item gets a ``username``
        entry looked up from ``Device_User`` by its ``userID`` (empty string
        when no matching user row is found).
        """
        dynamodb = self.session.resource('dynamodb')
        table_handle = dynamodb.Table(table_name)

        # Build the OR-of-equalities filter once and reuse it for all pages.
        condition = Attr(self._SEARCH_FIELDS[0]).eq(username)
        for field in self._SEARCH_FIELDS[1:]:
            condition = condition | Attr(field).eq(username)

        table_info = self._scan_all(table_handle, FilterExpression=condition)

        # One database lookup per distinct userID instead of one per item.
        username_by_id = {}
        for item in table_info:
            user_id = item['userID']
            if user_id not in username_by_id:
                try:
                    rows = Device_User.objects.filter(userID=user_id).values(
                        'userID', 'username', 'NickName')
                    username_by_id[user_id] = rows[0]['username']
                except IndexError:
                    # No such user: same empty-string fallback as before.
                    username_by_id[user_id] = ''
                except Exception:
                    logger.exception("Failed to look up username for userID {0}".format(user_id))
                    username_by_id[user_id] = ''
            item['username'] = username_by_id[user_id]
        return table_info


my = MyserviceDynamodb()
# print(my.table_delete('user_brand'))
# print(my.table_delete('user_brand_all'))
# my.table_create(USER_BRAND)
# my.user_brand_all_table_create(USER_BRAND_ALL)
# print(my.item_put('user_brand'))
# table_value = my.get_table('user_brand')


class UserBrandInfo(View):
    """HTTP endpoint for recording and querying user device/brand data.

    The ``operation`` URL keyword argument selects the action; every request
    must carry a ``token`` parameter that ``TokenObject`` accepts.
    """

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        return super(UserBrandInfo, self).dispatch(*args, **kwargs)

    def get(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, request, operation)

    def post(self, request, *args, **kwargs):
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, request, operation)

    def validation(self, request_dict, request, operation):
        """Check the token, then route the request to the operation handler.

        Responds with the token error code when the token is rejected and
        with 444 when ``operation`` is not recognised.
        """
        self.clientIP = CommonService.get_ip_address(request)
        response = ResponseObject()
        token = request_dict.get('token', None)
        tko = TokenObject(token)
        if tko.code != 0:
            return response.json(tko.code)
        response.lang = tko.lang

        handlers = {
            'add': self.add_info,
            'queryByAdmin': self.query_info,
            'queryAllByAdmin': self.query_all_info,
            'deleteByAdmin': self.delete_by_admin,
            'queryDeviceSupplier': self.query_deviceSupplier_info,
            'queryArea': self.query_area_info,
        }
        handler = handlers.get(operation)
        if handler is None:
            return response.json(444)
        return handler(request_dict, tko.userID, USER_BRAND, USER_BRAND_ALL, response)

    @staticmethod
    def _parse_int(value):
        """Return ``int(value)``, or ``None`` when it is missing or not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _search_by_username(table_name, username, response):
        """Search ``table_name`` for ``username`` and build the JSON response.

        An empty search term yields an empty result. When a ``Device_User``
        row with that username exists, its ``userID`` is used as the search
        term instead.
        """
        if username is None or username == '':
            # Unfiltered paging is disabled; without a search term the
            # endpoint answers with an empty result set.
            return response.json(0, {'datas': {}, 'count': 0})
        matches = Device_User.objects.filter(username=username).values('userID', 'NickName')
        try:
            username = matches[0]['userID']
        except Exception:
            # No such user (or lookup failed): search with the raw term.
            pass
        table_value = my.get_table_info(table_name, username)
        return response.json(0, {'datas': table_value, 'count': len(table_value)})

    # http://192.168.136.39:8000/userbrandinfo?operation=add&token=test&deviceSupplier=小米&deviceModel=HM NOTE 1TD&osType=WEB&osVersion=4.0.0
    def add_info(self, request_dict, userID, user_brand, user_brand_all, response):
        """Acknowledge an "add" request.

        NOTE(review): the unconditional return below short-circuits the
        whole method, so nothing is recorded and everything after it is
        unreachable. It is preserved as found because removing it would
        re-enable Redis/DynamoDB writes - confirm whether that is intended.
        """
        return response.json(0, {'OK': int(time.time())})

        # ---- unreachable while the early return above is in place ----
        deviceSupplier = request_dict.get('deviceSupplier', None)
        deviceModel = request_dict.get('deviceModel', None)
        osType = request_dict.get('osType', None)
        osVersion = request_dict.get('osVersion', None)
        country = request_dict.get('country', None)
        province = request_dict.get('province', None)
        city = request_dict.get('city', '')
        area = request_dict.get('area', '')
        street = request_dict.get('street', '')
        longitude = request_dict.get('longitude', '')
        latitude = request_dict.get('latitude', '')
        appId = request_dict.get('appId', None)
        if city == '':
            city = '无'
        if area == '':
            area = '无'
        if street == '':
            street = '无'
        if longitude == '':
            longitude = '无'
        if latitude == '':
            latitude = '无'
        param_area = CommonService.get_param_flag(data=[country, province, city])
        must_fill_in = CommonService.get_param_flag(
            data=[appId, deviceSupplier, deviceModel, osType, osVersion])
        if must_fill_in is False:
            return response.json(444, 'appId,deviceSupplier,deviceModel,osType,osVersion')
        if param_area is True:
            # Client supplied its location: strip the province/city suffixes.
            if '省' in province:
                province = province.replace('省', '')
            if '市' in city:
                city = city.replace('市', '')
            status = 1
        else:
            # No usable location from the client: derive it from the IP.
            status = 0
            jsonData = CommonService.getIpIpInfo(ip=self.clientIP, lang='CN')
            country = jsonData['country_name']
            province = jsonData['region_name']
            city = jsonData['city_name']
            area = '无'
            street = '无'
            longitude = jsonData['longitude']
            latitude = jsonData['latitude']
            if country == '局域网':
                city = '无'
                longitude = '无'
                latitude = '无'
        param_flag = CommonService.get_param_flag(
            data=[deviceSupplier, deviceModel, osType, osVersion])
        if param_flag is True:
            addTime = int(time.time())
            add_data = {
                'userID': userID,
                'addTime': addTime,
                'deviceSupplier': deviceSupplier,
                'deviceModel': deviceModel,
                'osType': osType,
                'osVersion': osVersion,
                'country': country,
                'province': province,
                'city': city,
                'area': area,
                'street': street,
                'longitude': longitude,
                'latitude': latitude,
                'appId': appId,
                'status_all': status,
                'ExpirationTime_TTL': addTime + 2592000,
                'ip': self.clientIP
            }
            # Queue the record in Redis.
            redisObj = RedisObject()
            logKey = USER_BRAND_ALL
            redisObj.rpush(name=logKey, val=json.dumps(add_data))
            try:
                # Flush the queue to user_brand_all once it exceeds 100 entries.
                if redisObj.llen(name=logKey) > 100:
                    data_list = redisObj.lrange(logKey, 0, -1)
                    redisObj.del_data(key=logKey)
                    my.user_brand_all_item_put(user_brand_all, data_list)
                # NOTE(review): the original nesting of this call was lost
                # with the file's indentation; it is assumed to run on every
                # request, not only when the queue is flushed - confirm.
                my.user_brand_item_put(user_brand, userID, addTime, deviceSupplier, deviceModel,
                                       osType, osVersion, country, province, city, area, street,
                                       longitude, latitude, appId, status, self.clientIP)
            except Exception:
                my.table_create(USER_BRAND)
                my.user_brand_all_table_create(USER_BRAND_ALL)
            return response.json(0, {'OK': int(time.time())})
        else:
            # Invalid parameters.
            return response.json(444)

    # Query the table holding each user's latest login record.
    def query_info(self, request_dict, userID, user_brand, user_brand_all, response):
        """Search the ``user_brand`` table for ``username`` (permission 30).

        Responds 444 when ``page``/``line`` are missing or not integers
        (previously an unhandled exception) and 404 without permission.
        """
        page = self._parse_int(request_dict.get('page', None))
        line = self._parse_int(request_dict.get('line', None))
        if page is None or line is None:
            return response.json(444)
        username = request_dict.get('username', None)
        if CommonService.get_param_flag(data=[page, line]) is not True:
            return response.json(444)
        if ModelService.check_perm(userID=userID, permID=30) is not True:
            return response.json(404)
        return self._search_by_username(user_brand, username, response)

    # Delete endpoint.
    def delete_by_admin(self, request_dict, userID, user_brand, user_brand_all, response):
        """Delete one record from ``user_brand`` or ``user_brand_all`` (permission 10).

        ``table_name`` equal to ``'user_brand'`` selects that table; any
        other value selects ``user_brand_all``, which also needs ``add_time``.
        Missing ``id``/``table_name``/``add_time`` now yield 444 instead of an
        unhandled ``IndexError``.
        """
        ids = request_dict.getlist('id', None)
        usernames = request_dict.getlist('username', None)
        add_times = request_dict.getlist('add_time', None)
        table_names = request_dict.getlist('table_name', None)
        if CommonService.get_param_flag(data=[ids, usernames]) is not True:
            return response.json(444)
        if ModelService.check_perm(userID=userID, permID=10) is not True:
            return response.json(404)
        if not ids or not table_names:
            return response.json(444)

        if str(table_names[0]) == 'user_brand':
            # add_time is not part of this table's key, so it is optional.
            add_time = add_times[0] if add_times else None
            is_delete = my.item_delete(user_brand, 'user_brand', ids[0], add_time)
        else:
            if not add_times:
                return response.json(444)
            is_delete = my.item_delete(user_brand_all, 'user_brand_all', ids[0], add_times[0])
        # NOTE(review): item_delete returns 'ok'/'no', so this sends the first
        # character ('o' or 'n') as delete_count. Kept unchanged because
        # clients may depend on it - confirm the intended value.
        return response.json(0, {'delete_count': is_delete[0]})

    # Query each user record statistics
    def query_all_info(self, request_dict, userID, user_brand, user_brand_all, response):
        """Search the ``user_brand_all`` table for ``username`` (permission 30).

        Records queued in Redis are flushed to the table first so the search
        sees them. Responds 444 when ``page``/``line`` are missing or not
        integers and 404 without permission.
        """
        page = self._parse_int(request_dict.get('page', None))
        line = self._parse_int(request_dict.get('line', None))
        if page is None or line is None:
            return response.json(444)
        username = request_dict.get('username', None)
        param_flag = CommonService.get_param_flag(data=[page, line])

        # Flush queued records into user_brand_all (best effort).
        # NOTE(review): the queue is cleared before the batch write, so a
        # failed write loses the queued records - confirm this is acceptable.
        try:
            redisObj = RedisObject()
            logKey = USER_BRAND_ALL
            data_list = redisObj.lrange(logKey, 0, -1)
            redisObj.del_data(key=logKey)
            my.user_brand_all_item_put(user_brand_all, data_list)
        except Exception:
            logger.exception("Failed to flush queued records into {0}".format(user_brand_all))

        if param_flag is not True:
            return response.json(444)
        if ModelService.check_perm(userID=userID, permID=30) is not True:
            return response.json(404)
        return self._search_by_username(user_brand_all, username, response)

    # Brand statistics interface
    def query_deviceSupplier_info(self, request_dict, userID, user_brand, user_brand_all, response):
        """Count ``user_brand`` items per ``deviceSupplier`` (permission 30)."""
        if ModelService.check_perm(userID=userID, permID=30) is not True:
            return response.json(404)
        table_value = my.item_get_brand(user_brand)
        send_jsons = Counter(item['deviceSupplier'] for item in table_value)
        return response.json(0, {'datas': send_jsons, 'counts': len(table_value)})

    # Interface to area statistics
    def query_area_info(self, request_dict, userID, user_brand, user_brand_all, response):
        """Count ``user_brand`` items per value of the ``district`` attribute.

        ``district`` names the item attribute to group by. Responds 404
        without permission 30 and 444 when ``district`` is missing. Items
        lacking that attribute are left out of the tally (previously a
        ``KeyError``); ``counts`` is the total number of items scanned.
        """
        check_perm = ModelService.check_perm(userID=userID, permID=30)
        district = request_dict.get('district', None)
        if check_perm is not True:
            return response.json(404)
        if not district:
            return response.json(444)
        # One row per user, so this is a per-user tally.
        table_value = my.item_get_brand(user_brand)
        send_jsons = Counter(item[district] for item in table_value if district in item)
        return response.json(0, {'datas': send_jsons, 'counts': len(table_value)})