MiscellService.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. import datetime
  2. import threading
  3. import time
  4. import requests
  5. import simplejson as json
  6. from django.utils.timezone import utc
  7. from Ansjer import config as api_settings
  8. from Object.TokenObject import TokenObject
  9. from Object.mongodb import mongodb
  10. from Service.CommonService import CommonService
  11. from Service.ModelService import ModelService
  12. from Service.TemplateService import TemplateService
  13. from Object.RedisObject import RedisObject
  14. from Ansjer.config import SERVER_TYPE
  15. from Model.models import Device_User
  16. # coding:utf-8
  17. from boto3 import Session
  18. from botocore.exceptions import ClientError
  19. from boto3.dynamodb.conditions import Key, Attr
  20. import logging
  21. import json
  22. from Ansjer.config import DOMAIN_HOST,AWS_DynamoDB_REGION,AWS_DynamoDB_ACCESS_KEY,AWS_DynamoDB_SECRET_KEY
  23. logger = logging.getLogger(__name__)
  24. class MyserviceDynamodb():
  25. def __init__(self, **kwargs):
  26. self.region = AWS_DynamoDB_REGION
  27. self.access_key = AWS_DynamoDB_ACCESS_KEY
  28. self.secret_key = AWS_DynamoDB_SECRET_KEY
  29. self.session = self.__session()
  30. def __session(self):
  31. try:
  32. session = Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key,region_name=self.region)
  33. except:
  34. print("Failed to connect session in region{0}".format(self.region))
  35. return session
  36. # 添加access_log表数据
  37. def access_log_item_put(self,table_name,data_list):
  38. dynamodb = self.session.resource('dynamodb')
  39. table = dynamodb.Table(table_name)
  40. with table.batch_writer() as batch:
  41. for i in data_list:
  42. data = json.loads(i.decode('utf-8'))
  43. if data['userID'] == '' :
  44. try:
  45. user_id = Device_User.objects.filter(username= data['userName']).values('userID')
  46. data['userID'] = user_id[0]['userID']
  47. except Exception:
  48. data['userID'] =''
  49. batch.put_item(
  50. Item={
  51. 'userID':data['userID'],
  52. 'addTime':data['addTime'],
  53. 'ip':data['ip'],
  54. 'status':data['status'],
  55. 'operation':data['operation'],
  56. 'content':data['content'],
  57. 'Expiration_time':data['Expiration_time'],
  58. }
  59. )
  60. print ('添加access_log表数据成功!')
  61. # 搜索该数据库表的方法
  62. def get_item(self, table_name, values):
  63. dynamodb = self.session.resource('dynamodb')
  64. if not dynamodb:
  65. raise DynamodbConnectionError("Failed to get resource for dynamodb!")
  66. try:
  67. table = dynamodb.Table(table_name)
  68. response = table.scan(
  69. FilterExpression = Attr('userID').eq(values)
  70. | Key('ip').eq(values)
  71. | Key('status').eq(values)
  72. | Key('content').eq(values)
  73. | Key('operation').eq(values)
  74. )
  75. items = response['Items']
  76. return items
  77. except Exception as e:
  78. logger.error("Failed to get table {0}, error".format(table_name, e))
  79. return []
  80. # 时间段搜索
  81. def get_item_time(self, table_name, start_date, end_date):
  82. dynamodb = self.session.resource('dynamodb')
  83. if not dynamodb:
  84. raise DynamodbConnectionError("Failed to get resource for dynamodb!")
  85. try:
  86. table = dynamodb.Table(table_name)
  87. response = table.scan(
  88. Select='COUNT',
  89. FilterExpression=(Attr('addTime').gt(start_date)
  90. & Key('addTime').lt(end_date)) | Key('addTime').eq(start_date)
  91. )
  92. items = response['Count']
  93. return items
  94. except Exception as e:
  95. logger.error("Failed to get table {0}, error".format(table_name, e))
  96. return 0
  97. # 时间段搜索
  98. def get_item_date(self, table_name, date):
  99. dynamodb = self.session.resource('dynamodb')
  100. if not dynamodb:
  101. raise DynamodbConnectionError("Failed to get resource for dynamodb!")
  102. res = {}
  103. try:
  104. table = dynamodb.Table(table_name)
  105. times = datetime.datetime.fromtimestamp(date)
  106. time_dict = CommonService.getTimeDict(times)
  107. for k, v in time_dict.items():
  108. start_date = time_dict[k]
  109. end_date = time_dict[k] + datetime.timedelta(hours=1)
  110. start_date = int(start_date.timestamp())
  111. end_date = int(end_date.timestamp())
  112. response = table.scan(
  113. Select = 'COUNT',
  114. FilterExpression=(Attr('addTime').gt(start_date)
  115. & Key('addTime').lt(end_date)) | Key('addTime').eq(start_date)
  116. )
  117. print (response['Count'])
  118. count = response['Count']
  119. if count:
  120. res[k] = count
  121. else:
  122. res[k] = 0
  123. except Exception as e:
  124. logger.error("Failed to get table {0}, error".format(table_name, e))
  125. return res
  126. def item_get_brand(self, table_name):
  127. dynamodb = self.session.resource('dynamodb')
  128. table = dynamodb.Table(table_name)
  129. try:
  130. response = table.scan()
  131. response = response['Items']
  132. return response
  133. except Exception:
  134. logger.error("Failed to put item in to {0}:error{1}".format(table))
  135. # 查询aws数据库里面有什么表
  136. def tables_list(self, table_name):
  137. client = self.session.client('dynamodb')
  138. response = client.list_tables()
  139. return response['TableNames']
  140. # 创建access_log表
  141. def access_log_table_create(self, table_name):
  142. dynamodb = self.session.resource('dynamodb')
  143. inventory = my.tables_list(table_name)
  144. if table_name in inventory:
  145. print ('access_log表包含')
  146. else:
  147. try:
  148. table = dynamodb.create_table(
  149. TableName=table_name,
  150. KeySchema=[
  151. {
  152. 'AttributeName': 'userID',
  153. 'KeyType': 'HASH'
  154. },
  155. {
  156. 'AttributeName': 'addTime',
  157. 'KeyType': 'RANGE'
  158. }
  159. ],
  160. AttributeDefinitions=[
  161. {
  162. 'AttributeName': 'userID',
  163. 'AttributeType': 'S'
  164. },
  165. {
  166. 'AttributeName': 'addTime',
  167. 'AttributeType': 'N'
  168. },
  169. ],
  170. ProvisionedThroughput={
  171. 'ReadCapacityUnits': 5,
  172. 'WriteCapacityUnits': 5,
  173. }
  174. )
  175. print ('创建表成功')
  176. except Exception:
  177. logger.error (table_name + '表已经存在')
  178. # 删除表
  179. def table_delete(self, table_name):
  180. dynamodb = self.session.resource('dynamodb')
  181. table = dynamodb.Table(table_name)
  182. table.delete()
  183. print ('删除表成功')
  184. my = MyserviceDynamodb()
  185. # print(my.table_delete('user_brand_all'))
  186. if DOMAIN_HOST == 'www.dvema.com':
  187. user_brand = 'access_log'
  188. else:
  189. user_brand = 'test_access_log'
  190. # my.table_delete(user_brand)
  191. # my.access_log_table_create(user_brand)
  192. # 杂项类,共用行不高,但有些地方需求
  193. class MiscellService():
  194. # 获取访问用户名称
  195. @staticmethod
  196. def get_access_name(request_dict):
  197. userName = request_dict.get('userName', None)
  198. if userName:
  199. return userName
  200. email = request_dict.get('email', None)
  201. if email:
  202. return email
  203. phone = request_dict.get('phone', None)
  204. if phone:
  205. return phone
  206. token = request_dict.get('token', None)
  207. user = ''
  208. if token is not None:
  209. tko = TokenObject(token)
  210. if tko.code == 0:
  211. user = tko.user
  212. # user = ModelService.get_user_name(tko.userID)
  213. return user
  214. @staticmethod
  215. def add_access_log(request, status_code):
  216. # 增加多进程 异步
  217. asy = threading.Thread(target=addLog, args=(request, status_code))
  218. asy.start()
  219. @staticmethod
  220. def access_log(request, response, type):
  221. if request.method == 'GET':
  222. request_dict = request.GET
  223. elif request.method == 'POST':
  224. # request.encoding = 'utf-8'
  225. request_dict = request.POST
  226. else:
  227. return
  228. api_list = TemplateService.log_api()
  229. request_path = request.path.strip().strip('/')
  230. if request_path in api_list:
  231. clientIP = CommonService.get_ip_address(request)
  232. now_time = time.time()
  233. password = request_dict.get('userPwd', None)
  234. if password is not None:
  235. request_dict = dict(request_dict)
  236. request_dict.pop('userPwd')
  237. content = json.dumps(request_dict)
  238. area = CommonService.getAddr(ip=clientIP)
  239. if type == 1:
  240. status_code = 200
  241. else:
  242. status_code = response.status_code
  243. add_data = {
  244. 'user': MiscellService.get_access_name(request=request),
  245. 'ip': clientIP,
  246. 'status': status_code,
  247. 'path': request_path,
  248. 'method': request.method,
  249. 'time': int(now_time),
  250. 'area': area,
  251. 'content': content,
  252. 'et': datetime.datetime.utcnow()
  253. }
  254. mdb = mongodb()
  255. col = "log_access"
  256. mdb.insert_one(col=col, data=add_data)
  257. # 获取所有设备ip地址的指向的国家
  258. @staticmethod
  259. def getArea(ip):
  260. data = {'ip': ip}
  261. country = ''
  262. URL = 'http://ip.taobao.com/service/getIpInfo.php'
  263. try:
  264. r = requests.get(URL, params=data, timeout=3)
  265. except requests.RequestException as e:
  266. print(e)
  267. else:
  268. json_data = r.json()
  269. if json_data['code'] == 0:
  270. if json_data['data']['country'] != 'XX':
  271. country = json_data['data']['country']
  272. return country
  273. @staticmethod
  274. def get_item_log(table_name, values):
  275. item_log = my.get_item(table_name, values)
  276. return item_log
  277. @staticmethod
  278. def item_get_brand(table_name):
  279. item_log = my.item_get_brand(table_name)
  280. return item_log
  281. @staticmethod
  282. def get_item_time(table_name,start_date, end_date):
  283. item_log = my.get_item_time(table_name,start_date, end_date)
  284. return item_log
  285. @staticmethod
  286. def get_item_date(table_name,date):
  287. item_log = my.get_item_date(table_name,date)
  288. return item_log
  289. # 下载接口添加访问日志
  290. @staticmethod
  291. def add_ota_download_log(request):
  292. clientIP = CommonService.get_ip_address(request)
  293. request_path = request.path.strip().strip('/')
  294. now_time = datetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc)
  295. add_data = {
  296. 'user': 'None',
  297. 'ip': clientIP,
  298. 'status': 200,
  299. 'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
  300. 'operation': request_path,
  301. 'time': now_time,
  302. 'content': ''
  303. }
  304. ModelService.addAccessLog(data=add_data)
  305. @staticmethod
  306. def batch_add_access_log(request, status_code):
  307. asy = threading.Thread(target=batch_add_log_ctr, args=(request, status_code))
  308. asy.start()
  309. @staticmethod
  310. def DynamoDB_add_access_log(request, status_code):
  311. asy = threading.Thread(target=dynamo_db_add_log_ctr, args=(request, status_code))
  312. asy.start()
  313. def addLog(request, status_code):
  314. try:
  315. request.encoding = 'utf-8'
  316. if request.method == 'GET':
  317. request_dict = request.GET
  318. elif request.method == 'POST':
  319. request_dict = request.POST
  320. else:
  321. return
  322. api_list = TemplateService.log_api()
  323. request_path = request.path.strip().strip('/')
  324. if request_path in api_list:
  325. user = MiscellService.get_access_name(request_dict=request_dict)
  326. clientIP = CommonService.get_ip_address(request)
  327. now_time = datetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc)
  328. password = request_dict.get('userPwd', None)
  329. if password is not None:
  330. request_dict = dict(request_dict)
  331. request_dict.pop('userPwd')
  332. content = json.dumps(request_dict)
  333. print(content)
  334. if user != '':
  335. user = user
  336. else:
  337. print('空')
  338. user = '空'
  339. add_data = {
  340. 'user': user,
  341. 'ip': clientIP,
  342. 'status': status_code,
  343. 'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
  344. 'operation': request_path,
  345. 'time': now_time,
  346. 'content': content
  347. }
  348. print(add_data)
  349. ModelService.addAccessLog(data=add_data)
  350. except Exception as e:
  351. print(repr(e))
  352. pass
  353. def batch_add_log_ctr(request, status_code):
  354. request.encoding = 'utf-8'
  355. if request.method == 'GET':
  356. request_dict = request.GET
  357. elif request.method == 'POST':
  358. request_dict = request.POST
  359. else:
  360. return
  361. api_list = TemplateService.log_api()
  362. request_path = request.path.strip().strip('/')
  363. if request_path in api_list:
  364. user = MiscellService.get_access_name(request_dict)
  365. clientIP = CommonService.get_ip_address(request)
  366. now_time = datetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc)
  367. password = request_dict.get('userPwd', None)
  368. if password is not None:
  369. request_dict = dict(request_dict)
  370. request_dict.pop('userPwd')
  371. content = json.dumps(request_dict)
  372. add_data = {
  373. 'user': user,
  374. 'ip': clientIP,
  375. 'status': status_code,
  376. 'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
  377. 'operation': request_path,
  378. 'time': str(now_time),
  379. 'content': content
  380. }
  381. redisObj = RedisObject()
  382. loggerData = json.dumps(add_data)
  383. # print(loggerData)
  384. if SERVER_TYPE == 'Ansjer.formal_settings':
  385. logKey = 'logger'
  386. else:
  387. logKey = 'test_logger'
  388. redisObj.rpush(name=logKey, val=loggerData)
  389. # 判断redis列表长度
  390. if redisObj.llen(name=logKey) > 100 or SERVER_TYPE == 'Ansjer.test_settings':
  391. data_list = redisObj.lrange(logKey, 0, -1)
  392. redisObj.del_data(key=logKey)
  393. ModelService.add_batch_log(data_list)
  394. def dynamo_db_add_log_ctr(request, status_code):
  395. request.encoding = 'utf-8'
  396. if request.method == 'GET':
  397. request_dict = request.GET
  398. elif request.method == 'POST':
  399. request_dict = request.POST
  400. else:
  401. return
  402. api_list = TemplateService.log_api()
  403. request_path = request.path.strip().strip('/')
  404. if SERVER_TYPE == 'Ansjer.formal_settings':
  405. logKey = 'loggers'
  406. else:
  407. logKey = 'test_loggers'
  408. # 判断redis列表长度为10条以上就会添加日志
  409. num = 1
  410. clientIP = CommonService.get_ip_address(request)
  411. token = request_dict.get('token', None)
  412. userID = '无'
  413. if token is not None:
  414. tko = TokenObject(token)
  415. userID = tko.userID
  416. password = request_dict.get('userPwd', None)
  417. userName = request_dict.get('userName', None)
  418. if password is not None:
  419. request_dict = dict(request_dict)
  420. request_dict.pop('userPwd')
  421. content = json.dumps(request_dict)
  422. addTime = int(time.time())
  423. if DOMAIN_HOST == 'www.dvema.com':
  424. user_brand = 'access_log'
  425. else:
  426. user_brand = 'test_access_log'
  427. # 过滤某些接口集合
  428. if request_path in api_list:
  429. add_data = {
  430. 'userName':userName,
  431. 'userID': userID,
  432. 'ip': clientIP,
  433. 'status': status_code,
  434. 'operation': request_path,
  435. 'addTime': addTime,
  436. 'content': content,
  437. 'Expiration_time': addTime + 2592000
  438. }
  439. redisObj = RedisObject()
  440. loggerData = json.dumps(add_data)
  441. redisObj.rpush(name=logKey, val=loggerData)
  442. # 判断redis列表长度
  443. if redisObj.llen(name=logKey) > num or SERVER_TYPE == 'Ansjer.test_settings':
  444. data_list = redisObj.lrange(logKey, 0, -1)
  445. redisObj.del_data(key=logKey)
  446. my.access_log_item_put(user_brand,data_list)
  447. # 判断是是否是下载接口的
  448. else:
  449. path = request.path
  450. index = path.find('/OTA/downloads')
  451. if index != -1:
  452. addsCount = len(api_settings.ADDR_URL)
  453. if addsCount > 1000:
  454. response = ResponseObject()
  455. # 提示获取链接失败,实际上是下载太频繁,超过了1000的并发量
  456. return response.json(901)
  457. else:
  458. # 合理的情况就添加访问日志
  459. adds = request.META.get('REMOTE_ADDR', None)
  460. api_settings.ADDR_URL.append(adds)
  461. add_data = {
  462. 'userName': '无',
  463. 'userID':'无',
  464. 'ip': clientIP,
  465. 'status': status_code,
  466. 'operation': request_path,
  467. 'addTime': addTime,
  468. 'content': content,
  469. 'Expiration_time': addTime + 2592000
  470. }
  471. redisObj = RedisObject()
  472. loggerData = json.dumps(add_data)
  473. redisObj.rpush(name=logKey, val=loggerData)
  474. # 判断redis列表长度
  475. if redisObj.llen(name=logKey) > num or SERVER_TYPE == 'Ansjer.test_settings':
  476. data_list = redisObj.lrange(logKey, 0, -1)
  477. redisObj.del_data(key=logKey)
  478. my.access_log_item_put(user_brand, data_list)