123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- import hashlib
- import logging
- import shutil
- import time
- import traceback
- import os
- from urllib import request, parse
- import requests
- from django.http import HttpResponse
- import jwt
- from django.views.generic.base import View
- from Model.models import PctestuserModel, PctestjobModel, PctestdeviceModel,\
- PctestfunctionModel, PctestjobdeviceModel, PctestModel, PctestlogModel
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- from Service.CommonService import CommonService
- from Ansjer.config import OAUTH_ACCESS_TOKEN_SECRET
class PcTest(View):
    """Django view exposing the PC-test management API.

    The URL configuration is expected to supply an ``operation`` keyword
    argument.  ``login`` is the only operation that does not need a token;
    every other operation is authorised through ``TokenObject1`` and then
    routed according to the caller's job id.

    Numeric codes handed to ``response.json`` by this class:
    0 success, 444 parameter check failed, 173 referenced row not found,
    174 row already exists, 104 login lookup failed, 404 operation not
    available to the caller's role.  Token failures use whatever code
    ``TokenObject1`` reports.
    """

    # Job id that the dispatcher treats as the administrator role.
    ADMIN_JOB_ID = 1

    # Operations open to every authenticated caller.  Their handlers receive
    # the caller's *job* id (not the user id) as second argument.
    _JOB_SCOPED_OPERATIONS = {
        'job/device/query': 'jobdevicequery',
        'device/function/query': 'devicefunctionquery',
    }

    # Operations reserved for the administrator job; handlers get the user id.
    _ADMIN_OPERATIONS = {
        'job/add': 'jobadd',
        'job/query': 'jobquery',
        'device/add': 'deviceadd',
        'device/query': 'devicequery',
        'function/add': 'functionadd',
        'function/query': 'functionquery',
        'job/device/add': 'jobdeviceadd',
        'device/function/add': 'devicefunctionadd',
        'staff/add': 'staffadd',
        'staff/query': 'staffquery',
        'staff/delete': 'staffdelete',
        'log/query': 'logquery',
    }

    # Operations for ordinary staff; handlers get the user id.
    _STAFF_OPERATIONS = {
        'log/add': 'logadd',
        'log/query': 'logquery',
    }

    def dispatch(self, requset, *args, **kwargs):
        """Delegate unchanged to Django's ``View.dispatch``."""
        return super().dispatch(requset, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Handle GET: route using the query-string parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.GET, request, kwargs.get('operation'))

    def post(self, request, *args, **kwargs):
        """Handle POST: route using the form parameters."""
        request.encoding = 'utf-8'
        return self.validation(request.POST, request, kwargs.get('operation'))

    def validation(self, request_dict, request, operation):
        """Authenticate the caller and route ``operation`` to its handler.

        :param request_dict: request parameters (``request.GET``/``POST``).
        :param request: the Django request (not used by the routing itself).
        :param operation: operation path taken from the URL.
        :return: whatever ``response.json`` produces for the outcome.
        """
        response = ResponseObject()
        if not operation:
            return response.json(444, 'operation')
        if operation == 'login':
            return self.login(request_dict, response)

        # Only the operation name is logged; the bearer token is a credential
        # and must never be written to stdout or log files.
        logging.getLogger(__name__).debug('pctest operation: %s', operation)

        # Decode the token and make sure it carries a user id.
        tko = TokenObject1(request_dict.get('token', None))
        response.lang = tko.lang
        if tko.code != 0:
            return response.json(tko.code)

        # Caller's user id and job id as stored in the token.
        user_id = tko.id
        job_id = tko.job

        handler_name = self._JOB_SCOPED_OPERATIONS.get(operation)
        if handler_name is not None:
            return getattr(self, handler_name)(request_dict, job_id, response)

        if job_id == self.ADMIN_JOB_ID:
            role_operations = self._ADMIN_OPERATIONS
        else:
            role_operations = self._STAFF_OPERATIONS
        handler_name = role_operations.get(operation)
        if handler_name is None:
            return response.json(404)
        return getattr(self, handler_name)(request_dict, user_id, response)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _params_ok(*values):
        """Return True when ``CommonService.get_param_flag`` accepts ``values``."""
        return CommonService.get_param_flag(data=list(values)) is True

    @staticmethod
    def _create_unless_exists(model, response, **fields):
        """Create a ``model`` row from ``fields``; answer 174 if one matches already."""
        if model.objects.filter(**fields).exists():
            return response.json(174)
        model.objects.create(**fields)
        return response.json(0)

    @staticmethod
    def _respond_with_count(queryset, response):
        """Serialise ``queryset`` via ``CommonService.qs_to_dict`` and add ``count``."""
        send_json = CommonService.qs_to_dict(queryset)
        send_json['count'] = queryset.count()
        return response.json(0, send_json)

    # ------------------------------------------------------------------
    # Operation handlers
    # ------------------------------------------------------------------
    def login(self, request_dict, response):
        """Check ``username``/``password`` and issue a token.

        Responds 444 when the parameter check fails, 104 when no user
        matches, otherwise 0 with ``{'res': <token>, 'job': <job id>}``.
        """
        username = request_dict.get('username', None)
        password = request_dict.get('password', None)
        if not self._params_ok(username, password):
            return response.json(444)

        # NOTE(review): the lookup compares the submitted password with the
        # stored column directly, i.e. passwords appear to be stored in
        # plaintext.  Hashing them needs a data migration and is out of scope.
        user = PctestuserModel.objects.filter(
            username=username, password=password,
        ).values('id', 'username', 'job').first()
        if user is None:
            return response.json(104)

        # The password is deliberately NOT placed in the token: a JWT payload
        # is only signed, not encrypted, so anyone holding the token can read
        # it.  TokenObject1 only reads 'id', 'lang', 'job' and 'user'.
        token = TokenObject().generate(
            data={'id': user['id'], 'username': user['username'], 'job': user['job']})
        return response.json(0, {'res': token, 'job': user['job']})

    def jobadd(self, request_dict, userID, response):
        """Create a job named ``jobname``; 174 if that name is already taken."""
        jobname = request_dict.get('jobname', None)
        if not self._params_ok(jobname):
            return response.json(444)
        return self._create_unless_exists(PctestjobModel, response, jobname=jobname)

    def jobquery(self, request_dict, userID, response):
        """List every job whose id is greater than 1, with a ``count`` entry."""
        return self._respond_with_count(
            PctestjobModel.objects.filter(id__gt=1), response)

    def deviceadd(self, request_dict, userID, response):
        """Create a device named ``devicename``; 174 if it already exists."""
        devicename = request_dict.get('devicename', None)
        if not self._params_ok(devicename):
            return response.json(444)
        return self._create_unless_exists(
            PctestdeviceModel, response, devicename=devicename)

    def devicequery(self, request_dict, userID, response):
        """List all devices, with a ``count`` entry."""
        return self._respond_with_count(PctestdeviceModel.objects.all(), response)

    def functionadd(self, request_dict, userID, response):
        """Create a function named ``functionname``; 174 if it already exists."""
        functionname = request_dict.get('functionname', None)
        if not self._params_ok(functionname):
            return response.json(444)
        return self._create_unless_exists(
            PctestfunctionModel, response, functionname=functionname)

    def functionquery(self, request_dict, userID, response):
        """List all functions, with a ``count`` entry."""
        return self._respond_with_count(PctestfunctionModel.objects.all(), response)

    def jobdeviceadd(self, request_dict, userID, response):
        """Link a device to a job.

        Responds 173 when either the job or the device is missing and 174
        when the pair is already linked.
        """
        job_id = request_dict.get('job_id', None)
        device_id = request_dict.get('device_id', None)
        if not self._params_ok(job_id, device_id):
            return response.json(444)

        # Both ends of the link must exist.
        job_found = PctestjobModel.objects.filter(id=job_id).exists()
        if not job_found or not PctestdeviceModel.objects.filter(id=device_id).exists():
            return response.json(173)

        # Refuse to add the same job/device pair twice.
        return self._create_unless_exists(
            PctestjobdeviceModel, response, job_id=job_id, device_id=device_id)

    def jobdevicequery(self, request_dict, jobID, response):
        """List job/device links.

        Job id 1 receives every link as job name + device name; any other
        job receives only its own devices as device id + device name.
        """
        if jobID == 1:
            links = PctestjobdeviceModel.objects.values(
                'job__jobname', 'device__devicename')
        else:
            links = PctestjobdeviceModel.objects.filter(job_id=jobID).values(
                'device_id', 'device__devicename')
        return response.json(0, list(links))

    def devicefunctionadd(self, request_dict, userID, response):
        """Link a function to a device.

        Responds 173 when either the device or the function is missing and
        174 when the pair is already linked.
        """
        device_id = request_dict.get('device_id', None)
        function_id = request_dict.get('function_id', None)
        if not self._params_ok(function_id, device_id):
            return response.json(444)

        # Both ends of the link must exist.
        function_found = PctestfunctionModel.objects.filter(id=function_id).exists()
        if not function_found or not PctestdeviceModel.objects.filter(id=device_id).exists():
            return response.json(173)

        # Refuse to add the same device/function pair twice.
        return self._create_unless_exists(
            PctestModel, response, device_id=device_id, function_id=function_id)

    def devicefunctionquery(self, request_dict, jobID, response):
        """List device/function links.

        Job id 1 receives every link.  Any other job must pass ``device_id``
        and only gets that device's functions when the device is linked to
        the caller's job (173 otherwise).
        """
        if jobID == 1:
            links = PctestModel.objects.values(
                'device__devicename', 'function__functionname')
            return response.json(0, list(links))

        device_id = request_dict.get('device_id', None)
        if not self._params_ok(device_id):
            return response.json(444)

        # The caller's job must be linked to the requested device.
        if not PctestjobdeviceModel.objects.filter(
                job_id=jobID, device_id=device_id).exists():
            return response.json(173)

        links = PctestModel.objects.filter(device_id=device_id).values(
            'function_id', 'function__functionname')
        return response.json(0, list(links))

    def staffadd(self, request_dict, userID, response):
        """Create a staff account for an existing job.

        Responds 174 when the username is already taken and 173 when the
        job id does not exist (previously a missing job was also reported as
        174, i.e. "already exists", unlike every other handler here).
        """
        username = request_dict.get('username', None)
        password = request_dict.get('password', None)
        job_id = request_dict.get('job_id', None)
        if not self._params_ok(username, password, job_id):
            return response.json(444)

        if PctestuserModel.objects.filter(username=username).exists():
            return response.json(174)
        if not PctestjobModel.objects.filter(id=job_id).exists():
            return response.json(173)

        PctestuserModel.objects.create(
            username=username, password=password, job_id=job_id)
        return response.json(0)

    def staffquery(self, request_dict, userID, response):
        """List users whose id is greater than 1 (id, username, job name)."""
        staff = PctestuserModel.objects.filter(id__gt=1).values(
            'id', 'username', 'job__jobname')
        return response.json(0, list(staff))

    def staffdelete(self, request_dict, userID, response):
        """Delete the user whose primary key is ``id``; 173 when not found."""
        staff_id = request_dict.get('id', None)
        if not self._params_ok(staff_id):
            return response.json(444)

        staff = PctestuserModel.objects.filter(id=staff_id)
        if not staff.exists():
            return response.json(173)
        staff.delete()
        return response.json(0)

    def logadd(self, request_dict, userID, response):
        """Store a log entry for the caller, stamped with ``time.time()``."""
        content = request_dict.get('content', None)
        if not self._params_ok(content):
            return response.json(444)
        PctestlogModel.objects.create(
            user_id=userID, content=content, addtime=time.time())
        return response.json(0)

    def logquery(self, request_dict, userID, response):
        """List log entries.

        User id 1 receives every entry including the author's username; any
        other user receives only their own entries.
        """
        if userID == 1:
            entries = PctestlogModel.objects.values(
                'user__username', 'content', 'addtime')
        else:
            entries = PctestlogModel.objects.filter(user_id=userID).values(
                'content', 'addtime')
        return response.json(0, list(entries))
class TokenObject1:
    """Decode and validate the JWT sent with a PC-test request.

    After construction ``code`` is 0 when the token decoded successfully and
    carries a truthy ``id`` claim, and 309 in every other case (no token,
    expired, undecodable, or no ``id``).  On success ``id``, ``lang``,
    ``job`` and ``user`` hold the corresponding claims.
    """

    def __init__(self, token=None):
        """Store ``token`` and validate it immediately.

        The former ``'local'``/``'test'`` shortcut tokens are gone: both
        embedded JWTs were long expired and carried ``userID`` rather than
        ``id``, so they could only ever produce code 309.
        """
        self.token = token
        self.lang = None
        self.id = None
        self.job = None
        self.user = ''
        self.code = 0
        # Validate the token right away so callers only need to read ``code``.
        self.valid()

    def valid(self):
        """Validate ``self.token`` and populate the claim attributes.

        :return: the decoded claims dict on success, otherwise ``None``
            (with ``self.code`` set to 309).
        """
        # A missing or empty token can never decode; skip the attempt.
        if not self.token:
            self.code = 309
            return None

        log = logging.getLogger(__name__)
        try:
            # ``algorithms`` must be a list; a bare string is matched by
            # substring, which is looser than intended.
            res = jwt.decode(
                self.token, OAUTH_ACCESS_TOKEN_SECRET, algorithms=['HS256'])
        except jwt.ExpiredSignatureError as e:
            log.info('pctest token expired: %r', e)
            self.code = 309
            return None
        except Exception as e:
            # Any other decode failure is still reported as an invalid token,
            # but no longer silently.
            log.warning('pctest token rejected: %r', e)
            self.code = 309
            return None

        self.id = res.get('id', None)
        self.lang = res.get('lang', None)
        self.job = res.get('job', None)
        self.user = res.get('user', '')

        # A token without a user id is treated as invalid.
        if not self.id:
            self.code = 309
            return None

        self.code = 0
        return res
|