MiscellService.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. import datetime
  2. import threading
  3. import time
  4. import requests
  5. import simplejson as json
  6. from django.utils.timezone import utc
  7. from Ansjer import local_settings as api_settings
  8. from Object.TokenObject import TokenObject
  9. from Object.mongodb import mongodb
  10. from Service.CommonService import CommonService
  11. from Service.ModelService import ModelService
  12. from Service.TemplateService import TemplateService
  13. from Object.RedisObject import RedisObject
  14. from Ansjer.config import SERVER_TYPE
  15. from Model.models import Device_User
  16. # coding:utf-8
  17. from boto3 import Session
  18. from botocore.exceptions import ClientError
  19. from boto3.dynamodb.conditions import Key, Attr
  20. import logging
  21. import json
  22. from Ansjer.config import DOMAIN_HOST,AWS_DynamoDB_REGION,AWS_DynamoDB_ACCESS_KEY,AWS_DynamoDB_SECRET_KEY
  23. logger = logging.getLogger(__name__)
  24. class MyserviceDynamodb():
  25. def __init__(self, **kwargs):
  26. self.region = AWS_DynamoDB_REGION
  27. self.access_key = AWS_DynamoDB_ACCESS_KEY
  28. self.secret_key = AWS_DynamoDB_SECRET_KEY
  29. self.session = self.__session()
  30. def __session(self):
  31. try:
  32. session = Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key,region_name=self.region)
  33. except:
  34. print("Failed to connect session in region{0}".format(self.region))
  35. return session
  36. # 添加access_log表数据
  37. def access_log_item_put(self,table_name,data_list):
  38. dynamodb = self.session.resource('dynamodb')
  39. table = dynamodb.Table(table_name)
  40. with table.batch_writer() as batch:
  41. for i in data_list:
  42. data = json.loads(i.decode('utf-8'))
  43. if data['userID'] == '' :
  44. try:
  45. user_id = Device_User.objects.filter(username= data['userName']).values('userID')
  46. data['userID'] = user_id[0]['userID']
  47. except Exception:
  48. data['userID'] =''
  49. batch.put_item(
  50. Item={
  51. 'userID':data['userID'],
  52. 'addTime':data['addTime'],
  53. 'ip':data['ip'],
  54. 'status':data['status'],
  55. 'operation':data['operation'],
  56. 'content':data['content'],
  57. 'Expiration_time':data['Expiration_time'],
  58. }
  59. )
  60. print ('添加access_log表数据成功!')
  61. # 搜索该数据库表的方法
  62. def get_item(self, table_name, values):
  63. dynamodb = self.session.resource('dynamodb')
  64. if not dynamodb:
  65. raise DynamodbConnectionError("Failed to get resource for dynamodb!")
  66. try:
  67. table = dynamodb.Table(table_name)
  68. response = table.scan(
  69. FilterExpression=Attr('userID').eq(values)
  70. | Key('ip').eq(values)
  71. | Key('status').eq(values)
  72. | Key('content').eq(values)
  73. | Key('operation').eq(values)
  74. )
  75. except Exception as e:
  76. logger.error("Failed to get table {0}, error".format(table_name, e))
  77. items = response['Items']
  78. return items
  79. # 时间段搜索
  80. def get_item_time(self, table_name, start_date, end_date):
  81. dynamodb = self.session.resource('dynamodb')
  82. if not dynamodb:
  83. raise DynamodbConnectionError("Failed to get resource for dynamodb!")
  84. try:
  85. table = dynamodb.Table(table_name)
  86. response = table.scan(
  87. Select='COUNT',
  88. FilterExpression=(Attr('addTime').gt(start_date)
  89. & Key('addTime').lt(end_date)) | Key('addTime').eq(start_date)
  90. )
  91. items = response['Count']
  92. return items
  93. except Exception as e:
  94. logger.error("Failed to get table {0}, error".format(table_name, e))
  95. return 0
  96. # 时间段搜索
  97. def get_item_date(self, table_name, date):
  98. dynamodb = self.session.resource('dynamodb')
  99. if not dynamodb:
  100. raise DynamodbConnectionError("Failed to get resource for dynamodb!")
  101. res = {}
  102. try:
  103. table = dynamodb.Table(table_name)
  104. times = datetime.datetime.fromtimestamp(date)
  105. time_dict = CommonService.getTimeDict(times)
  106. for k, v in time_dict.items():
  107. start_date = time_dict[k]
  108. end_date = time_dict[k] + datetime.timedelta(hours=1)
  109. start_date = int(start_date.timestamp())
  110. end_date = int(end_date.timestamp())
  111. response = table.scan(
  112. Select = 'COUNT',
  113. FilterExpression=(Attr('addTime').gt(start_date)
  114. & Key('addTime').lt(end_date)) | Key('addTime').eq(start_date)
  115. )
  116. print (response['Count'])
  117. count = response['Count']
  118. if count:
  119. res[k] = count
  120. else:
  121. res[k] = 0
  122. except Exception as e:
  123. logger.error("Failed to get table {0}, error".format(table_name, e))
  124. return res
  125. def item_get_brand(self, table_name):
  126. dynamodb = self.session.resource('dynamodb')
  127. table = dynamodb.Table(table_name)
  128. try:
  129. response = table.scan()
  130. response = response['Items']
  131. return response
  132. except Exception:
  133. logger.error("Failed to put item in to {0}:error{1}".format(table))
  134. # 查询aws数据库里面有什么表
  135. def tables_list(self, table_name):
  136. client = self.session.client('dynamodb')
  137. response = client.list_tables()
  138. return response['TableNames']
  139. # 创建access_log表
  140. def access_log_table_create(self, table_name):
  141. dynamodb = self.session.resource('dynamodb')
  142. inventory = my.tables_list(table_name)
  143. if table_name in inventory:
  144. print ('access_log表包含')
  145. else:
  146. try:
  147. table = dynamodb.create_table(
  148. TableName=table_name,
  149. KeySchema=[
  150. {
  151. 'AttributeName': 'userID',
  152. 'KeyType': 'HASH'
  153. },
  154. {
  155. 'AttributeName': 'addTime',
  156. 'KeyType': 'RANGE'
  157. }
  158. ],
  159. AttributeDefinitions=[
  160. {
  161. 'AttributeName': 'userID',
  162. 'AttributeType': 'S'
  163. },
  164. {
  165. 'AttributeName': 'addTime',
  166. 'AttributeType': 'N'
  167. },
  168. ],
  169. ProvisionedThroughput={
  170. 'ReadCapacityUnits': 5,
  171. 'WriteCapacityUnits': 5,
  172. }
  173. )
  174. print ('创建表成功')
  175. except Exception:
  176. logger.error (table_name + '表已经存在')
  177. # 删除表
  178. def table_delete(self, table_name):
  179. dynamodb = self.session.resource('dynamodb')
  180. table = dynamodb.Table(table_name)
  181. table.delete()
  182. print ('删除表成功')
  183. my = MyserviceDynamodb()
  184. # print(my.table_delete('user_brand_all'))
  185. if DOMAIN_HOST == 'www.dvema.com':
  186. user_brand = 'access_log'
  187. else:
  188. user_brand = 'test_access_log'
  189. # my.table_delete(user_brand)
  190. # my.access_log_table_create(user_brand)
  191. # 杂项类,共用行不高,但有些地方需求
  192. class MiscellService():
  193. # 获取访问用户名称
  194. @staticmethod
  195. def get_access_name(request_dict):
  196. userName = request_dict.get('userName', None)
  197. if userName:
  198. return userName
  199. email = request_dict.get('email', None)
  200. if email:
  201. return email
  202. phone = request_dict.get('phone', None)
  203. if phone:
  204. return phone
  205. token = request_dict.get('token', None)
  206. user = ''
  207. if token is not None:
  208. tko = TokenObject(token)
  209. if tko.code == 0:
  210. user = tko.user
  211. # user = ModelService.get_user_name(tko.userID)
  212. return user
  213. @staticmethod
  214. def add_access_log(request, status_code):
  215. # 增加多进程 异步
  216. asy = threading.Thread(target=addLog, args=(request, status_code))
  217. asy.start()
  218. @staticmethod
  219. def access_log(request, response, type):
  220. if request.method == 'GET':
  221. request_dict = request.GET
  222. elif request.method == 'POST':
  223. # request.encoding = 'utf-8'
  224. request_dict = request.POST
  225. else:
  226. return
  227. api_list = TemplateService.log_api()
  228. request_path = request.path.strip().strip('/')
  229. if request_path in api_list:
  230. clientIP = CommonService.get_ip_address(request)
  231. now_time = time.time()
  232. password = request_dict.get('userPwd', None)
  233. if password is not None:
  234. request_dict = dict(request_dict)
  235. request_dict.pop('userPwd')
  236. content = json.dumps(request_dict)
  237. area = CommonService.getAddr(ip=clientIP)
  238. if type == 1:
  239. status_code = 200
  240. else:
  241. status_code = response.status_code
  242. add_data = {
  243. 'user': MiscellService.get_access_name(request=request),
  244. 'ip': clientIP,
  245. 'status': status_code,
  246. 'path': request_path,
  247. 'method': request.method,
  248. 'time': int(now_time),
  249. 'area': area,
  250. 'content': content,
  251. 'et': datetime.datetime.utcnow()
  252. }
  253. mdb = mongodb()
  254. col = "log_access"
  255. mdb.insert_one(col=col, data=add_data)
  256. # 获取所有设备ip地址的指向的国家
  257. @staticmethod
  258. def getArea(ip):
  259. data = {'ip': ip}
  260. country = ''
  261. URL = 'http://ip.taobao.com/service/getIpInfo.php'
  262. try:
  263. r = requests.get(URL, params=data, timeout=3)
  264. except requests.RequestException as e:
  265. print(e)
  266. else:
  267. json_data = r.json()
  268. if json_data['code'] == 0:
  269. if json_data['data']['country'] != 'XX':
  270. country = json_data['data']['country']
  271. return country
  272. @staticmethod
  273. def get_item_log(table_name, values):
  274. item_log = my.get_item(table_name, values)
  275. return item_log
  276. @staticmethod
  277. def item_get_brand(table_name):
  278. item_log = my.item_get_brand(table_name)
  279. return item_log
  280. @staticmethod
  281. def get_item_time(table_name,start_date, end_date):
  282. item_log = my.get_item_time(table_name,start_date, end_date)
  283. return item_log
  284. @staticmethod
  285. def get_item_date(table_name,date):
  286. item_log = my.get_item_date(table_name,date)
  287. return item_log
  288. # 下载接口添加访问日志
  289. @staticmethod
  290. def add_ota_download_log(request):
  291. clientIP = CommonService.get_ip_address(request)
  292. request_path = request.path.strip().strip('/')
  293. now_time = datetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc)
  294. add_data = {
  295. 'user': 'None',
  296. 'ip': clientIP,
  297. 'status': 200,
  298. 'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
  299. 'operation': request_path,
  300. 'time': now_time,
  301. 'content': ''
  302. }
  303. ModelService.addAccessLog(data=add_data)
  304. @staticmethod
  305. def batch_add_access_log(request, status_code):
  306. asy = threading.Thread(target=batch_add_log_ctr, args=(request, status_code))
  307. asy.start()
  308. @staticmethod
  309. def DynamoDB_add_access_log(request, status_code):
  310. asy = threading.Thread(target=dynamo_db_add_log_ctr, args=(request, status_code))
  311. asy.start()
  312. def addLog(request, status_code):
  313. try:
  314. request.encoding = 'utf-8'
  315. if request.method == 'GET':
  316. request_dict = request.GET
  317. elif request.method == 'POST':
  318. request_dict = request.POST
  319. else:
  320. return
  321. api_list = TemplateService.log_api()
  322. request_path = request.path.strip().strip('/')
  323. if request_path in api_list:
  324. user = MiscellService.get_access_name(request_dict=request_dict)
  325. clientIP = CommonService.get_ip_address(request)
  326. now_time = datetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc)
  327. password = request_dict.get('userPwd', None)
  328. if password is not None:
  329. request_dict = dict(request_dict)
  330. request_dict.pop('userPwd')
  331. content = json.dumps(request_dict)
  332. add_data = {
  333. 'user': user,
  334. 'ip': clientIP,
  335. 'status': status_code,
  336. 'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
  337. 'operation': request_path,
  338. 'time': now_time,
  339. 'content': content
  340. }
  341. ModelService.addAccessLog(data=add_data)
  342. except Exception as e:
  343. pass
  344. def batch_add_log_ctr(request, status_code):
  345. request.encoding = 'utf-8'
  346. if request.method == 'GET':
  347. request_dict = request.GET
  348. elif request.method == 'POST':
  349. request_dict = request.POST
  350. else:
  351. return
  352. api_list = TemplateService.log_api()
  353. request_path = request.path.strip().strip('/')
  354. if request_path in api_list:
  355. user = MiscellService.get_access_name(request_dict)
  356. clientIP = CommonService.get_ip_address(request)
  357. now_time = datetime.datetime.utcnow().replace(tzinfo=utc).astimezone(utc)
  358. password = request_dict.get('userPwd', None)
  359. if password is not None:
  360. request_dict = dict(request_dict)
  361. request_dict.pop('userPwd')
  362. content = json.dumps(request_dict)
  363. add_data = {
  364. 'user': user,
  365. 'ip': clientIP,
  366. 'status': status_code,
  367. 'url': request.META['SERVER_PROTOCOL'] + '-' + request.method + '-' + request.path,
  368. 'operation': request_path,
  369. 'time': str(now_time),
  370. 'content': content
  371. }
  372. redisObj = RedisObject()
  373. loggerData = json.dumps(add_data)
  374. # print(loggerData)
  375. if SERVER_TYPE == 'Ansjer.formal_settings':
  376. logKey = 'logger'
  377. else:
  378. logKey = 'test_logger'
  379. redisObj.rpush(name=logKey, val=loggerData)
  380. # 判断redis列表长度
  381. if redisObj.llen(name=logKey) > 100 or SERVER_TYPE == 'Ansjer.test_settings':
  382. data_list = redisObj.lrange(logKey, 0, -1)
  383. redisObj.del_data(key=logKey)
  384. ModelService.add_batch_log(data_list)
  385. def dynamo_db_add_log_ctr(request, status_code):
  386. request.encoding = 'utf-8'
  387. if request.method == 'GET':
  388. request_dict = request.GET
  389. elif request.method == 'POST':
  390. request_dict = request.POST
  391. else:
  392. return
  393. api_list = TemplateService.log_api()
  394. request_path = request.path.strip().strip('/')
  395. if SERVER_TYPE == 'Ansjer.formal_settings':
  396. logKey = 'loggers'
  397. else:
  398. logKey = 'test_loggers'
  399. # 判断redis列表长度为10条以上就会添加日志
  400. num = 1
  401. clientIP = CommonService.get_ip_address(request)
  402. token = request_dict.get('token', None)
  403. userID = '无'
  404. if token is not None:
  405. tko = TokenObject(token)
  406. userID = tko.userID
  407. password = request_dict.get('userPwd', None)
  408. userName = request_dict.get('userName', None)
  409. if password is not None:
  410. request_dict = dict(request_dict)
  411. request_dict.pop('userPwd')
  412. content = json.dumps(request_dict)
  413. addTime = int(time.time())
  414. if DOMAIN_HOST == 'www.dvema.com':
  415. user_brand = 'access_log'
  416. else:
  417. user_brand = 'test_access_log'
  418. # 过滤某些接口集合
  419. if request_path in api_list:
  420. add_data = {
  421. 'userName':userName,
  422. 'userID': userID,
  423. 'ip': clientIP,
  424. 'status': status_code,
  425. 'operation': request_path,
  426. 'addTime': addTime,
  427. 'content': content,
  428. 'Expiration_time': addTime + 2592000
  429. }
  430. redisObj = RedisObject()
  431. loggerData = json.dumps(add_data)
  432. redisObj.rpush(name=logKey, val=loggerData)
  433. # 判断redis列表长度
  434. if redisObj.llen(name=logKey) > num or SERVER_TYPE == 'Ansjer.test_settings':
  435. data_list = redisObj.lrange(logKey, 0, -1)
  436. redisObj.del_data(key=logKey)
  437. my.access_log_item_put(user_brand,data_list)
  438. # 判断是是否是下载接口的
  439. else:
  440. path = request.path
  441. index = path.find('/OTA/downloads')
  442. if index != -1:
  443. addsCount = len(api_settings.ADDR_URL)
  444. if addsCount > 1000:
  445. response = ResponseObject()
  446. # 提示获取链接失败,实际上是下载太频繁,超过了1000的并发量
  447. return response.json(901)
  448. else:
  449. # 合理的情况就添加访问日志
  450. adds = request.META.get('REMOTE_ADDR', None)
  451. api_settings.ADDR_URL.append(adds)
  452. add_data = {
  453. 'userName': '无',
  454. 'userID':'无',
  455. 'ip': clientIP,
  456. 'status': status_code,
  457. 'operation': request_path,
  458. 'addTime': addTime,
  459. 'content': content,
  460. 'Expiration_time': addTime + 2592000
  461. }
  462. redisObj = RedisObject()
  463. loggerData = json.dumps(add_data)
  464. redisObj.rpush(name=logKey, val=loggerData)
  465. # 判断redis列表长度
  466. if redisObj.llen(name=logKey) > num or SERVER_TYPE == 'Ansjer.test_settings':
  467. data_list = redisObj.lrange(logKey, 0, -1)
  468. redisObj.del_data(key=logKey)
  469. my.access_log_item_put(user_brand, data_list)