import datetime
import json

import chardet
from django.core import serializers
from django.core.exceptions import FieldError
from django.db import connection
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.base import View

from Model.models import Access_Log
from Service.ModelService import ModelService
# NOTE(review): this wildcard import presumably supplies ResponseJSON,
# CommonService and HttpResponse used below -- confirm before narrowing it.
from Service.ResponseService import *
from Service.TemplateService import TemplateService
from Service.TokenManager import JSONTokenManager

'''
http://192.168.136.40:8077/accesslog?operation=queryByAdmin&token=test&page=1&line=5&order=-id
http://192.168.136.40:8077/accesslog?operation=truncateByAdmin&token=test
http://192.168.136.40:8077/accesslog?operation=searchByAdmin&token=test&page=1&line=10&content={"status":20}&order=-id
http://127.0.0.1:8000/accesslog?operation=loginUserNum&token=stest
http://127.0.0.1:8000/accesslog/loginUserNum?token=stest
http://127.0.0.1:8000/accesslog?operation=AllLoginArea&token=stest
http://192.168.136.40:8077/access/staticPath?token=stest
'''


class AccessLog(View):
    """Token-protected view that lists, searches and truncates ``Access_Log``.

    Both GET and POST are accepted. The ``operation`` request parameter
    selects the handler; every handler checks a permission id through
    ``ModelService.check_permission`` and answers ``ResponseJSON(404)`` when
    that check does not return ``True``.
    """

    @method_decorator(csrf_exempt)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request with CSRF protection disabled."""
        return super(AccessLog, self).dispatch(*args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Handle GET by validating the query-string parameters."""
        request.encoding = 'utf-8'
        return self.validation(request_dict=request.GET)

    def post(self, request, *args, **kwargs):
        """Handle POST by validating the form parameters."""
        request.encoding = 'utf-8'
        return self.validation(request_dict=request.POST)

    def validation(self, request_dict, *args, **kwargs):
        """Verify the token and route the request to the selected operation.

        Args:
            request_dict: Mapping of request parameters (``request.GET`` or
                ``request.POST``). Reads ``token`` and ``operation``.

        Returns:
            ``ResponseJSON(311)`` when no token is supplied, the token
            manager's error response when verification fails,
            ``ResponseJSON(444)`` when the user id / operation is missing or
            the operation is unknown, otherwise the handler's response.
        """
        token = request_dict.get('token', None)
        if token is None:
            return ResponseJSON(311)

        tokenManager = JSONTokenManager()
        error_code = tokenManager.verify_AToken(token)
        if error_code != 0:
            return HttpResponse(tokenManager.errorCodeInfo(error_code))

        userID = tokenManager.accessDict.get('userID', None)
        operation = request_dict.get('operation', None)
        if CommonService.get_param_flag(data=[userID, operation]) is True:
            if operation == 'queryByAdmin':
                return self.query_by_admin(request_dict=request_dict, userID=userID)
            elif operation == 'searchByAdmin':
                return self.search_by_admin(request_dict=request_dict, userID=userID)
            elif operation == 'truncateByAdmin':
                return self.truncate_by_admin(userID=userID)
            elif operation == 'loginUserNum':
                return self.login_user_num(userID=userID)
            elif operation == 'AllLoginArea':
                return self.getAllLoginArea(userID=userID)
        # Missing parameters and unknown operations share the same answer.
        return ResponseJSON(444)

    @staticmethod
    def _parse_paging(request_dict):
        """Return ``(page, line)`` as ints, or ``None`` when either is unusable.

        Missing or non-numeric values, a page below 1 and a negative line
        count are all rejected here so that callers never build a negative
        queryset slice (which Django refuses) or crash inside ``int()``.
        """
        try:
            page = int(request_dict.get('page', None))
            line = int(request_dict.get('line', None))
        except (TypeError, ValueError):
            return None
        if page < 1 or line < 0:
            return None
        if CommonService.get_param_flag(data=[page, line]) is not True:
            return None
        return page, line

    @staticmethod
    def _format_timestamp(raw):
        """Convert a Unix timestamp parameter into a local datetime string.

        Returns ``None`` for a missing or empty parameter. Raises
        ``ValueError``/``OverflowError``/``OSError`` when the value is not a
        usable integer timestamp.
        """
        if raw is None or raw == '':
            return None
        moment = datetime.datetime.fromtimestamp(int(raw))
        return moment.strftime("%Y-%m-%d %H:%M:%S.%f")

    @staticmethod
    def _paged_response(queryset, page, line):
        """Build the success response holding one page of ``queryset`` and the total count."""
        if queryset.exists():
            count = queryset.count()
            res = queryset[(page - 1) * line:page * line]
            send_json = CommonService.query_set_to_dict(res)
            send_json['count'] = count
            return ResponseJSON(0, send_json)
        return ResponseJSON(0, {'datas': [], 'count': 0})

    def query_by_admin(self, request_dict, userID):
        """Return one page of the whole access log (permission id 30).

        Args:
            request_dict: Request parameters; reads ``page``, ``line`` and the
                optional ``order`` (defaults to ``'-id'``, also when empty).
            userID: Id of the authenticated user.

        Returns:
            ``ResponseJSON(444)`` for unusable paging parameters or an
            ``order`` that names an unknown field, ``ResponseJSON(404)`` when
            the permission check fails, otherwise the page of records plus
            the total ``count``.
        """
        paging = self._parse_paging(request_dict)
        if paging is None:
            return ResponseJSON(444)
        page, line = paging
        order = request_dict.get('order', '-id') or '-id'

        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return ResponseJSON(404)
        try:
            access_log_queryset = Access_Log.objects.all().order_by(order)
            return self._paged_response(access_log_queryset, page, line)
        except FieldError:
            # ``order`` comes straight from the caller; an unknown field name
            # is a parameter error, not a server error.
            return ResponseJSON(444)

    def truncate_by_admin(self, userID):
        """Truncate the ``access_log`` table (permission id 10).

        Returns:
            ``ResponseJSON(0)`` after truncating, ``ResponseJSON(404)`` when
            the permission check fails.
        """
        if ModelService.check_permission(userID=userID, permID=10) is not True:
            return ResponseJSON(404)
        # The context manager closes the cursor even if execute() raises.
        with connection.cursor() as cursor:
            cursor.execute("TRUNCATE TABLE `access_log`")
        return ResponseJSON(0)

    def search_by_admin(self, request_dict, userID):
        """Return one page of access-log rows matching a filter (permission id 20).

        Args:
            request_dict: Request parameters; reads ``page``, ``line``,
                ``order`` (defaults to ``'-id'``, also when empty),
                ``content`` (a JSON document turned into filter keyword
                arguments by ``CommonService.get_kwargs``) and the optional
                Unix timestamps ``starttime`` / ``endtime`` applied to the
                ``time`` field.
            userID: Id of the authenticated user.

        Returns:
            ``ResponseJSON(444)`` for unusable paging, content, ordering or
            timestamp parameters, ``ResponseJSON(404)`` when the permission
            check fails, otherwise the page of records plus the total
            ``count``.
        """
        paging = self._parse_paging(request_dict)
        if paging is None:
            return ResponseJSON(444)
        page, line = paging
        order = request_dict.get('order', '-id') or '-id'
        content = request_dict.get('content', None)
        starttime = request_dict.get('starttime', None)
        endtime = request_dict.get('endtime', None)

        if ModelService.check_permission(userID=userID, permID=20) is not True:
            return ResponseJSON(404)

        try:
            content = json.loads(content)
            search_kwargs = CommonService.get_kwargs(data=content)
            queryset = Access_Log.objects.filter(**search_kwargs).order_by(order)
        except Exception:
            # Deliberately broad: the filter is built from caller-supplied
            # JSON, so any failure here is reported as a parameter error.
            return ResponseJSON(444)

        try:
            start_text = self._format_timestamp(starttime)
            end_text = self._format_timestamp(endtime)
        except (ValueError, OverflowError, OSError):
            return ResponseJSON(444)

        if start_text is not None and end_text is not None:
            queryset = queryset.filter(time__range=(start_text, end_text))
        elif start_text is not None:
            queryset = queryset.filter(time__gte=start_text)
        elif end_text is not None:
            queryset = queryset.filter(time__lte=end_text)

        try:
            return self._paged_response(queryset, page, line)
        except FieldError:
            # Field names may only be resolved when the query is evaluated.
            return ResponseJSON(444)

    # Endpoint listing the user/IP pairs seen on login requests.
    def login_user_num(self, userID):
        """Return distinct user/IP login pairs with a resolved address name (permission id 20).

        Each entry holds ``user``, ``ip`` and ``ip_addrname``, the latter
        being whatever ``CommonService.getAddr`` returns for the IP.

        Returns:
            ``ResponseJSON(0, {'datas': [...], 'count': n})`` on success,
            ``ResponseJSON(404)`` when the permission check fails.
        """
        if ModelService.check_permission(userID=userID, permID=20) is not True:
            return ResponseJSON(404)

        # NOTE(review): Django adds order_by() fields to the SELECT of a
        # distinct() query, so rows may repeat per distinct ``time`` value.
        user_list = (
            Access_Log.objects.values("user", "ip")
            .distinct()
            .filter(url__contains='account/login')
            .order_by('time')
        )
        # Merge the address name into each row directly instead of
        # round-tripping through hand-built JSON text, which broke on quotes
        # in the address and on an empty result set.
        datas = [
            dict(row, ip_addrname=CommonService.getAddr(row['ip']))
            for row in user_list
        ]
        return ResponseJSON(0, {'datas': datas, 'count': len(datas)})

    # Get the login IP recorded for every user.
    def getAllLoginArea(self, userID):
        """Map every user that logged in to the IP of their earliest login row (permission id 30).

        Returns:
            ``ResponseJSON(0, {user: ip, ...})`` on success,
            ``ResponseJSON(404)`` when the permission check fails.
        """
        if ModelService.check_permission(userID=userID, permID=30) is not True:
            return ResponseJSON(404)

        users = (
            Access_Log.objects.filter(operation='account/login')
            .distinct()
            .values_list('user', flat=True)
        )
        ip_dict = {}
        for user in users:
            qs = (
                Access_Log.objects.values_list('ip', flat=True)
                .filter(operation='account/login', user=user)
                .order_by('time')
            )
            # first() yields None instead of raising IndexError if the rows
            # vanished (e.g. a concurrent truncate) between the two queries.
            ip_dict[user] = qs.first()
        return ResponseJSON(0, ip_dict)


@csrf_exempt
def statisticsPath(request):
    """Return how many access-log rows exist for each path in ``TemplateService.log_api()``.

    Accepts GET or POST with a ``token`` parameter and requires permission
    id 10.

    Returns:
        ``ResponseJSON(0, {'datas': {path: count}})`` on success,
        ``ResponseJSON(311)`` without a token, the token manager's error
        response for an invalid token, ``ResponseJSON(404)`` when the
        permission check fails and ``ResponseJSON(444)`` for any other HTTP
        method.
    """
    request.encoding = 'utf-8'
    if request.method == 'GET':
        request_dict = request.GET
    elif request.method == 'POST':
        request_dict = request.POST
    else:
        # Previously any other method hit an unbound ``request_dict``.
        return ResponseJSON(444)

    token = request_dict.get('token', None)
    if token is None:
        return ResponseJSON(311)

    tokenManager = JSONTokenManager()
    error_code = tokenManager.verify_AToken(token)
    if error_code != 0:
        return HttpResponse(tokenManager.errorCodeInfo(error_code))

    userID = tokenManager.accessDict.get('userID', None)
    # The permission result used to be computed and then ignored; enforce it
    # the same way the AccessLog handlers do.
    own_perm = ModelService.check_permission(userID, 10)
    if own_perm is not True:
        return ResponseJSON(404)

    datas = {
        path: Access_Log.objects.filter(operation=path).count()
        for path in TemplateService.log_api()
    }
    return ResponseJSON(0, {'datas': datas})