# -*- coding: utf-8 -*-
"""Access-log helpers.

Contains a thin boto3/DynamoDB wrapper (``MyserviceDynamodb``), the
``MiscellService`` facade used by views, and the thread targets that
persist access-log records (database, Redis-buffered batch, DynamoDB).
"""
import datetime
import threading
import time

import requests
import simplejson as json
from django.utils.timezone import utc

from Ansjer import config as api_settings
from Object.TokenObject import TokenObject
from Object.mongodb import mongodb
from Service.CommonService import CommonService
from Service.ModelService import ModelService
from Service.TemplateService import TemplateService
from Object.RedisObject import RedisObject
from Ansjer.config import SERVER_TYPE
from Model.models import Device_User
from boto3 import Session
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr
import logging
# NOTE: this import deliberately stays after ``simplejson as json`` above:
# it rebinds ``json`` to the standard-library module, which is the binding
# the rest of this file has always run with.
import json
from Ansjer.config import (
    DOMAIN_HOST,
    AWS_DynamoDB_REGION,
    AWS_DynamoDB_ACCESS_KEY,
    AWS_DynamoDB_SECRET_KEY,
)

logger = logging.getLogger(__name__)

# Placeholder stored when no user id is known. DynamoDB rejects empty
# strings for key attributes, so '' can never be written as ``userID``.
_UNKNOWN_USER_ID = '无'

# Offset added to ``addTime`` to fill ``Expiration_time`` (30 days in seconds).
_LOG_TTL_SECONDS = 2592000


class DynamodbConnectionError(Exception):
    """Raised when a DynamoDB resource handle cannot be obtained."""


def _access_log_table_name():
    """Return the access-log table name for the configured DOMAIN_HOST."""
    if DOMAIN_HOST == 'www.dvema.com':
        return 'access_log'
    return 'test_access_log'


class MyserviceDynamodb():
    """Wrapper around a boto3 session for the access-log DynamoDB tables."""

    def __init__(self, **kwargs):
        """Read the AWS settings and open a boto3 session.

        ``**kwargs`` is accepted for call compatibility and is not used.
        """
        self.region = AWS_DynamoDB_REGION
        self.access_key = AWS_DynamoDB_ACCESS_KEY
        self.secret_key = AWS_DynamoDB_SECRET_KEY
        self.session = self.__session()

    def __session(self):
        """Create the boto3 session; log and re-raise on failure."""
        try:
            return Session(
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key,
                region_name=self.region,
            )
        except Exception:
            # Previously the error was printed and an unrelated
            # UnboundLocalError surfaced; propagate the real cause instead.
            logger.exception(
                "Failed to connect session in region%s", self.region)
            raise

    def _resource(self):
        """Return the DynamoDB service resource or raise DynamodbConnectionError."""
        dynamodb = self.session.resource('dynamodb')
        if not dynamodb:
            raise DynamodbConnectionError(
                "Failed to get resource for dynamodb!")
        return dynamodb

    @staticmethod
    def _scan_pages(table, **scan_kwargs):
        """Yield every response page of ``table.scan(**scan_kwargs)``.

        A single Scan call returns at most one page of data; the following
        pages are requested by passing ``LastEvaluatedKey`` back as
        ``ExclusiveStartKey`` until it is no longer returned.
        """
        while True:
            response = table.scan(**scan_kwargs)
            yield response
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                return
            scan_kwargs['ExclusiveStartKey'] = last_key

    def _count_in_range(self, table, start_date, end_date):
        """Count items with ``start_date <= addTime < end_date`` over all pages.

        The filter keeps the original shape, so an item whose addTime equals
        ``start_date`` is counted even when ``end_date <= start_date``.
        """
        time_filter = (
            (Attr('addTime').gt(start_date) & Attr('addTime').lt(end_date))
            | Attr('addTime').eq(start_date)
        )
        pages = self._scan_pages(
            table, Select='COUNT', FilterExpression=time_filter)
        return sum(page.get('Count', 0) for page in pages)

    @staticmethod
    def _lookup_user_id(user_name):
        """Resolve a user id from ``Device_User`` by username.

        Returns ``_UNKNOWN_USER_ID`` when the lookup fails or yields an
        empty value, so the result is always usable as a DynamoDB key.
        """
        try:
            rows = Device_User.objects.filter(
                username=user_name).values('userID')
            user_id = rows[0]['userID']
        except Exception:
            user_id = ''
        return user_id or _UNKNOWN_USER_ID

    # Add rows to the access_log table.
    def access_log_item_put(self, table_name, data_list):
        """Batch-write JSON-encoded access-log records into ``table_name``.

        :param table_name: target DynamoDB table.
        :param data_list: iterable of JSON documents (``bytes`` or ``str``),
            each holding userID, userName, addTime, ip, status, operation,
            content and Expiration_time.
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        # overwrite_by_pkeys de-duplicates buffered items that share the
        # same (userID, addTime) key -- the key schema created by
        # access_log_table_create -- which would otherwise make the batch
        # request fail with a duplicate-keys validation error.
        with table.batch_writer(
                overwrite_by_pkeys=['userID', 'addTime']) as batch:
            for raw in data_list:
                if isinstance(raw, bytes):
                    raw = raw.decode('utf-8')
                data = json.loads(raw)
                if not data.get('userID'):
                    data['userID'] = self._lookup_user_id(
                        data.get('userName'))
                batch.put_item(
                    Item={
                        'userID': data['userID'],
                        'addTime': data['addTime'],
                        'ip': data['ip'],
                        'status': data['status'],
                        'operation': data['operation'],
                        'content': data['content'],
                        'Expiration_time': data['Expiration_time'],
                    }
                )
        print('添加access_log表数据成功!')

    # Search the table for a value.
    def get_item(self, table_name, values):
        """Return all items whose userID, ip, status, content or operation
        equals ``values``.

        :raises DynamodbConnectionError: if no DynamoDB resource is available.
        :return: list of matching items; ``[]`` if the scan fails.
        """
        dynamodb = self._resource()
        try:
            table = dynamodb.Table(table_name)
            match_any = (
                Attr('userID').eq(values)
                | Attr('ip').eq(values)
                | Attr('status').eq(values)
                | Attr('content').eq(values)
                | Attr('operation').eq(values)
            )
            items = []
            for page in self._scan_pages(table, FilterExpression=match_any):
                items.extend(page.get('Items', []))
            return items
        except Exception as e:
            logger.error(
                "Failed to get table %s, error: %s", table_name, e)
            return []

    # Search by time range.
    def get_item_time(self, table_name, start_date, end_date):
        """Count items with ``start_date <= addTime < end_date``.

        :raises DynamodbConnectionError: if no DynamoDB resource is available.
        :return: the number of matching items; ``0`` if the scan fails.
        """
        dynamodb = self._resource()
        try:
            table = dynamodb.Table(table_name)
            return self._count_in_range(table, start_date, end_date)
        except Exception as e:
            logger.error(
                "Failed to get table %s, error: %s", table_name, e)
            return 0

    # Search by time range, one count per slot of the given day.
    def get_item_date(self, table_name, date):
        """Return ``{slot_key: count}`` for every slot of ``date``.

        ``date`` is a POSIX timestamp. ``CommonService.getTimeDict`` supplies
        the slot start datetimes; each slot is counted over one hour.

        :raises DynamodbConnectionError: if no DynamoDB resource is available.
        :return: dict of counts; slots after a failure are missing.
        """
        dynamodb = self._resource()
        res = {}
        try:
            table = dynamodb.Table(table_name)
            times = datetime.datetime.fromtimestamp(date)
            time_dict = CommonService.getTimeDict(times)
            for slot_key, slot_start in time_dict.items():
                slot_end = slot_start + datetime.timedelta(hours=1)
                count = self._count_in_range(
                    table,
                    int(slot_start.timestamp()),
                    int(slot_end.timestamp()),
                )
                logger.debug("access log count for %s: %s", slot_key, count)
                res[slot_key] = count or 0
        except Exception as e:
            logger.error(
                "Failed to get table %s, error: %s", table_name, e)
        return res

    def item_get_brand(self, table_name):
        """Return every item of ``table_name``; ``[]`` if the scan fails."""
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        try:
            items = []
            for page in self._scan_pages(table):
                items.extend(page['Items'])
            return items
        except Exception as e:
            logger.error(
                "Failed to scan table %s, error: %s", table_name, e)
            return []

    # List the tables that exist in the AWS account/region.
    def tables_list(self, table_name):
        """Return the names of all DynamoDB tables.

        ``table_name`` is unused and kept only for call compatibility.
        """
        client = self.session.client('dynamodb')
        names = []
        list_kwargs = {}
        while True:
            response = client.list_tables(**list_kwargs)
            names.extend(response['TableNames'])
            last_name = response.get('LastEvaluatedTableName')
            if not last_name:
                return names
            list_kwargs['ExclusiveStartTableName'] = last_name

    # Create the access_log table.
    def access_log_table_create(self, table_name):
        """Create ``table_name`` (hash key userID, range key addTime) if absent."""
        dynamodb = self.session.resource('dynamodb')
        inventory = self.tables_list(table_name)
        if table_name in inventory:
            print('access_log表包含')
            return
        try:
            dynamodb.create_table(
                TableName=table_name,
                KeySchema=[
                    {'AttributeName': 'userID', 'KeyType': 'HASH'},
                    {'AttributeName': 'addTime', 'KeyType': 'RANGE'},
                ],
                AttributeDefinitions=[
                    {'AttributeName': 'userID', 'AttributeType': 'S'},
                    {'AttributeName': 'addTime', 'AttributeType': 'N'},
                ],
                ProvisionedThroughput={
                    'ReadCapacityUnits': 5,
                    'WriteCapacityUnits': 5,
                },
            )
            print('创建表成功')
        except Exception as e:
            # Existence was already checked above, so report the real cause
            # instead of claiming the table already exists.
            logger.error(
                "Failed to create table %s, error: %s", table_name, e)

    # Delete a table.
    def table_delete(self, table_name):
        """Delete ``table_name``."""
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        table.delete()
        print('删除表成功')


my = MyserviceDynamodb()
# print(my.table_delete('user_brand_all'))
user_brand = _access_log_table_name()
# my.table_delete(user_brand)
# my.access_log_table_create(user_brand)


# Miscellaneous helpers: rarely shared, but needed in a few places.
class MiscellService():
    """Static helpers for recording and querying access logs."""

    # Get the name of the user making the request.
    @staticmethod
    def get_access_name(request_dict):
        """Return the first truthy value among userName, email and phone.

        Falls back to the user of a valid ``token`` (``tko.code == 0``),
        and to ``''`` when nothing identifies the caller.
        """
        for field in ('userName', 'email', 'phone'):
            value = request_dict.get(field, None)
            if value:
                return value
        token = request_dict.get('token', None)
        user = ''
        if token is not None:
            tko = TokenObject(token)
            if tko.code == 0:
                user = tko.user
                # user = ModelService.get_user_name(tko.userID)
        return user

    @staticmethod
    def add_access_log(request, status_code):
        """Record the access log asynchronously on a new thread."""
        asy = threading.Thread(target=addLog, args=(request, status_code))
        asy.start()

    @staticmethod
    def access_log(request, response, type):
        """Insert an access-log document into the ``log_access`` collection.

        Only GET/POST requests whose path is in ``TemplateService.log_api()``
        are recorded. ``type == 1`` forces status 200; otherwise
        ``response.status_code`` is stored.
        """
        request_dict = _request_params(request)
        if request_dict is None:
            return
        api_list = TemplateService.log_api()
        request_path = request.path.strip().strip('/')
        if request_path not in api_list:
            return
        clientIP = CommonService.get_ip_address(request)
        now_time = time.time()
        # Resolve the user from the untouched request parameters; the
        # previous call passed an unexpected ``request=`` keyword.
        user = MiscellService.get_access_name(request_dict)
        content = _serialize_params(request_dict)
        area = CommonService.getAddr(ip=clientIP)
        status_code = 200 if type == 1 else response.status_code
        add_data = {
            'user': user,
            'ip': clientIP,
            'status': status_code,
            'path': request_path,
            'method': request.method,
            'time': int(now_time),
            'area': area,
            'content': content,
            'et': datetime.datetime.utcnow()
        }
        mdb = mongodb()
        mdb.insert_one(col="log_access", data=add_data)

    # Look up the country an IP address belongs to.
    @staticmethod
    def getArea(ip):
        """Return the country reported for ``ip`` by the lookup service.

        Returns ``''`` when the request fails, the reply is not the
        expected JSON shape, the service reports a non-zero ``code``, or
        the country is ``'XX'``.
        """
        data = {'ip': ip}
        URL = 'http://ip.taobao.com/service/getIpInfo.php'
        try:
            r = requests.get(URL, params=data, timeout=3)
            json_data = r.json()
        except (requests.RequestException, ValueError) as e:
            logger.warning("IP area lookup failed for %s: %s", ip, e)
            return ''
        if not isinstance(json_data, dict) or json_data.get('code') != 0:
            return ''
        details = json_data.get('data')
        if not isinstance(details, dict):
            return ''
        country = details.get('country', '')
        if country == 'XX':
            return ''
        return country

    @staticmethod
    def get_item_log(table_name, values):
        """Delegate to ``MyserviceDynamodb.get_item`` on the shared instance."""
        return my.get_item(table_name, values)

    @staticmethod
    def item_get_brand(table_name):
        """Delegate to ``MyserviceDynamodb.item_get_brand``."""
        return my.item_get_brand(table_name)

    @staticmethod
    def get_item_time(table_name, start_date, end_date):
        """Delegate to ``MyserviceDynamodb.get_item_time``."""
        return my.get_item_time(table_name, start_date, end_date)

    @staticmethod
    def get_item_date(table_name, date):
        """Delegate to ``MyserviceDynamodb.get_item_date``."""
        return my.get_item_date(table_name, date)

    # Add an access log entry for the download endpoint.
    @staticmethod
    def add_ota_download_log(request):
        """Store an access-log row for an OTA download request."""
        clientIP = CommonService.get_ip_address(request)
        request_path = request.path.strip().strip('/')
        add_data = {
            'user': 'None',
            'ip': clientIP,
            'status': 200,
            'url': _request_url(request),
            'operation': request_path,
            'time': _utc_now(),
            'content': ''
        }
        ModelService.addAccessLog(data=add_data)

    @staticmethod
    def batch_add_access_log(request, status_code):
        """Buffer the access log in Redis asynchronously on a new thread."""
        asy = threading.Thread(
            target=batch_add_log_ctr, args=(request, status_code))
        asy.start()

    @staticmethod
    def DynamoDB_add_access_log(request, status_code):
        """Send the access log towards DynamoDB asynchronously on a new thread."""
        asy = threading.Thread(
            target=dynamo_db_add_log_ctr, args=(request, status_code))
        asy.start()


def _utc_now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(tz=utc)


def _request_url(request):
    """Return the ``<protocol>-<method>-<path>`` string stored as 'url'."""
    return (request.META['SERVER_PROTOCOL'] + '-' + request.method
            + '-' + request.path)


def _request_params(request):
    """Return ``request.GET``/``request.POST`` by method, else ``None``."""
    if request.method == 'GET':
        return request.GET
    if request.method == 'POST':
        return request.POST
    return None


def _serialize_params(request_dict):
    """JSON-encode the request parameters with ``userPwd`` removed.

    The parameters are copied into a plain dict only when a password is
    present, so the caller's object is never mutated.
    """
    if request_dict.get('userPwd', None) is not None:
        request_dict = dict(request_dict)
        request_dict.pop('userPwd')
    return json.dumps(request_dict)


def _redis_log_key(base_name):
    """Return ``base_name`` on the formal server, ``test_<base_name>`` otherwise."""
    if SERVER_TYPE == 'Ansjer.formal_settings':
        return base_name
    return 'test_' + base_name


def _push_and_flush(log_key, payload, threshold, flush):
    """Append ``payload`` to the Redis list and flush it when it is long enough.

    The list is flushed when its length exceeds ``threshold`` or when running
    under the test settings. ``flush`` receives the raw list contents.
    """
    redis_obj = RedisObject()
    redis_obj.rpush(name=log_key, val=payload)
    # Check the length of the redis list.
    if (redis_obj.llen(name=log_key) > threshold
            or SERVER_TYPE == 'Ansjer.test_settings'):
        # NOTE(review): lrange followed by del is not atomic; entries pushed
        # by another thread in between would be dropped -- confirm whether
        # RedisObject offers a transactional alternative.
        data_list = redis_obj.lrange(log_key, 0, -1)
        redis_obj.del_data(key=log_key)
        flush(data_list)


def addLog(request, status_code):
    """Thread target: store one access-log row through ``ModelService``.

    Only GET/POST requests whose path is in ``TemplateService.log_api()``
    are recorded. Failures are logged and never propagate.
    """
    try:
        request.encoding = 'utf-8'
        request_dict = _request_params(request)
        if request_dict is None:
            return
        api_list = TemplateService.log_api()
        request_path = request.path.strip().strip('/')
        if request_path not in api_list:
            return
        user = MiscellService.get_access_name(request_dict=request_dict)
        clientIP = CommonService.get_ip_address(request)
        now_time = _utc_now()
        content = _serialize_params(request_dict)
        logger.debug("access log content: %s", content)
        if user == '':
            # No identifiable user: store the placeholder value.
            user = '空'
        add_data = {
            'user': user,
            'ip': clientIP,
            'status': status_code,
            'url': _request_url(request),
            'operation': request_path,
            'time': now_time,
            'content': content
        }
        logger.debug("access log record: %s", add_data)
        ModelService.addAccessLog(data=add_data)
    except Exception:
        # Best-effort logging: never let a failure escape the worker thread.
        logger.exception("Failed to add access log")


def batch_add_log_ctr(request, status_code):
    """Thread target: buffer one access-log row in Redis.

    The buffered rows are handed to ``ModelService.add_batch_log`` once the
    list holds more than 100 entries (or always under the test settings).
    """
    request.encoding = 'utf-8'
    request_dict = _request_params(request)
    if request_dict is None:
        return
    api_list = TemplateService.log_api()
    request_path = request.path.strip().strip('/')
    if request_path not in api_list:
        return
    user = MiscellService.get_access_name(request_dict)
    clientIP = CommonService.get_ip_address(request)
    now_time = _utc_now()
    content = _serialize_params(request_dict)
    add_data = {
        'user': user,
        'ip': clientIP,
        'status': status_code,
        'url': _request_url(request),
        'operation': request_path,
        'time': str(now_time),
        'content': content
    }
    _push_and_flush(
        log_key=_redis_log_key('logger'),
        payload=json.dumps(add_data),
        threshold=100,
        flush=ModelService.add_batch_log,
    )


def dynamo_db_add_log_ctr(request, status_code):
    """Thread target: buffer one access-log row in Redis for DynamoDB.

    Requests whose path is in ``TemplateService.log_api()`` are logged with
    the caller's userName/userID. Other requests are logged only when their
    path contains ``/OTA/downloads`` and ``api_settings.ADDR_URL`` holds at
    most 1000 entries. Buffered rows are written through
    ``my.access_log_item_put`` once the list holds more than one entry (or
    always under the test settings).
    """
    request.encoding = 'utf-8'
    request_dict = _request_params(request)
    if request_dict is None:
        return
    api_list = TemplateService.log_api()
    request_path = request.path.strip().strip('/')
    log_key = _redis_log_key('loggers')
    # Flush once the redis list is longer than this many entries.
    # NOTE(review): the original comment mentioned 10 entries while the
    # code used 1; the coded value is kept -- confirm which is intended.
    num = 1
    clientIP = CommonService.get_ip_address(request)
    token = request_dict.get('token', None)
    userID = '无'
    if token is not None:
        tko = TokenObject(token)
        userID = tko.userID
    userName = request_dict.get('userName', None)
    content = _serialize_params(request_dict)
    addTime = int(time.time())
    table_name = _access_log_table_name()

    # Only a filtered set of endpoints is logged with user details.
    if request_path in api_list:
        log_user_name, log_user_id = userName, userID
    # Otherwise check whether this is the download endpoint.
    elif request.path.find('/OTA/downloads') != -1:
        if len(api_settings.ADDR_URL) > 1000:
            # Downloads are too frequent (more than 1000 tracked addresses).
            # This runs on a worker thread whose return value is discarded,
            # so record the condition instead of building a response object.
            logger.warning(
                "OTA download log skipped: more than 1000 tracked addresses")
            return
        # Within limits: track the address and add the access log.
        api_settings.ADDR_URL.append(request.META.get('REMOTE_ADDR', None))
        log_user_name, log_user_id = '无', '无'
    else:
        return

    add_data = {
        'userName': log_user_name,
        'userID': log_user_id,
        'ip': clientIP,
        'status': status_code,
        'operation': request_path,
        'addTime': addTime,
        'content': content,
        'Expiration_time': addTime + _LOG_TTL_SECONDS
    }
    _push_and_flush(
        log_key=log_key,
        payload=json.dumps(add_data),
        threshold=num,
        flush=lambda data_list: my.access_log_item_put(table_name, data_list),
    )