Test.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
  5. @AUTHOR: ASJRD018
  6. @NAME: Ansjer
  7. @software: PyCharm
  8. @DATE: 2018/5/22 13:58
  9. @Version: python3.6
  10. @MODIFY RECORD:ansjer dev
  11. @file: Test.py
  12. @Contact: chanjunkai@163.com
  13. """
  14. from django.views.generic.base import View
  15. import os
  16. '''
  17. http://192.168.136.40:8077/Test
  18. '''
  19. from Object.ResponseObject import ResponseObject
  20. from Ansjer.config import BASE_DIR
  21. import json
  22. from alipay import AliPay
  23. import time
  24. import apns2
  25. from django.http import JsonResponse
  26. # 测试接口sdk
  27. class Test(View):
  28. def get(self, request, *args, **kwargs):
  29. request_dict = request.GET
  30. test_push_type = request_dict.get('test_push_type')
  31. if test_push_type == 'jpush':
  32. return self.jgPush(request)
  33. elif test_push_type == 'fcm':
  34. return self.do_fcm_push(request)
  35. elif test_push_type == 'apns':
  36. return self.do_apns(request.GET)
  37. elif test_push_type == 's3sts':
  38. return self.do_get_s3_sts()
  39. return self.do_gcm_push(request)
  40. return self.do_alipay_query_status()
  41. def do_get_s3_sts(self):
  42. import boto3
  43. REGION_NAME = 'us-east-1' # e.g
  44. import json
  45. boto3_sts = boto3.client(
  46. 'sts',
  47. aws_access_key_id='AKIA2E67UIMD45Y3HL53',
  48. aws_secret_access_key='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',
  49. region_name=REGION_NAME
  50. )
  51. Policy = {"Version": "2012-10-17", "Statement": [
  52. {"Effect": "Allow", "Action": "s3:*", "Resource": ["arn:aws:s3:::azvod1/*"]}]}
  53. response = boto3_sts.get_federation_token(
  54. Name='chanjunkai@166.com',
  55. Policy=json.dumps(Policy),
  56. DurationSeconds=7200
  57. )
  58. response['Credentials']['bucket_name'] = 'azvod1'
  59. return JsonResponse(status=200, data=response)
  60. def do_get_aws_kinesis_vidoe(self):
  61. import boto3
  62. REGION_NAME = 'us-east-1' # e.g
  63. import json
  64. sts = boto3.client(
  65. 'sts',
  66. aws_access_key_id='AKIA2E67UIMD45Y3HL53',
  67. aws_secret_access_key='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',
  68. region_name=REGION_NAME
  69. )
  70. Policy = {"Version": "2012-10-17",
  71. "Statement": [{"Effect": "Allow", "Action": "kinesisvideo:*", "Resource": ["*"]}]}
  72. credentials = sts.get_federation_token(
  73. Name='chanjunkai@163.com', # or any unique text related to user
  74. Policy=json.dumps(Policy),
  75. DurationSeconds=36000,
  76. )
  77. print(credentials)
  78. access_key_id = credentials['Credentials']['AccessKeyId']
  79. session_token = credentials['Credentials']['SessionToken']
  80. secret_access_key = credentials['Credentials']['SecretAccessKey']
  81. response = ResponseObject()
  82. res = {
  83. 'access_key_id':access_key_id,
  84. 'secret_access_key':secret_access_key,
  85. 'session_token':session_token,
  86. }
  87. return response.json(0,res)
  88. def do_alipay_query_status(self):
  89. response = ResponseObject()
  90. # app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_private_2048.pem').read()
  91. # alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_public_2048.pem').read()
  92. app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_private_2048.pem').read()
  93. alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_public_2048.pem').read()
  94. alipay = AliPay(
  95. # appid="2016092200569234",
  96. appid="2019041663958142",
  97. app_notify_url=None, # the default notify path
  98. app_private_key_string=app_private_key_string,
  99. alipay_public_key_string=alipay_public_key_string,
  100. sign_type="RSA2", # RSA or RSA2
  101. debug=False # False by default
  102. )
  103. # check order status
  104. print("now sleep 3s")
  105. # time.sleep(3)
  106. result = alipay.api_alipay_trade_query(out_trade_no="20190424085757859937")
  107. if result.get("trade_status", "") == "TRADE_SUCCESS":
  108. paid = True
  109. print(paid)
  110. return response.json(0)
  111. else:
  112. print("not paid...")
  113. return response.json(404)
  114. def do_fcm_push(self, request):
  115. rg_id = request.GET.get('rg_id', '')
  116. serverKey = request.GET.get('serverKey', '')
  117. push_type = request.GET.get('push_type', None)
  118. # Send to single device.
  119. from pyfcm import FCMNotification
  120. # OR initialize with proxies
  121. # proxy_dict = {
  122. # "http": "http://127.0.0.1",
  123. # "https": "http://127.0.0.1",
  124. # }
  125. # push_service = FCMNotification(api_key="<api-key>", proxy_dict=proxy_dict)
  126. push_service = FCMNotification(api_key=serverKey)
  127. # Your api-key can be gotten from: https://console.firebase.google.com/project/<project-name>/settings/cloudmessaging
  128. registration_id = rg_id
  129. message_title = "Zosi Smart(xxxx 007)"
  130. now_time = int(time.time())
  131. data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "",
  132. "received_at": now_time, "sound": "sound.aif", "uid": "98UXAA8BRPA35VAL111A", "zpush": "1"}
  133. # message_body = json.dumps(data)
  134. message_body = '警告:Motion Channel:1 日期:{tt}'.format(
  135. tt=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))))
  136. print(message_body)
  137. from var_dump import var_dump
  138. if push_type == '1':
  139. var_dump('1111111')
  140. result = push_service.single_device_data_message(registration_id=registration_id, data_message=data)
  141. elif push_type == '2':
  142. var_dump('22222222')
  143. result = push_service.notify_single_device(registration_id=registration_id, message_title=message_title,
  144. message_body=message_body, data_message=data)
  145. else:
  146. var_dump('333333')
  147. result = push_service.notify_single_device(registration_id=registration_id, message_title=message_title,
  148. message_body=message_body)
  149. from var_dump import var_dump
  150. var_dump(result)
  151. response = ResponseObject()
  152. return response.json(0, result)
  153. def do_apns(self, request_dict):
  154. token_val = request_dict.get('token_val', None)
  155. pem_path = os.path.join(BASE_DIR, 'Ansjer/file/apns_pem/apns-dev2.pem')
  156. # pem_path = os.path.join(BASE_DIR, 'Ansjer/file/apns_pem/apns-loocamccloud.pem')
  157. print(pem_path)
  158. response = ResponseObject()
  159. try:
  160. n_time = int(time.time())
  161. cli = apns2.APNSClient(mode="dev", client_cert=pem_path,
  162. password='111111')
  163. push_data = {"alert": "Motion ", "event_time": n_time, "event_type": 51, "msg": "",
  164. "received_at": n_time, "sound": "sound.aif", "uid": 'XFDJUHUIOKJHYTGSFFDR', "zpush": "1", "channel": 1}
  165. alert = apns2.PayloadAlert(body='通道:1 uid:XFDJUHUIOKJHYTGSFFDR', title='ansjer')
  166. payload = apns2.Payload(alert=alert, custom=push_data)
  167. n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
  168. res = cli.push(n=n, device_token=token_val, topic='com.ansjer.loocamccloud')
  169. # assert res.status_code == 200, res.reason
  170. # assert res.apns_id
  171. if res.status_code == 200:
  172. return response.json(0)
  173. else:
  174. return response.json(404, res.reason)
  175. except Exception as e:
  176. return response.json(10, repr(e))
  177. def do_get_putOss_url(self, request_dict):
  178. import oss2
  179. obj_name = request_dict.get('obj_name', '')
  180. # 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
  181. auth = oss2.Auth('LTAIyMkGfEdogyL9', '71uIjpsqVOmF7DAITRyRuc259jHOjO')
  182. # Endpoint以杭州为例,其它Region请按实际情况填写。
  183. bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
  184. # 设置此签名URL在60秒内有效。
  185. url = bucket.sign_url('PUT', obj_name, 7200)
  186. response = ResponseObject()
  187. return response.json(0, url)
  188. def do_gcm_push(self, request):
  189. import json
  190. import requests
  191. response = ResponseObject()
  192. rg_id = request.GET.get('rg_id', '')
  193. serverKey = request.GET.get('serverKey', '')
  194. if not rg_id or not serverKey:
  195. return response.json(444)
  196. now_time = int(time.time())
  197. data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "",
  198. "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR", "zpush": "1"}
  199. json_data = {
  200. "collapse_key": "WhatYouWant",
  201. "data": data,
  202. "delay_while_idle": False,
  203. "time_to_live": 3600,
  204. "registration_ids": [rg_id]
  205. }
  206. url = 'https://android.googleapis.com/gcm/send'
  207. data = json.dumps(json_data).encode('utf-8')
  208. headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey}
  209. req = requests.post(url, data, headers=headers)
  210. response = ResponseObject()
  211. return response.json(0)
  212. def jgPush(self, request):
  213. response = ResponseObject()
  214. devToken = request.GET.get('devToken', '')
  215. app_key = request.GET.get('app_key', '')
  216. master_secret = request.GET.get('master_secret', '')
  217. import jpush as jpush
  218. # 此处换成各自的app_key和master_secret
  219. _jpush = jpush.JPush(app_key, master_secret)
  220. push = _jpush.create_push()
  221. # if you set the logging level to "DEBUG",it will show the debug logging.
  222. _jpush.set_logging("DEBUG")
  223. # push.audience = jpush.all_
  224. n_time = int(time.time())
  225. event_type = 51
  226. push.audience = jpush.registration_id(devToken)
  227. push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "",
  228. "received_at": n_time, "sound": "sound.aif", "uid": 'xoxoxoxoxoxoxoxoxoxo', "zpush": "1"}
  229. android = jpush.android(alert="Hello, Android msg", priority=1, style=1, alert_type=1, big_text='ssssssssssssssssss',
  230. extras=push_data,title='fffff')
  231. push.notification = jpush.notification(android=android)
  232. push.platform = jpush.all_
  233. try:
  234. push.send()
  235. except Exception as e:
  236. print("Exception")
  237. return response.json(10, repr(e))
  238. return response.json(0)
  239. def post(self, request, *args, **kwargs):
  240. response = ResponseObject()
  241. data = request.POST.dict()
  242. signature = data["sign"]
  243. data.pop('sign')
  244. print(json.dumps(data))
  245. print(signature)
  246. # verify
  247. app_private_key_string = open(BASE_DIR + '/Controller/alipay_private_2048.pem').read()
  248. alipay_public_key_string = open(BASE_DIR + '/Controller/alipay_public_2048.pem').read()
  249. alipay = AliPay(
  250. appid="2016092200569234",
  251. app_notify_url=None, # the default notify path
  252. app_private_key_string=app_private_key_string,
  253. alipay_public_key_string=alipay_public_key_string,
  254. sign_type="RSA2", # RSA or RSA2
  255. debug=False # False by default
  256. )
  257. success = alipay.verify(data, signature)
  258. if success and data["trade_status"] in ("TRADE_SUCCESS", "TRADE_FINISHED"):
  259. print("trade succeed")
  260. return response.json(0, signature)
  261. # 修改 资源改变
  262. def put(self, request):
  263. response = ResponseObject()
  264. return response.json(0, '')
  265. # 修改 属性改变
  266. def PATCH(self, request):
  267. response = ResponseObject()
  268. return response.json(0)
  269. # 删除
  270. def delete(self, request):
  271. response = ResponseObject()
  272. return response.json(0)
  273. def validation(self, request_dict, *args, **kwargs):
  274. response = ResponseObject()
  275. return response.json(0)