|
- import hashlib
- import logging
- import shutil
- import time
- import traceback
- import os
- from urllib import request, parse
- import requests
- from django.http import HttpResponse
- import jwt
- from django.views.generic.base import View
- from Model.models import PctestuserModel, PctestjobModel, PctestdeviceModel,\
- PctestfunctionModel, PctestjobdeviceModel, PctestModel, PctestlogModel
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- from Service.CommonService import CommonService
- from Ansjer.config import OAUTH_ACCESS_TOKEN_SECRET
- class PcTest(View):
- def dispatch(self, requset, *args, **kwargs):
- return super(PcTest, self).dispatch(requset, *args, **kwargs)
- def get(self, request, *args, **kwargs):
- operation = kwargs.get('operation')
- request.encoding = 'utf-8'
- return self.validation(request.GET, request, operation)
- def post(self, request, *args, **kwargs):
- operation = kwargs.get('operation')
- request.encoding = 'utf-8'
- return self.validation(request.POST, request, operation)
- def validation(self, request_dict, request, operation):
- response = ResponseObject()
- if not operation:
- return response.json(444, 'operation')
- else:
- if operation == 'login':
- return self.login(request_dict, response)
- else:
- print(operation)
- token = request_dict.get('token', None)
- print('token:', token)
- # 解析token,验证是否有id
- tko = TokenObject1(token)
- response.lang = tko.lang
- if tko.code != 0:
- return response.json(tko.code)
- # 获取访问者的id和岗位
- userID = tko.id
- jobID = tko.job
- # if jobID == 1:
- # # 管理员可访问的接口
- if operation == 'job/add':
- return self.jobadd(request_dict, userID, response)
- elif operation == 'job/query':
- return self.jobquery(request_dict, userID, response)
- elif operation == 'job/delete':
- return self.jobdelete(request_dict, userID, response)
- elif operation == 'device/add':
- return self.deviceadd(request_dict, userID, response)
- elif operation == 'device/query':
- return self.devicequery(request_dict, userID, response)
- elif operation == 'device/delete':
- return self.devicedelete(request_dict, userID, response)
- elif operation == 'function/add':
- return self.functionadd(request_dict, userID, response)
- elif operation == 'function/query':
- return self.functionquery(request_dict, userID, response)
- elif operation == 'function/delete':
- return self.functiondelete(request_dict, userID, response)
- elif operation == 'job/device/add':
- return self.jobdeviceadd(request_dict, userID, response)
- elif operation == 'device/function/add':
- return self.devicefunctionadd(request_dict, userID, response)
- elif operation == 'staff/add':
- return self.staffadd(request_dict, userID, response)
- elif operation == 'staff/query':
- return self.staffquery(request_dict, userID, response)
- elif operation == 'staff/delete':
- return self.staffdelete(request_dict, userID, response)
- # 公共访问的接口
- if operation == 'job/device/query':
- return self.jobdevicequery(request_dict, jobID, response)
- elif operation == 'device/function/query':
- return self.devicefunctionquery(request_dict, jobID, response)
- elif operation == 'log/add':
- return self.logadd(request_dict, userID, response)
- elif operation == 'log/query':
- return self.logquery(request_dict, userID, response)
- elif operation == 'token/fullInfo':
- return self.fullInfo(request_dict, userID, response)
- elif operation == 'staff/initPass':
- return self.initPass(request_dict, userID, response)
- elif operation == 'staff/updatePass':
- return self.updatePass(request_dict, userID, response)
- else:
- return response.json(404)
- def login(self, request_dict, response):
- username = request_dict.get('username', None)
- password = request_dict.get('password', None)
- param_flag = CommonService.get_param_flag(
- data=[username, password])
- if param_flag is not True:
- return response.json(444)
- user_qs = PctestuserModel.objects.filter(username=username, password=password)
- if not user_qs.exists():
- if not PctestuserModel.objects.filter(username=username).exists():
- return response.json(104)
- return response.json(111)
- users = user_qs.values('id', 'username', 'password', 'job__id', 'job__jobcode')[0]
- tko = TokenObject()
- # 加密
- res = tko.generate(
- data={'id': users['id'], 'username': users['username'], 'password': users['password'], 'jobid': users['job__id'], 'job': users['job__jobcode']})
- res_qs = {
- 'res': res,
- 'id': users['id'],
- 'username': users['username'],
- 'job': users['job__jobcode'],
- 'jobid': users['job__id']
- }
- return response.json(0, res_qs)
- def jobadd(self, request_dict, userID, response):
- jobname = request_dict.get('jobname', None)
- jobcode = request_dict.get('jobcode', None)
- param_flag = CommonService.get_param_flag(
- data=[jobname])
- if param_flag is not True:
- return response.json(444)
- job_qs = PctestjobModel.objects.filter(jobname=jobname)
- if job_qs.exists():
- return response.json(174)
- else:
- PctestjobModel.objects.create(jobname=jobname,jobcode=jobcode)
- return response.json(0)
- def jobquery(self, request_dict, userID, response):
- # user_qs = PctestuserModel.objects.filter(id=userID).values('job')
- # job_qs = PctestjobModel.objects.filter(id=user_qs[0]['job'])
- #
- # if job_qs[0].id == 1:
- # job_qs = PctestjobModel.objects.filter()
- job_qs = PctestjobModel.objects.filter()
- device_qs = PctestjobdeviceModel.objects.filter(job__in=job_qs).values('device')
- data = CommonService.qs_to_list(job_qs.values('id', 'jobname', 'jobcode'))
- i = 0
- for jobs in job_qs:
- data[i]['devices'] = CommonService.qs_to_list(device_qs.filter(job__id=jobs.id).values('device__id', 'device__devicename'))
- i = i + 1
- count = device_qs.count()
- return response.json(0, {'datas': data, 'count': count})
- def jobdelete(self, request_dict, userID, response):
- id = request_dict.get('id', None)
- job_qs = PctestjobModel.objects.filter(id=id)
- if job_qs.exists:
- job_qs.delete()
- PctestjobdeviceModel.objects.filter(job__in=job_qs).delete()
- return response.json(0)
- def deviceadd(self, request_dict, userID, response):
- deviceid = request_dict.get('deviceid', None)
- devicename = request_dict.get('devicename', None)
- functions= request_dict.get('functions', None)
- param_flag = CommonService.get_param_flag(
- data=[devicename])
- if param_flag is not True:
- return response.json(444)
- if not deviceid:
- device_qs = PctestdeviceModel.objects.filter(devicename=devicename)
- if device_qs.exists():
- return response.json(174)
- PctestdeviceModel.objects.create(devicename=devicename)
- else:
- device_qs = PctestdeviceModel.objects.filter(id=deviceid)
- PctestdeviceModel.objects.filter(id=deviceid).update(devicename=devicename)
- function_list = PctestfunctionModel.objects.filter(id__in=functions.split(','))
- # 判断设备与此职能是否有关联,避免重复添加
- PctestModel.objects.filter(device_id=device_qs[0].id).delete()
- for fid in functions.split(','):
- for fun in function_list:
- if int(fid) ==fun.id:
- PctestModel.objects.create(device_id=device_qs[0].id, function_id=fun.id)
- return response.json(0)
- def devicequery(self, request_dict, userID, response):
- # user_qs = PctestuserModel.objects.filter(id=userID).values('job')
- # job_qs = PctestjobModel.objects.filter(id=user_qs[0]['job'])
- #
- # if job_qs[0].id == 1:
- # device_qs = PctestjobdeviceModel.objects.filter().values('device')
- # else:
- # device_qs = PctestjobdeviceModel.objects.filter(job__in=job_qs).values('device')
- device_qs = PctestdeviceModel.objects.filter()
- function_qs = PctestModel.objects.filter(device__in=device_qs)
- device_qs = device_qs.values('id', 'devicename')
- data = CommonService.qs_to_list(device_qs)
- i = 0
- for devices in device_qs:
- data[i]['functions'] = CommonService.qs_to_list(function_qs.filter(device__id=devices['id']).values('function__id','function__functionname','function__functioncode'))
- i = i + 1
- count = device_qs.count()
- return response.json(0, {'datas': data, 'count': count})
- def devicedelete(self, request_dict, userID, response):
- id = request_dict.get('id', None)
- device_qs = PctestdeviceModel.objects.filter(id=id)
- if device_qs.exists:
- PctestjobdeviceModel.objects.filter(device__in=device_qs).delete()
- PctestModel.objects.filter(device__in=device_qs).delete()
- device_qs.delete()
- return response.json(0)
- def functionadd(self, request_dict, userID, response):
- functionname = request_dict.get('functionname', None)
- functioncode = request_dict.get('functioncode', None)
- id = request_dict.get('id', None)
- param_flag = CommonService.get_param_flag(
- data=[functionname])
- if param_flag is not True:
- return response.json(444)
- if not id:
- job_qs = PctestfunctionModel.objects.filter(functionname=functionname)
- if job_qs.exists():
- return response.json(174)
- PctestfunctionModel.objects.create(functionname=functionname,functioncode=functioncode)
- else:
- PctestfunctionModel.objects.filter(id=id).update(functionname=functionname, functioncode=functioncode)
- return response.json(0)
- def functiondelete(self, request_dict, userID, response):
- id = request_dict.get('id', None)
- function_qs = PctestfunctionModel.objects.filter(id=id)
- if function_qs.exists:
- PctestModel.objects.filter(function__in=function_qs).delete()
- function_qs.delete()
- return response.json(0)
- def functionquery(self, request_dict, userID, response):
- function_list = PctestfunctionModel.objects.all()
- count = function_list.count()
- res = function_list
- send_json = CommonService.qs_to_dict(res)
- send_json['count'] = count
- return response.json(0, send_json)
- def jobdeviceadd(self, request_dict, userID, response):
- job_id = request_dict.get('job_id', None)
- device_id = request_dict.get('device_id', None)
- param_flag = CommonService.get_param_flag(
- data=[job_id, device_id])
- if param_flag is not True:
- return response.json(444)
- # 判断岗位与设备是否存在
- job_list = PctestjobModel.objects.filter(id=job_id)
- device_list = PctestdeviceModel.objects.filter(id=device_id)
- if not job_list.exists() or not device_list.exists():
- return response.json(173)
- # 判断岗位与此设备是否有关联,避免重复添加
- job_device_list = PctestjobdeviceModel.objects.filter(job_id=job_id, device_id=device_id)
- if job_device_list.exists():
- return response.json(174)
- else:
- PctestjobdeviceModel.objects.create(job_id=job_id, device_id=device_id)
- return response.json(0)
- def jobdevicequery(self, request_dict, jobID, response):
- if jobID == 1:
- job_device_list = PctestjobdeviceModel.objects.values('job__jobname', 'device__devicename')
- else:
- job_device_list = PctestjobdeviceModel.objects.filter(job_id=jobID).values('device_id','device__devicename')
- device_list = []
- for i in job_device_list:
- device_list.append(i)
- return response.json(0, device_list)
- def devicefunctionadd(self, request_dict, userID, response):
- device_id = request_dict.get('device_id', None)
- function_id = request_dict.get('function_id', None)
- param_flag = CommonService.get_param_flag(
- data=[function_id, device_id])
- if param_flag is not True:
- return response.json(444)
- # 判断设备与职能是否存在
- device_list = PctestdeviceModel.objects.filter(id=device_id)
- function_list = PctestfunctionModel.objects.filter(id=function_id)
- if not function_list.exists() or not device_list.exists():
- return response.json(173)
- # 判断设备与此职能是否有关联,避免重复添加
- device_function_list = PctestModel.objects.filter(device_id=device_id, function_id=function_id)
- if device_function_list.exists():
- return response.json(174)
- else:
- PctestModel.objects.create(device_id=device_id, function_id=function_id)
- return response.json(0)
- def devicefunctionquery(self, request_dict, jobID, response):
- if jobID == 1:
- device_function_list = PctestModel.objects.values('device__devicename', 'function__functionname')
- else:
- device_id = request_dict.get('device_id', None)
- param_flag = CommonService.get_param_flag(
- data=[device_id])
- if param_flag is not True:
- return response.json(444)
- # 判断岗位与此设备是否有关联,有关联则通过
- job_device_list = PctestjobdeviceModel.objects.filter(job_id=jobID, device_id=device_id)
- if not job_device_list.exists():
- return response.json(173)
- device_function_list = PctestModel.objects.filter(device_id=device_id).values('function_id', 'function__functionname')
- function_list = []
- for i in device_function_list:
- function_list.append(i)
- return response.json(0, function_list)
- def staffadd(self, request_dict, userID, response):
- username = request_dict.get('username', None)
- password = request_dict.get('password', None)
- id = request_dict.get('id', None)
- job_id = request_dict.get('job_id', None)
- param_flag = CommonService.get_param_flag(
- data=[username, job_id])
- if param_flag is not True:
- return response.json(444)
- # 判断员工与岗位是否存在,员工存在返回174
- user_list = PctestuserModel.objects.filter(username=username)
- job_list = PctestjobModel.objects.filter(id=job_id)
- if not job_list.exists():
- return response.json(174)
- if not id:
- if user_list.exists():
- return response.json(174)
- PctestuserModel.objects.create(username=username, password=password, job_id=job_id)
- else:
- PctestuserModel.objects.filter(id=id).update(username=username, job_id=job_id)
- return response.json(0)
- def initPass(self, request_dict, userID, response):
- id = request_dict.get('id', None)
- param_flag = CommonService.get_param_flag(
- data=[id])
- if param_flag is not True:
- return response.json(444)
- # 判断员工与岗位是否存在,员工存在返回174
- user_list = PctestuserModel.objects.filter(id=id)
- if user_list.exists():
- user_list.update(password='555')
- return response.json(0)
- def updatePass(self, request_dict, userID, response):
- id = request_dict.get('id', None)
- oldpass = request_dict.get('oldpass', None)
- newpass1 = request_dict.get('newpass1', None)
- newpass2 = request_dict.get('newpass2', None)
- param_flag = CommonService.get_param_flag(
- data=[id,oldpass,newpass1,newpass2])
- if param_flag is not True:
- return response.json(444)
- # 判断员工与岗位是否存在,员工存在返回174
- user_list = PctestuserModel.objects.filter(id=id, password=oldpass)
- if not user_list.exists():
- if not PctestuserModel.objects.filter(id=id).exists():
- return response.json(104)
- return response.json(111)
- if newpass1 != newpass2:
- return response.json(10,'两次密码不相同,请重新输入')
- user_list.update(password=newpass1)
- return response.json(0)
- # def updatePass(self, request_dict, userID, response):
- #
- # id = request_dict.get('id', None)
- # password = request_dict.get('password', None)
- # param_flag = CommonService.get_param_flag(
- # data=[id,password])
- # if param_flag is not True:
- # return response.json(444)
- #
- # # 判断员工与岗位是否存在,员工存在返回174
- # user_list = PctestuserModel.objects.filter(id=id)
- # if user_list.exists():
- # user_list.update(password=password)
- #
- # return response.json(0)
- def staffquery(self, request_dict, userID, response):
- user_list = PctestuserModel.objects.filter(id__gt=1).values('id', 'username', 'job__id', 'job__jobname', 'job__jobcode')
- users_list = []
- for i in user_list:
- users_list.append(i)
- return response.json(0, users_list)
- def staffdelete(self, request_dict, userID, response):
- id = request_dict.get('id', None)
- param_flag = CommonService.get_param_flag(data=[id])
- if param_flag is not True:
- return response.json(444)
- staff = PctestuserModel.objects.filter(id=id)
- if not staff.exists():
- return response.json(173)
- else:
- staff.delete()
- return response.json(0)
- def logadd(self, request_dict, userID, response):
- content = request_dict.get('content', None)
- param_flag = CommonService.get_param_flag(data=[content])
- if param_flag is not True:
- return response.json(444)
- addtime = time.time()
- PctestlogModel.objects.create(user_id=userID, content=content, addtime=addtime)
- return response.json(0)
- def logquery(self, request_dict, userID, response):
- startDateTime = request_dict.get('startDateTime', None)
- endDateTime = request_dict.get('endDateTime', None)
- userid = request_dict.get('userid', None)
- if userid :
- job_device_list = PctestlogModel.objects.filter(user_id=userID,addtime__range=(startDateTime, endDateTime)).values('id', 'user__id', 'user__username', 'content', 'addtime')
- else:
- job_device_list = PctestlogModel.objects.filter(addtime__range=(startDateTime, endDateTime)).values('id', 'user__id', 'user__username', 'content', 'addtime')
- device_list = []
- for i in job_device_list:
- device_list.append(i)
- return response.json(0, device_list)
- def fullInfo(self, request_dict, userID, response):
- user_qs = PctestuserModel.objects.filter(id=userID).values('job')
- fullinfo = []
- data = {}
- if user_qs.exists():
- job_qs = PctestjobModel.objects.filter(id=user_qs[0]['job'])
- user_qs = user_qs.values('id', 'username')
- # device_qs = PctestjobdeviceModel.objects.filter(job__in=job_qs).values('device')
- #
- # function_qs = PctestModel.objects.filter(device__in=device_qs)
- data = user_qs[0]
- data['jobs'] = CommonService.qs_to_list(job_qs.values('id', 'jobname', 'jobcode'))
- # i = 0
- # for jobs in data['jobs']:
- #
- # data['jobs'][i]['devices'] = CommonService.qs_to_list(device_qs.filter(job__id=jobs['id']).values('device__id', 'device__devicename'))
- #
- # j = 0
- # for devices in jobs['devices']:
- # data['jobs'][i]['devices'][j]['functions'] = CommonService.qs_to_list(function_qs.filter(device__id=devices['device__id']).values('function__id','function__functionname','function__functioncode'))
- # j = j+1
- #
- # i = i+1
- fullinfo.append(data)
- return response.json(0, fullinfo)
- class TokenObject1:
- def __init__(self, token=None):
- if token == 'local':
- token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTg0MzUxODk2MjgyMTM4MDAxMzgwMDAiLCJsYW5nIjoiZW4iLCJ1c2VyIjoiMTM2ODAzMTc1OTYiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1ODcyNzcwNjB9.c0LV_XyxwbzUlYqMJqx7vw9f19Jv-0kGnUHuu_go-mo'
- if token == 'test':
- token = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJleHAiOjE1Njk5OTg4OTYsInVzZXJJRCI6IjE1MTU2NDI2MjMzNzkzOTUxMzgwMDEzODAwMSIsImxhbmciOiJlbiIsIm1fY29kZSI6IjEyMzQxMzI0MzIxNCJ9.VAQtT9AbCCfXcrNj9DL5cvVasMDoI7AP8ptgU1GoMu8'
- self.token = token
- self.lang = None
- self.id = None
- self.job = None
- self.user = ''
- self.code = 0
- # 令牌校验
- self.valid()
- def valid(self):
- if self.token is None:
- self.code = 309
- return
- try:
- res = jwt.decode(self.token, OAUTH_ACCESS_TOKEN_SECRET, algorithms='HS256')
- print('res:', res)
- # print(res)
- self.id = res.get('id', None)
- self.lang = res.get('lang', None)
- self.job = res.get('job', None)
- self.user = res.get('user', '')
- # 刷新登录时间
- # if self.userID:
- # print(self.user)
- # redisObj = RedisObject(db=3)
- # redisObj.set_data(key=self.userID, val=self.user, expire=300)
- except jwt.ExpiredSignatureError as e:
- print('过期')
- print(repr(e))
- self.code = 309
- return
- except Exception as e:
- self.code = 309
- return
- else:
- if not self.id:
- self.code = 309
- return
- else:
- if self.id:
- self.code = 0
- return res
- else:
- self.code = 309
- return
|