# coding:utf-8
"""Access-log helpers: DynamoDB storage, Redis buffering and request logging."""
import datetime
import threading
import time
import requests
import simplejson as json
from django.utils.timezone import utc
from Ansjer import config as api_settings
from Object.TokenObject import TokenObject
from Object.mongodb import mongodb
from Service.CommonService import CommonService
from Service.ModelService import ModelService
from Service.TemplateService import TemplateService
from Object.RedisObject import RedisObject
from Ansjer.config import SERVER_TYPE
from Model.models import Device_User
from boto3 import Session
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr
import logging
# NOTE: this import rebinds ``json`` to the standard-library module, shadowing
# the ``simplejson`` alias above. The order is kept on purpose so every
# ``json.*`` call below keeps using the same implementation as before.
import json
from Ansjer.config import (
    DOMAIN_HOST,
    AWS_DynamoDB_REGION,
    AWS_DynamoDB_ACCESS_KEY,
    AWS_DynamoDB_SECRET_KEY,
)

logger = logging.getLogger(__name__)

# Lifetime written into ``Expiration_time`` of every DynamoDB log item (30 days).
_LOG_TTL_SECONDS = 30 * 24 * 60 * 60


class DynamodbConnectionError(Exception):
    """Raised when the boto3 DynamoDB resource cannot be obtained."""


class MyserviceDynamodb():
    """Thin wrapper around a boto3 session for the access-log DynamoDB tables."""

    def __init__(self, **kwargs):
        self.region = AWS_DynamoDB_REGION
        self.access_key = AWS_DynamoDB_ACCESS_KEY
        self.secret_key = AWS_DynamoDB_SECRET_KEY
        self.session = self.__session()

    def __session(self):
        """Build the boto3 session; log and re-raise if it cannot be created."""
        try:
            return Session(
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_key,
                region_name=self.region,
            )
        except Exception:
            # Previously the error was printed and an UnboundLocalError followed;
            # surface the real cause instead.
            logger.exception("Failed to connect session in region{0}".format(self.region))
            raise

    def _resource(self):
        """Return the DynamoDB service resource or raise DynamodbConnectionError."""
        dynamodb = self.session.resource('dynamodb')
        if not dynamodb:
            raise DynamodbConnectionError("Failed to get resource for dynamodb!")
        return dynamodb

    @staticmethod
    def _scan_pages(table, **scan_kwargs):
        """Yield every page of ``table.scan``.

        A single scan call returns at most one page; following
        ``LastEvaluatedKey`` is required to see the whole table.
        """
        while True:
            response = table.scan(**scan_kwargs)
            yield response
            last_key = response.get('LastEvaluatedKey')
            if not last_key:
                return
            scan_kwargs['ExclusiveStartKey'] = last_key

    @staticmethod
    def _time_window(start_date, end_date):
        """Filter matching ``start_date <= addTime < end_date`` (plus addTime == start)."""
        add_time = Attr('addTime')
        return (add_time.gt(start_date) & add_time.lt(end_date)) | add_time.eq(start_date)

    @staticmethod
    def _lookup_user_id(user_name):
        """Best-effort lookup of a userID by username; returns '' when not found."""
        try:
            rows = Device_User.objects.filter(username=user_name).values('userID')
            return rows[0]['userID']
        except Exception:
            # Deliberately best-effort: a missing user or a DB error must not
            # abort the whole batch of log writes.
            return ''

    # Insert rows into the access_log table.
    def access_log_item_put(self, table_name, data_list):
        """Write JSON-encoded log records to ``table_name`` in one batch.

        :param table_name: target DynamoDB table.
        :param data_list: iterable of JSON documents (``bytes`` or ``str``), each
            holding userID, userName, addTime, ip, status, operation, content
            and Expiration_time.
        """
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        # The table key is (userID, addTime); two requests by one user within
        # the same second would otherwise make BatchWriteItem reject the batch
        # for containing duplicate keys.
        with table.batch_writer(overwrite_by_pkeys=['userID', 'addTime']) as batch:
            for raw in data_list:
                if isinstance(raw, bytes):
                    raw = raw.decode('utf-8')
                data = json.loads(raw)
                if data['userID'] == '':
                    data['userID'] = self._lookup_user_id(data.get('userName'))
                batch.put_item(
                    Item={
                        'userID': data['userID'],
                        'addTime': data['addTime'],
                        'ip': data['ip'],
                        'status': data['status'],
                        'operation': data['operation'],
                        'content': data['content'],
                        'Expiration_time': data['Expiration_time'],
                    }
                )
        print('添加access_log表数据成功!')

    # Search the table for a value.
    def get_item(self, table_name, values):
        """Return all items whose userID, ip, status, content or operation equals ``values``.

        Returns an empty list when the scan fails.
        """
        dynamodb = self._resource()
        try:
            table = dynamodb.Table(table_name)
            condition = (
                Attr('userID').eq(values)
                | Attr('ip').eq(values)
                | Attr('status').eq(values)
                | Attr('content').eq(values)
                | Attr('operation').eq(values)
            )
            items = []
            for page in self._scan_pages(table, FilterExpression=condition):
                items.extend(page['Items'])
            return items
        except Exception as e:
            logger.error("Failed to get table {0}, error {1}".format(table_name, e))
            return []

    # Search by time range.
    def get_item_time(self, table_name, start_date, end_date):
        """Count items with ``start_date <= addTime < end_date``; 0 on failure."""
        dynamodb = self._resource()
        try:
            table = dynamodb.Table(table_name)
            pages = self._scan_pages(
                table,
                Select='COUNT',
                FilterExpression=self._time_window(start_date, end_date),
            )
            return sum(page['Count'] for page in pages)
        except Exception as e:
            logger.error("Failed to get table {0}, error {1}".format(table_name, e))
            return 0

    # Per-slot counts for one day.
    def get_item_date(self, table_name, date):
        """Return ``{slot: count}`` for each entry of ``CommonService.getTimeDict``.

        ``date`` is a POSIX timestamp. Each slot covers one hour starting at the
        datetime that ``getTimeDict`` returns for it. On failure the counts
        gathered so far are returned.
        """
        dynamodb = self._resource()
        res = {}
        try:
            table = dynamodb.Table(table_name)
            times = datetime.datetime.fromtimestamp(date)
            time_dict = CommonService.getTimeDict(times)
            for slot, slot_start in time_dict.items():
                start_date = int(slot_start.timestamp())
                end_date = int((slot_start + datetime.timedelta(hours=1)).timestamp())
                pages = self._scan_pages(
                    table,
                    Select='COUNT',
                    FilterExpression=self._time_window(start_date, end_date),
                )
                count = sum(page['Count'] for page in pages)
                logger.debug("%s slot %s count %s", table_name, slot, count)
                res[slot] = count
        except Exception as e:
            logger.error("Failed to get table {0}, error {1}".format(table_name, e))
        return res

    def item_get_brand(self, table_name):
        """Return every item of ``table_name``; an empty list when the scan fails."""
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        try:
            items = []
            for page in self._scan_pages(table):
                items.extend(page['Items'])
            return items
        except Exception as e:
            logger.error("Failed to scan table {0}:error{1}".format(table_name, e))
            return []

    # List the tables that exist in the AWS account/region.
    def tables_list(self, table_name):
        """Return the names of all DynamoDB tables (``table_name`` is unused)."""
        client = self.session.client('dynamodb')
        names = []
        kwargs = {}
        while True:
            # list_tables is paginated; keep going until no continuation token.
            response = client.list_tables(**kwargs)
            names.extend(response['TableNames'])
            last_name = response.get('LastEvaluatedTableName')
            if not last_name:
                return names
            kwargs['ExclusiveStartTableName'] = last_name

    # Create the access_log table.
    def access_log_table_create(self, table_name):
        """Create ``table_name`` keyed by (userID: S, addTime: N) unless it exists."""
        dynamodb = self.session.resource('dynamodb')
        inventory = self.tables_list(table_name)
        if table_name in inventory:
            print('access_log表包含')
            return
        try:
            dynamodb.create_table(
                TableName=table_name,
                KeySchema=[
                    {'AttributeName': 'userID', 'KeyType': 'HASH'},
                    {'AttributeName': 'addTime', 'KeyType': 'RANGE'},
                ],
                AttributeDefinitions=[
                    {'AttributeName': 'userID', 'AttributeType': 'S'},
                    {'AttributeName': 'addTime', 'AttributeType': 'N'},
                ],
                ProvisionedThroughput={
                    'ReadCapacityUnits': 5,
                    'WriteCapacityUnits': 5,
                },
            )
            print('创建表成功')
        except ClientError as e:
            if e.response.get('Error', {}).get('Code') == 'ResourceInUseException':
                logger.error(table_name + '表已经存在')
            else:
                logger.error("Failed to create table {0}, error {1}".format(table_name, e))
        except Exception as e:
            # Stay best-effort like the original, but report the real cause.
            logger.error("Failed to create table {0}, error {1}".format(table_name, e))

    # Delete a table.
    def table_delete(self, table_name):
        """Delete ``table_name``."""
        dynamodb = self.session.resource('dynamodb')
        table = dynamodb.Table(table_name)
        table.delete()
        print('删除表成功')


my = MyserviceDynamodb()

if DOMAIN_HOST == 'www.zositechc.cn':
    user_brand = 'access_log'
else:
    user_brand = 'test_access_log'
# One-off maintenance, run manually when needed:
#   my.table_delete(user_brand)
#   my.access_log_table_create(user_brand)


def _utc_now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(tz=utc)


def _request_params(request):
    """Return ``request.GET`` / ``request.POST`` by method, or None for other methods."""
    if request.method == 'GET':
        return request.GET
    if request.method == 'POST':
        return request.POST
    return None


def _dump_without_password(request_dict):
    """JSON-encode the request parameters with any ``userPwd`` entry removed."""
    if request_dict.get('userPwd', None) is not None:
        request_dict = dict(request_dict)
        request_dict.pop('userPwd')
    return json.dumps(request_dict)


def _buffer_and_flush(log_key, record, threshold, flush):
    """Append ``record`` to the Redis list ``log_key`` and flush it when due.

    The list is flushed through ``flush(data_list)`` once it holds more than
    ``threshold`` entries, or on every call under the test settings.
    """
    redis_obj = RedisObject()
    redis_obj.rpush(name=log_key, val=json.dumps(record))
    if redis_obj.llen(name=log_key) > threshold or SERVER_TYPE == 'Ansjer.test_settings':
        # NOTE(review): lrange + delete is not atomic; entries pushed by another
        # thread in between are lost. Left as-is, flagged for follow-up.
        data_list = redis_obj.lrange(log_key, 0, -1)
        redis_obj.del_data(key=log_key)
        flush(data_list)


# Miscellaneous helpers: not widely shared, but needed in a few places.
class MiscellService():

    # Resolve the name of the user making the request.
    @staticmethod
    def get_access_name(request_dict):
        """Return userName, email or phone from the request, else the token's user.

        Returns '' when none of them is available or the token is invalid.
        """
        for field in ('userName', 'email', 'phone'):
            value = request_dict.get(field, None)
            if value:
                return value
        token = request_dict.get('token', None)
        user = ''
        if token is not None:
            tko = TokenObject(token)
            if tko.code == 0:
                user = tko.user
        return user

    @staticmethod
    def add_access_log(request, status_code):
        """Record the access log asynchronously in a background thread."""
        asy = threading.Thread(target=addLog, args=(request, status_code))
        asy.start()

    @staticmethod
    def access_log(request, response, type):
        """Insert an access record into the MongoDB ``log_access`` collection.

        Only GET/POST requests whose path is listed by
        ``TemplateService.log_api()`` are recorded. When ``type`` is 1 the
        status is recorded as 200, otherwise ``response.status_code`` is used.
        """
        request_dict = _request_params(request)
        if request_dict is None:
            return
        api_list = TemplateService.log_api()
        request_path = request.path.strip().strip('/')
        if request_path not in api_list:
            return
        clientIP = CommonService.get_ip_address(request)
        now_time = time.time()
        # Resolve the user from the request parameters before they are copied
        # into a plain dict for password stripping.
        user = MiscellService.get_access_name(request_dict)
        content = _dump_without_password(request_dict)
        area = CommonService.getAddr(ip=clientIP)
        status_code = 200 if type == 1 else response.status_code
        add_data = {
            'user': user,
            'ip': clientIP,
            'status': status_code,
            'path': request_path,
            'method': request.method,
            'time': int(now_time),
            'area': area,
            'content': content,
            'et': datetime.datetime.utcnow()
        }
        mdb = mongodb()
        col = "log_access"
        mdb.insert_one(col=col, data=add_data)

    # Resolve the country an IP address points to.
    @staticmethod
    def getArea(ip):
        """Return the country reported by the Taobao IP service, or '' if unknown."""
        URL = 'http://ip.taobao.com/service/getIpInfo.php'
        try:
            r = requests.get(URL, params={'ip': ip}, timeout=3)
            json_data = r.json()
        except (requests.RequestException, ValueError) as e:
            logger.warning("getArea lookup failed: %s", e)
            return ''
        if not isinstance(json_data, dict) or json_data.get('code') != 0:
            return ''
        info = json_data.get('data')
        if not isinstance(info, dict):
            return ''
        country = info.get('country', '')
        # The service reports 'XX' when the country is unknown.
        return '' if country == 'XX' else country

    @staticmethod
    def get_item_log(table_name, values):
        """Delegate to ``MyserviceDynamodb.get_item``."""
        return my.get_item(table_name, values)

    @staticmethod
    def item_get_brand(table_name):
        """Delegate to ``MyserviceDynamodb.item_get_brand``."""
        return my.item_get_brand(table_name)

    @staticmethod
    def get_item_time(table_name, start_date, end_date):
        """Delegate to ``MyserviceDynamodb.get_item_time``."""
        return my.get_item_time(table_name, start_date, end_date)

    @staticmethod
    def get_item_date(table_name, date):
        """Delegate to ``MyserviceDynamodb.get_item_date``."""
        return my.get_item_date(table_name, date)

    # Add an access log entry for the download endpoint.
    @staticmethod
    def add_ota_download_log(request):
        """Store an access-log row for an OTA download request (status 200)."""
        clientIP = CommonService.get_ip_address(request)
        request_path = request.path.strip().strip('/')
        add_data = {
            'user': 'None',
            'ip': clientIP,
            'status': 200,
            'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
            'operation': request_path,
            'time': _utc_now(),
            'content': ''
        }
        ModelService.addAccessLog(data=add_data)

    @staticmethod
    def batch_add_access_log(request, status_code):
        """Buffer the access log in Redis from a background thread."""
        asy = threading.Thread(target=batch_add_log_ctr, args=(request, status_code))
        asy.start()

    @staticmethod
    def DynamoDB_add_access_log(request, status_code):
        """Buffer the access log for DynamoDB from a background thread."""
        asy = threading.Thread(target=dynamo_db_add_log_ctr, args=(request, status_code))
        asy.start()


def addLog(request, status_code):
    """Store one access-log row through ``ModelService.addAccessLog``.

    Runs as a thread target, so failures are logged instead of raised.
    """
    try:
        request.encoding = 'utf-8'
        request_dict = _request_params(request)
        if request_dict is None:
            return
        api_list = TemplateService.log_api()
        request_path = request.path.strip().strip('/')
        if request_path not in api_list:
            return
        user = MiscellService.get_access_name(request_dict=request_dict)
        clientIP = CommonService.get_ip_address(request)
        now_time = _utc_now()
        content = _dump_without_password(request_dict)
        logger.debug("access log content: %s", content)
        if user == '':
            # Placeholder stored when the caller cannot be identified.
            user = '空'
        add_data = {
            'user': user,
            'ip': clientIP,
            'status': status_code,
            'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
            'operation': request_path,
            'time': now_time,
            'content': content
        }
        logger.debug("access log row: %s", add_data)
        ModelService.addAccessLog(data=add_data)
    except Exception:
        logger.exception("addLog failed")


def batch_add_log_ctr(request, status_code):
    """Buffer one access-log row in Redis; flush to the DB past 100 entries."""
    request.encoding = 'utf-8'
    request_dict = _request_params(request)
    if request_dict is None:
        return
    api_list = TemplateService.log_api()
    request_path = request.path.strip().strip('/')
    if request_path not in api_list:
        return
    user = MiscellService.get_access_name(request_dict)
    clientIP = CommonService.get_ip_address(request)
    now_time = _utc_now()
    content = _dump_without_password(request_dict)
    add_data = {
        'user': user,
        'ip': clientIP,
        'status': status_code,
        'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
        'operation': request_path,
        'time': str(now_time),
        'content': content
    }
    if SERVER_TYPE == 'Ansjer.formal_settings':
        logKey = 'logger'
    else:
        logKey = 'test_logger'
    _buffer_and_flush(logKey, add_data, 100, ModelService.add_batch_log)


def dynamo_db_add_log_ctr(request, status_code):
    """Buffer one access-log record in Redis and flush the buffer to DynamoDB.

    Requests whose path is listed by ``TemplateService.log_api()`` are logged
    with the caller's identity; requests under ``/OTA/downloads`` are logged
    anonymously, unless more than 1000 addresses are already tracked in
    ``api_settings.ADDR_URL``. Anything else is ignored.
    """
    request.encoding = 'utf-8'
    request_dict = _request_params(request)
    if request_dict is None:
        return
    api_list = TemplateService.log_api()
    request_path = request.path.strip().strip('/')
    if SERVER_TYPE == 'Ansjer.formal_settings':
        logKey = 'loggers'
    else:
        logKey = 'test_loggers'
    # Flush threshold for the Redis list.
    # NOTE(review): the original comment said "more than 10 entries" but the
    # value has always been 1 — confirm which is intended.
    num = 1
    clientIP = CommonService.get_ip_address(request)
    token = request_dict.get('token', None)
    userID = '无'
    if token is not None:
        tko = TokenObject(token)
        userID = tko.userID
    userName = request_dict.get('userName', None)
    content = _dump_without_password(request_dict)
    addTime = int(time.time())

    if request_path in api_list:
        # Endpoint is in the set of logged APIs.
        identity = {'userName': userName, 'userID': userID}
    elif request.path.find('/OTA/downloads') != -1:
        # Download endpoint: throttle on the number of tracked addresses.
        if len(api_settings.ADDR_URL) > 1000:
            # This runs in a background thread, so no HTTP response can be
            # returned from here (the original referenced an undefined
            # ResponseObject and crashed); just record the condition.
            logger.warning("OTA download log skipped: more than 1000 tracked addresses")
            return
        api_settings.ADDR_URL.append(request.META.get('REMOTE_ADDR', None))
        identity = {'userName': '无', 'userID': '无'}
    else:
        return

    add_data = {
        'userName': identity['userName'],
        'userID': identity['userID'],
        'ip': clientIP,
        'status': status_code,
        'operation': request_path,
        'addTime': addTime,
        'content': content,
        'Expiration_time': addTime + _LOG_TTL_SECONDS
    }
    _buffer_and_flush(
        logKey,
        add_data,
        num,
        lambda data_list: my.access_log_item_put(user_brand, data_list),
    )