Bläddra i källkod

访问日志用亚马逊的DynamoDB数据库

pzb 5 år sedan
förälder
incheckning
bc9d508622
6 ändrade filer med 472 tillägg och 4 borttagningar
  1. 4 2
      Ansjer/urls.py
  2. 1 1
      Controller/AccessLog.py
  3. 155 0
      Controller/DynamoDBLog.py
  4. 307 1
      Service/MiscellService.py
  5. 1 0
      Service/ModelService.py
  6. 4 0
      Service/middleware.py

+ 4 - 2
Ansjer/urls.py

@@ -2,9 +2,9 @@ from django.conf.urls import url
 from django.urls import path, re_path
 
 from Controller import FeedBack, EquipmentOTA, EquipmentInfo, EquipmentSensor, StreamMedia, AdminManage, AppInfo, \
-    AccessLog, Test, MealManage, DeviceManage, EquipmentStatus, SysManage, DeviceLog, LogAccess, AppColophon, \
+    AccessLog, DynamoDBLog , Test, MealManage, DeviceManage, EquipmentStatus, SysManage, DeviceLog, LogAccess, AppColophon, \
     EquipmentManager, LogManager, PermissionManager, OTAEquipment, shareUserPermission, UidSetController, \
-    UserManger, CheckUserData, \
+    UserManger, CheckUserData,\
     UserController, CloudVod, OrderContrller, VodBucket, DetectController, DeviceShare, UserBrandController, \
     StsOssController, UIDPreview, SysMsg
 
@@ -78,6 +78,8 @@ urlpatterns = [
     url(r'^uidset/(?P<operation>.*)$', UidSetController.UidSetView.as_view()),
     url(r'^appInfo', AppInfo.AppInfo.as_view()),  # app版本信息
     url(r'^accesslog', AccessLog.AccessLog.as_view()),
+    url(r'^dynamoDBLog/(?P<operation>.*)$', DynamoDBLog.DynamoDBLog.as_view()),
+
     url(r'^meal/manage', MealManage.MealManage.as_view()),
     url(r'^device/manage$', DeviceManage.DeviceManage.as_view()),
     # 设备在线

+ 1 - 1
Controller/AccessLog.py

@@ -14,7 +14,6 @@ from Service.ModelService import ModelService
 from Service.TemplateService import TemplateService
 from Ansjer.config import SERVER_TYPE
 from Object.RedisObject import RedisObject
-
 '''
 http://192.168.136.40:8077/accesslog?operation=queryByAdmin&token=test&page=1&line=5&order=-id
 http://192.168.136.40:8077/accesslog?operation=truncateByAdmin&token=test
@@ -24,6 +23,7 @@ http://192.168.136.40:8077/accesslog/staticPath/?token=stest
 '''
 
 
+
 class AccessLog(View):
     @method_decorator(csrf_exempt)
     def dispatch(self, *args, **kwargs):

+ 155 - 0
Controller/DynamoDBLog.py

@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+@Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
+@AUTHOR: ASJRD019
+@NAME: AnsjerFormal
+@software: PyCharm
+@DATE: 2019/10/9 11:50
+@Version: python3.6
+@MODIFY DECORD:ansjer dev
+@file: AliPayObject.py
+@Contact: pzb3076@163.com
+"""
+
+from django.views.generic.base import View
+from django.utils.decorators import method_decorator
+from django.views.decorators.csrf import csrf_exempt
+from Service.ModelService import ModelService
+from Model.models import User_Brand,Device_User
+from django.utils import timezone
+import traceback,time,json,urllib.request
+from Object.ResponseObject import ResponseObject
+from Object.TokenObject import TokenObject
+from Service.CommonService import CommonService
+from collections import Counter
+from Ansjer.config import DOMAIN_HOST,AWS_DynamoDB_REGION,AWS_DynamoDB_ACCESS_KEY,AWS_DynamoDB_SECRET_KEY
+from Object.RedisObject import RedisObject
+from Service.MiscellService import MiscellService
+import datetime, simplejson as json
+import time
+'''
+http://192.168.136.39:8000/dynamoDBLog/searchByAdmin?search_value=154390905041313800138000&token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTQzOTA5MDUwNDEzMTM4MDAxMzgwMDAiLCJsYW5nIjoiY24iLCJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1NzMyNjc3OTR9.YWNEHKKrzFfEc4liz6N8ANQl8C1VfhhLv_MEOqf3yV4&page=1&line=12
+
+
+http://192.168.136.39:8000/dynamoDBLog/connector?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTQzOTA5MDUwNDEzMTM4MDAxMzgwMDAiLCJsYW5nIjoiY24iLCJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1NzMyNjc3OTR9.YWNEHKKrzFfEc4liz6N8ANQl8C1VfhhLv_MEOqf3yV4
+
+ '''
+class DynamoDBLog(View):
+    @method_decorator(csrf_exempt)
+    def dispatch(self, *args, **kwargs):
+        return super(DynamoDBLog, self).dispatch(*args, **kwargs)
+
+    def get(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        operation = kwargs.get('operation')
+        return self.validation(request.GET, request, operation)
+
+    def post(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        operation = kwargs.get('operation')
+        return self.validation(request.POST, request, operation)
+
+    def validation(self, request_dict, request, operation):
+        print (operation)
+        if operation is None:
+            return response.json(444, 'error path')
+        self.clientIP = CommonService.get_ip_address(request)
+        response = ResponseObject()
+        token = request_dict.get('token', None)
+        if token is not None:
+            tko = TokenObject(token)
+            response.lang = tko.lang
+            if tko.code == 0:
+                userID = tko.userID
+                if userID is not None:
+                    if operation == 'searchByAdmin':
+                        return self.search_info(request_dict, userID,response)
+                    elif operation == 'connector':
+                        return self.connector_info(request_dict, userID,response)
+                    elif operation == 'daySearch':
+                        return self.get_day_search(request_dict, userID,response)
+
+                    else:
+                        return response.json(444)
+                else:
+                    return response.json(309)
+            else:
+                return response.json(tko.code)
+        else:
+            return response.json(309)
+
+    # 查询搜索表(可以通过用户名,和接口名称,ip地址搜索等)
+    def search_info(self, request_dict, userID,response):
+        search_value = request_dict.get('search_value',None)
+        check_perm = ModelService.check_perm(userID=userID,permID=30)
+        if check_perm is True:
+            if search_value is None or search_value is '':
+                return response.json(0, {'datas': {}, 'count': 0})
+            else:
+                if DOMAIN_HOST == 'www.dvema.com':
+                    user_brand = 'access_log'
+                else:
+                    user_brand = 'test_access_log'
+                # 查询
+                user_ID = Device_User.objects.filter(username=search_value).values('userID', 'NickName')
+                try:
+                    search_value = user_ID[0]['userID']
+                except Exception:
+                    search_value = search_value
+                items = MiscellService.get_item_log( user_brand ,search_value)
+                for k, v in enumerate(items):
+                    user_ID = Device_User.objects.filter(userID=items[k]['userID']).values('username')
+                    try:
+                        items[k]['userID'] = user_ID[0]['username']
+                    except Exception:
+                        items[k]['userID'] = ''
+                return response.json(0, {'datas': items, 'count': len(items)})
+        else:
+            return response.json(404)
+
+    # 查询接口的访问量的对应数量
+    def connector_info(self, request_dict, userID,response):
+        check_perm = ModelService.check_perm(userID=userID,permID=30)
+        if check_perm is True:
+            if DOMAIN_HOST == 'www.dvema.com':
+                user_brand = 'access_log'
+            else:
+                user_brand = 'test_access_log'
+            table_value = MiscellService.item_get_brand(user_brand)
+            send_jsons = []
+            counts = 0
+            for i in table_value:
+                send_jsons.append(table_value[counts]['operation'])
+                counts = counts + 1
+            send_jsons = Counter(send_jsons)
+            return response.json(0, {'datas': send_jsons,'counts':counts})
+        else:
+            return response.json(404)
+
+    # 通过传过来日期天来搜索,访问量,并返回对应小时的访问数量
+    def get_day_search(self, request_dict,userID, response):
+        check_perm = ModelService.check_perm(userID=userID, permID=30)
+        if DOMAIN_HOST == 'www.dvema.com':
+            user_brand = 'access_log'
+        else:
+            user_brand = 'test_access_log'
+        time_stamp = int(request_dict.get('timestamp', None))
+        if check_perm is True:
+            # 分割小时再去搜索
+            times = datetime.datetime.fromtimestamp(time_stamp)
+            time_dict = CommonService.getTimeDict(times)
+            res = {}
+            for k, v in time_dict.items():
+                start_date = time_dict[k]
+                end_date = time_dict[k] + datetime.timedelta(hours=1)
+                start_date = int(start_date.timestamp())
+                end_date = int(end_date.timestamp())
+                count = MiscellService.get_item_time(user_brand,start_date, end_date)
+                if count:
+                    res[k] = count
+                else:
+                    res[k] = 0
+            return response.json(0, {'count': res})
+        else:
+            return response.json(404)

+ 307 - 1
Service/MiscellService.py

@@ -5,6 +5,7 @@ import time
 import requests
 import simplejson as json
 from django.utils.timezone import utc
+from Ansjer import local_settings as api_settings
 
 from Object.TokenObject import TokenObject
 from Object.mongodb import mongodb
@@ -13,6 +14,195 @@ from Service.ModelService import ModelService
 from Service.TemplateService import TemplateService
 from Object.RedisObject import RedisObject
 from Ansjer.config import SERVER_TYPE
+from Model.models import Device_User
+# coding:utf-8
+from boto3 import Session
+from botocore.exceptions import ClientError
+from boto3.dynamodb.conditions import Key, Attr
+import logging
+import json
+from Ansjer.config import DOMAIN_HOST,AWS_DynamoDB_REGION,AWS_DynamoDB_ACCESS_KEY,AWS_DynamoDB_SECRET_KEY
+logger = logging.getLogger(__name__)
+
+class MyserviceDynamodb():
+    def __init__(self, **kwargs):
+        self.region = AWS_DynamoDB_REGION
+        self.access_key = AWS_DynamoDB_ACCESS_KEY
+        self.secret_key = AWS_DynamoDB_SECRET_KEY
+        self.session = self.__session()
+
+    def __session(self):
+        try:
+            session = Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key,region_name=self.region)
+        except:
+            print("Failed to connect session in region{0}".format(self.region))
+        return session
+
+    # 添加access_log表数据
+    def access_log_item_put(self,table_name,data_list):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        with table.batch_writer() as batch:
+            for i in data_list:
+                data = json.loads(i.decode('utf-8'))
+                if data['userID'] == '' :
+                    try:
+                        user_id = Device_User.objects.filter(username= data['userName']).values('userID')
+                        data['userID'] = user_id[0]['userID']
+                    except Exception:
+                        data['userID'] =''
+                batch.put_item(
+                    Item={
+                        'userID':data['userID'],
+                        'addTime':data['addTime'],
+                        'ip':data['ip'],
+                        'status':data['status'],
+                        'operation':data['operation'],
+                        'content':data['content'],
+                        'Expiration_time':data['Expiration_time'],
+                    }
+                )
+        print ('添加access_log表数据成功!')
+
+# 搜索该数据库表的方法
+    def get_item(self, table_name, values):
+        dynamodb = self.session.resource('dynamodb')
+        if not dynamodb:
+            raise DynamodbConnectionError("Failed to get resource for dynamodb!")
+        try:
+            table = dynamodb.Table(table_name)
+            response = table.scan(
+                FilterExpression=Attr('userID').eq(values)
+                                 | Key('ip').eq(values)
+                                 | Key('status').eq(values)
+                                 | Key('content').eq(values)
+                                 | Key('operation').eq(values)
+            )
+        except Exception as e:
+            logger.error("Failed to get table {0}, error".format(table_name, e))
+        items = response['Items']
+        return  items
+
+    # 时间段搜索
+    def get_item_time(self, table_name, start_date, end_date):
+        dynamodb = self.session.resource('dynamodb')
+        if not dynamodb:
+            raise DynamodbConnectionError("Failed to get resource for dynamodb!")
+        try:
+            table = dynamodb.Table(table_name)
+            response = table.scan(
+                FilterExpression=(Attr('addTime').gt(start_date)
+                                 & Key('addTime').lt(end_date)) | Key('addTime').eq(start_date)
+            )
+        except Exception as e:
+            logger.error("Failed to get table {0}, error".format(table_name, e))
+        items = response['Items']
+        return  len(items)
+
+    # 时间段搜索
+    def get_item_date(self, table_name, date):
+        dynamodb = self.session.resource('dynamodb')
+        if not dynamodb:
+            raise DynamodbConnectionError("Failed to get resource for dynamodb!")
+        res = {}
+        try:
+            table = dynamodb.Table(table_name)
+            times = datetime.datetime.fromtimestamp(date)
+            time_dict = CommonService.getTimeDict(times)
+            for k, v in time_dict.items():
+                start_date = time_dict[k]
+                end_date = time_dict[k] + datetime.timedelta(hours=1)
+                start_date = int(start_date.timestamp())
+                end_date = int(end_date.timestamp())
+                response = table.scan(
+                    FilterExpression=(Attr('addTime').gt(start_date)
+                                      & Key('addTime').lt(end_date)) | Key('addTime').eq(start_date)
+                )
+                count = len(response['Items'])
+                if count:
+                    res[k] = count
+                else:
+                    res[k] = 0
+        except Exception as e:
+            logger.error("Failed to get table {0}, error".format(table_name, e))
+        return res
+
+    def item_get_brand(self, table_name):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        try:
+            response = table.scan()
+            response = response['Items']
+            return response
+        except Exception:
+            logger.error("Failed to put item in to {0}:error{1}".format(table))
+
+
+
+# 查询aws数据库里面有什么表
+    def tables_list(self, table_name):
+        client = self.session.client('dynamodb')
+        response = client.list_tables()
+        return response['TableNames']
+
+    # 创建access_log表
+    def access_log_table_create(self, table_name):
+        dynamodb = self.session.resource('dynamodb')
+        inventory = my.tables_list(table_name)
+        if table_name in inventory:
+            print ('access_log表包含')
+        else:
+            try:
+                table = dynamodb.create_table(
+                    TableName=table_name,
+                    KeySchema=[
+                        {
+                            'AttributeName': 'userID',
+                            'KeyType': 'HASH'
+                        },
+                        {
+                            'AttributeName': 'addTime',
+                            'KeyType': 'RANGE'
+                        }
+                    ],
+                    AttributeDefinitions=[
+                        {
+                            'AttributeName': 'userID',
+                            'AttributeType': 'S'
+                        },
+                        {
+                            'AttributeName': 'addTime',
+                            'AttributeType': 'N'
+                        },
+
+                    ],
+                    ProvisionedThroughput={
+                        'ReadCapacityUnits': 5,
+                        'WriteCapacityUnits': 5,
+                    }
+                )
+                print ('创建表成功')
+            except Exception:
+                logger.error (table_name + '表已经存在')
+
+    # 删除表
+    def table_delete(self, table_name):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        table.delete()
+        print ('删除表成功')
+
+my = MyserviceDynamodb()
+
+# print(my.table_delete('user_brand_all'))
+if DOMAIN_HOST == 'www.dvema.com':
+    user_brand = 'access_log'
+else:
+    user_brand = 'test_access_log'
+
+# my.table_delete(user_brand)
+my.access_log_table_create(user_brand)
+
 
 
 # 杂项类,共用行不高,但有些地方需求
@@ -102,6 +292,26 @@ class MiscellService():
                     country = json_data['data']['country']
         return country
 
+    @staticmethod
+    def get_item_log(table_name, values):
+        item_log = my.get_item(table_name, values)
+        return item_log
+
+    @staticmethod
+    def item_get_brand(table_name):
+        item_log = my.item_get_brand(table_name)
+        return item_log
+
+    @staticmethod
+    def get_item_time(table_name,start_date, end_date):
+        item_log = my.get_item_time(table_name,start_date, end_date)
+        return item_log
+
+    @staticmethod
+    def get_item_date(table_name,date):
+        item_log = my.get_item_date(table_name,date)
+        return item_log
+
     # 下载接口添加访问日志
     @staticmethod
     def add_ota_download_log(request):
@@ -124,6 +334,11 @@ class MiscellService():
         asy = threading.Thread(target=batch_add_log_ctr, args=(request, status_code))
         asy.start()
 
+    @staticmethod
+    def DynamoDB_add_access_log(request, status_code):
+        asy = threading.Thread(target=dynamo_db_add_log_ctr, args=(request, status_code))
+        asy.start()
+
 
 def addLog(request, status_code):
     try:
@@ -189,7 +404,7 @@ def batch_add_log_ctr(request, status_code):
         }
         redisObj = RedisObject()
         loggerData = json.dumps(add_data)
-        print(loggerData)
+        # print(loggerData)
         if SERVER_TYPE == 'Ansjer.formal_settings':
             logKey = 'logger'
         else:
@@ -200,3 +415,94 @@ def batch_add_log_ctr(request, status_code):
             data_list = redisObj.lrange(logKey, 0, -1)
             redisObj.del_data(key=logKey)
             ModelService.add_batch_log(data_list)
+
+
+def dynamo_db_add_log_ctr(request, status_code):
+    request.encoding = 'utf-8'
+    if request.method == 'GET':
+        request_dict = request.GET
+    elif request.method == 'POST':
+        request_dict = request.POST
+    else:
+        return
+    api_list = TemplateService.log_api()
+    request_path = request.path.strip().strip('/')
+    if SERVER_TYPE == 'Ansjer.formal_settings':
+        logKey = 'loggers'
+    else:
+        logKey = 'test_loggers'
+    # 判断redis列表长度为10条以上就会添加日志
+    num = 1
+    clientIP = CommonService.get_ip_address(request)
+    token = request_dict.get('token', None)
+    userID = ''
+    if token is not None:
+        tko = TokenObject(token)
+        userID = tko.userID
+    password = request_dict.get('userPwd', None)
+    userName = request_dict.get('userName', None)
+    if password is not None:
+        request_dict = dict(request_dict)
+        request_dict.pop('userPwd')
+    content = json.dumps(request_dict)
+    addTime = int(time.time())
+    if DOMAIN_HOST == 'www.dvema.com':
+        user_brand = 'access_log'
+    else:
+        user_brand = 'test_access_log'
+    # 过滤某些接口集合
+    if request_path in api_list:
+        add_data = {
+            'userName':userName,
+            'userID': userID,
+            'ip': clientIP,
+            'status': status_code,
+            'operation': request_path,
+            'addTime': addTime,
+            'content': content,
+            'Expiration_time': addTime + 2592000
+        }
+        redisObj = RedisObject()
+        loggerData = json.dumps(add_data)
+        redisObj.rpush(name=logKey, val=loggerData)
+        # 判断redis列表长度
+        if redisObj.llen(name=logKey) > num or SERVER_TYPE == 'Ansjer.test_settings':
+            data_list = redisObj.lrange(logKey, 0, -1)
+            redisObj.del_data(key=logKey)
+            my.access_log_item_put(user_brand,data_list)
+
+    # 判断是是否是下载接口的
+    else:
+        path = request.path
+        index = path.find('/OTA/downloads')
+        if index != -1:
+            addsCount = len(api_settings.ADDR_URL)
+            if addsCount > 1000:
+                response = ResponseObject()
+                # 提示获取链接失败,实际上是下载太频繁,超过了1000的并发量
+                return response.json(901)
+            else:
+                # 合理的情况就添加访问日志
+                adds = request.META.get('REMOTE_ADDR', None)
+                api_settings.ADDR_URL.append(adds)
+                add_data = {
+                    'userName': '无',
+                    'userID':'无',
+                    'ip': clientIP,
+                    'status': status_code,
+                    'operation': request_path,
+                    'addTime': addTime,
+                    'content': content,
+                    'Expiration_time': addTime + 2592000
+                }
+                redisObj = RedisObject()
+                loggerData = json.dumps(add_data)
+                redisObj.rpush(name=logKey, val=loggerData)
+                # 判断redis列表长度
+                if redisObj.llen(name=logKey) > num or SERVER_TYPE == 'Ansjer.test_settings':
+                    data_list = redisObj.lrange(logKey, 0, -1)
+                    redisObj.del_data(key=logKey)
+                    my.access_log_item_put(user_brand, data_list)
+
+
+

+ 1 - 0
Service/ModelService.py

@@ -113,6 +113,7 @@ class ModelService:
         else:
             return True
 
+
     # 通过用户名获取userIDLIST
     @staticmethod
     def get_user_list_by_username(username):

+ 4 - 0
Service/middleware.py

@@ -77,11 +77,15 @@ class StatisticsUrlMiddleware(MiddlewareMixin):
         '''
         self._https_statistics_to_close(request)
         ########记录访问日志
+        MiscellService.DynamoDB_add_access_log(request=request, status_code=response.status_code)
+
         if request.path !='/favicon.ico':
             print('process_response', request, response)
             try:
                 # mysql
                 MiscellService.batch_add_access_log(request=request, status_code=response.status_code)
+
+                # MiscellService.DynamoDB_add_access_log(request=request, status_code=response.status_code)
                 # MiscellService.add_access_log(request=request, status_code=response.status_code)
                 # mongodb版
                 # MiscellService.access_log(request=request, response=response, type=0)