Test.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
  5. @AUTHOR: ASJRD018
  6. @NAME: Ansjer
  7. @software: PyCharm
  8. @DATE: 2018/5/22 13:58
  9. @Version: python3.6
  10. @MODIFY DECORD:ansjer dev
  11. @file: Test.py
  12. @Contact: chanjunkai@163.com
  13. """
  14. from django.views.generic.base import View
  15. import os
  16. '''
  17. http://192.168.136.40:8077/Test
  18. '''
  19. from Object.ResponseObject import ResponseObject
  20. from Ansjer.config import BASE_DIR
  21. import json
  22. from alipay import AliPay
  23. import time
  24. from django.http import JsonResponse
  25. # 测试接口sdk
  26. class Test(View):
  def get(self, request, *args, **kwargs):
      """Dispatch a GET request to one of the SDK smoke-test handlers.

      The handler is selected by the ``test_push_type`` query parameter:

      * ``jpush``  -> ``self.jgPush(request)``
      * ``fcm``    -> ``self.do_fcm_push(request)``
      * ``s3sts``  -> ``self.do_get_s3_sts()``
      * ``alipay`` -> ``self.do_alipay_query_status()``
      * any other value, or no value -> ``self.do_gcm_push(request)``

      :param request: the incoming Django request; only ``request.GET`` is read.
      :return: whatever the selected handler returns.
      """
      test_push_type = request.GET.get('test_push_type')

      if test_push_type == 'jpush':
          return self.jgPush(request)
      if test_push_type == 'fcm':
          return self.do_fcm_push(request)
      if test_push_type == 's3sts':
          return self.do_get_s3_sts()
      if test_push_type == 'alipay':
          # This call used to sit after an unconditional ``return`` and was
          # therefore unreachable; it now has its own explicit selector.
          return self.do_alipay_query_status()

      # Default (unchanged): fall back to the GCM push test.
      return self.do_gcm_push(request)
  def do_get_s3_sts(self):
      """Return federated, temporary AWS credentials scoped to one S3 bucket.

      Calls STS ``get_federation_token`` with an inline policy that allows
      ``s3:*`` on objects of the ``azvod1`` bucket for 7200 seconds, adds the
      bucket name under ``Credentials.bucket_name`` and returns the whole STS
      response as a ``JsonResponse`` with HTTP status 200.

      SECURITY: the AWS access key and secret used to be hard-coded in this
      method. They are no longer passed explicitly; boto3 resolves them from
      its default credential chain (e.g. the ``AWS_ACCESS_KEY_ID`` /
      ``AWS_SECRET_ACCESS_KEY`` environment variables or the shared
      credentials file). The key pair that was committed must be treated as
      leaked and rotated.

      NOTE(review): ``GetFederationToken`` has to be called with long-term
      IAM user credentials -- confirm the deployment provides those rather
      than a role session.

      :return: ``JsonResponse`` wrapping the STS response dictionary.
      """
      import json

      import boto3

      region_name = 'us-east-1'
      bucket_name = 'azvod1'

      # No aws_access_key_id / aws_secret_access_key here on purpose: secrets
      # must come from the environment, never from source control.
      sts_client = boto3.client('sts', region_name=region_name)

      policy = {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Effect": "Allow",
                  "Action": "s3:*",
                  "Resource": ["arn:aws:s3:::azvod1/*"],
              },
          ],
      }
      response = sts_client.get_federation_token(
          Name='chanjunkai@166.com',
          Policy=json.dumps(policy),
          DurationSeconds=7200,
      )
      # Tell the client which bucket the temporary credentials are valid for.
      response['Credentials']['bucket_name'] = bucket_name
      return JsonResponse(status=200, data=response)
  def do_get_aws_kinesis_vidoe(self):
      """Return federated, temporary AWS credentials for Kinesis Video.

      Calls STS ``get_federation_token`` with an inline policy that allows
      ``kinesisvideo:*`` on every resource for 36000 seconds and returns the
      access key id, secret access key and session token through
      ``ResponseObject().json(0, ...)``.

      SECURITY: the AWS access key and secret used to be hard-coded in this
      method, and the freshly issued credentials were printed to stdout.
      Both are removed: boto3 now resolves the caller credentials from its
      default credential chain (environment variables, shared credentials
      file, ...), and nothing secret is written to the process output. The
      key pair that was committed must be treated as leaked and rotated.

      NOTE(review): ``GetFederationToken`` has to be called with long-term
      IAM user credentials -- confirm the deployment provides those.

      :return: the value of ``ResponseObject().json(0, res)`` where ``res``
          has the keys ``access_key_id``, ``secret_access_key`` and
          ``session_token``.
      """
      import json

      import boto3

      region_name = 'us-east-1'

      # Credentials intentionally not passed in code; see the docstring.
      sts_client = boto3.client('sts', region_name=region_name)

      policy = {
          "Version": "2012-10-17",
          "Statement": [
              {"Effect": "Allow", "Action": "kinesisvideo:*", "Resource": ["*"]},
          ],
      }
      federation = sts_client.get_federation_token(
          Name='chanjunkai@163.com',  # or any unique text related to the user
          Policy=json.dumps(policy),
          DurationSeconds=36000,
      )

      issued = federation['Credentials']
      res = {
          'access_key_id': issued['AccessKeyId'],
          'secret_access_key': issued['SecretAccessKey'],
          'session_token': issued['SessionToken'],
      }
      response = ResponseObject()
      return response.json(0, res)
  def do_alipay_query_status(self):
      """Query Alipay for the status of one fixed, hard-coded test order.

      Loads the application private key and the Alipay public key from PEM
      files under ``BASE_DIR``, builds an ``AliPay`` client (RSA2 signing,
      ``debug=False``) and calls ``api_alipay_trade_query`` for the
      out-trade number ``20190424085757859937``.

      :return: ``ResponseObject().json(0)`` when the reported
          ``trade_status`` is ``TRADE_SUCCESS``, otherwise
          ``ResponseObject().json(404)``.
      :raises OSError: if either key file cannot be read.
      """
      response = ResponseObject()

      # Read the key files through context managers so the handles are
      # closed deterministically (the bare open().read() leaked them).
      with open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_private_2048.pem') as key_file:
          app_private_key_string = key_file.read()
      with open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_public_2048.pem') as key_file:
          alipay_public_key_string = key_file.read()

      alipay = AliPay(
          appid="2019041663958142",
          app_notify_url=None,  # the default notify path
          app_private_key_string=app_private_key_string,
          alipay_public_key_string=alipay_public_key_string,
          sign_type="RSA2",  # RSA or RSA2
          debug=False,  # False by default
      )

      # Check the order status. The message below is kept verbatim from the
      # original debug output; no sleep is actually performed.
      print("now sleep 3s")
      result = alipay.api_alipay_trade_query(out_trade_no="20190424085757859937")

      if result.get("trade_status", "") == "TRADE_SUCCESS":
          print(True)
          return response.json(0)

      print("not paid...")
      return response.json(404)
  def do_fcm_push(self, request):
      """Send one test notification through Firebase Cloud Messaging (pyfcm).

      Query parameters read from ``request.GET``:

      * ``rg_id``     -- FCM registration id of the target device.
      * ``serverKey`` -- FCM server API key used to authenticate the push.
      * ``push_type`` -- selects the pyfcm call:
        ``'1'`` sends a data-only message, ``'2'`` sends a notification
        with title, body and data payload, anything else sends a
        notification with title and body only.

      :param request: the incoming Django request.
      :return: ``ResponseObject().json(444)`` when ``rg_id`` or
          ``serverKey`` is missing or empty (the same check
          ``do_gcm_push`` performs); otherwise
          ``ResponseObject().json(0, result)`` where ``result`` is whatever
          pyfcm returned.
      """
      from pyfcm import FCMNotification
      from var_dump import var_dump

      response = ResponseObject()
      registration_id = request.GET.get('rg_id', '')
      serverKey = request.GET.get('serverKey', '')
      push_type = request.GET.get('push_type', None)

      # Reject obviously unusable input up front instead of letting the
      # FCM client fail on an empty key or token.
      if not registration_id or not serverKey:
          return response.json(444)

      # The api-key comes from:
      # https://console.firebase.google.com/project/<project-name>/settings/cloudmessaging
      push_service = FCMNotification(api_key=serverKey)

      message_title = "Zosi Smart(xxxx 007)"
      now_time = int(time.time())
      data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "",
              "received_at": now_time, "sound": "sound.aif", "uid": "98UXAA8BRPA35VAL111A", "zpush": "1"}
      # strftime() without a time tuple formats the current local time.
      message_body = '警告:Motion Channel:1 日期:{tt}'.format(
          tt=time.strftime("%Y-%m-%d %H:%M:%S"))
      print(message_body)

      if push_type == '1':
          var_dump('1111111')
          result = push_service.single_device_data_message(
              registration_id=registration_id, data_message=data)
      elif push_type == '2':
          var_dump('22222222')
          result = push_service.notify_single_device(
              registration_id=registration_id, message_title=message_title,
              message_body=message_body, data_message=data)
      else:
          var_dump('333333')
          result = push_service.notify_single_device(
              registration_id=registration_id, message_title=message_title,
              message_body=message_body)

      var_dump(result)
      return response.json(0, result)
  def do_get_putOss_url(self, request_dict):
      """Return a pre-signed Aliyun OSS URL for uploading one object via PUT.

      The URL is signed for the bucket ``apg`` on the endpoint
      ``oss-cn-shenzhen.aliyuncs.com`` and is valid for 7200 seconds.

      SECURITY: the OSS access key id and secret used to be hard-coded in
      this method. They are now read from the ``OSS_ACCESS_KEY_ID`` and
      ``OSS_ACCESS_KEY_SECRET`` environment variables. The key pair that
      was committed must be treated as leaked and rotated. As the original
      comment warned, prefer a RAM sub-account over the main-account key
      (https://ram.console.aliyun.com).

      :param request_dict: mapping of request parameters; ``obj_name`` is
          the object key to sign.
      :return: ``ResponseObject().json(444)`` when ``obj_name`` is missing
          or empty, otherwise ``ResponseObject().json(0, url)``.
      :raises KeyError: if either environment variable is not set.
      """
      import oss2

      response = ResponseObject()
      obj_name = request_dict.get('obj_name', '')
      if not obj_name:
          # Nothing to sign; 444 is the code do_gcm_push uses for missing
          # parameters.  NOTE(review): confirm this is the intended code.
          return response.json(444)

      auth = oss2.Auth(os.environ['OSS_ACCESS_KEY_ID'],
                       os.environ['OSS_ACCESS_KEY_SECRET'])
      bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
      # The signed URL stays valid for 7200 seconds (2 hours).
      url = bucket.sign_url('PUT', obj_name, 7200)
      return response.json(0, url)
  def do_gcm_push(self, request):
      """Send one test data message through the legacy GCM HTTP endpoint.

      Query parameters read from ``request.GET``:

      * ``rg_id``     -- registration id of the target device.
      * ``serverKey`` -- server key sent in the ``Authorization`` header.

      The request is now sent with a 10 second timeout and its HTTP status
      is checked; previously the call could block indefinitely and code 0
      was returned even when the push had failed.

      NOTE(review): Google has retired GCM in favour of FCM -- confirm that
      ``https://android.googleapis.com/gcm/send`` is still expected to work.

      :param request: the incoming Django request.
      :return: ``ResponseObject().json(444)`` when ``rg_id`` or
          ``serverKey`` is missing or empty; ``ResponseObject().json(10,
          repr(error))`` when the HTTP call fails or returns an error
          status (the same shape ``jgPush`` uses for failures);
          ``ResponseObject().json(0)`` otherwise.
      """
      import json

      import requests

      response = ResponseObject()
      rg_id = request.GET.get('rg_id', '')
      serverKey = request.GET.get('serverKey', '')
      if not rg_id or not serverKey:
          return response.json(444)

      now_time = int(time.time())
      payload = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "",
                 "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR", "zpush": "1"}
      json_data = {
          "collapse_key": "WhatYouWant",
          "data": payload,
          "delay_while_idle": False,
          "time_to_live": 3600,
          "registration_ids": [rg_id],
      }
      url = 'https://android.googleapis.com/gcm/send'
      body = json.dumps(json_data).encode('utf-8')
      headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey}

      try:
          # A timeout keeps a stalled upstream from pinning the worker.
          gcm_reply = requests.post(url, body, headers=headers, timeout=10)
          gcm_reply.raise_for_status()
      except requests.RequestException as e:
          return response.json(10, repr(e))
      return response.json(0)
  def jgPush(self, request):
      """Send one test notification through JPush (Jiguang push).

      Query parameters read from ``request.GET``:

      * ``devToken``      -- JPush registration id of the target device.
      * ``app_key``       -- JPush application key.
      * ``master_secret`` -- JPush master secret.

      Builds an Android notification carrying a fixed motion-event payload,
      targets all platforms and sends it with JPush debug logging enabled.

      :param request: the incoming Django request.
      :return: ``ResponseObject().json(0)`` when ``send()`` completes, or
          ``ResponseObject().json(10, repr(error))`` when it raises.
      """
      response = ResponseObject()
      query = request.GET
      device_token = query.get('devToken', '')
      key = query.get('app_key', '')
      secret = query.get('master_secret', '')

      import jpush

      # Each caller supplies its own app_key and master_secret.
      client = jpush.JPush(key, secret)
      push = client.create_push()
      # With the logging level set to "DEBUG" the SDK prints its debug log.
      client.set_logging("DEBUG")

      timestamp = int(time.time())
      event_type = 51
      push.audience = jpush.registration_id(device_token)

      extras = {
          "alert": "Motion ",
          "event_time": timestamp,
          "event_type": event_type,
          "msg": "",
          "received_at": timestamp,
          "sound": "sound.aif",
          "uid": 'xoxoxoxoxoxoxoxoxoxo',
          "zpush": "1",
      }
      android_payload = jpush.android(
          alert="Hello, Android msg",
          priority=1,
          style=1,
          alert_type=1,
          big_text='ssssssssssssssssss',
          extras=extras,
          title='fffff',
      )
      push.notification = jpush.notification(android=android_payload)
      push.platform = jpush.all_

      try:
          push.send()
      except Exception as e:
          print("Exception")
          return response.json(10, repr(e))
      else:
          return response.json(0)
  def post(self, request, *args, **kwargs):
      """Verify an Alipay notification posted to this endpoint.

      Takes the form fields from ``request.POST``, removes the ``sign``
      field and verifies the remaining fields against that signature with
      an ``AliPay`` client built from the PEM key files under
      ``BASE_DIR + '/Controller/'``.

      Fixes over the previous version:

      * code 0 used to be returned whether or not the signature verified;
        it is now returned only for a verified notification whose
        ``trade_status`` is ``TRADE_SUCCESS`` or ``TRADE_FINISHED``;
      * a missing ``sign`` or ``trade_status`` field no longer raises
        ``KeyError``;
      * the key files are closed after reading.

      :param request: the incoming Django request.
      :return: ``ResponseObject().json(444)`` when ``sign`` is missing or
          empty; ``ResponseObject().json(0, signature)`` for a verified,
          successful trade; ``ResponseObject().json(404)`` otherwise.
          NOTE(review): 444 and 404 reuse the codes other handlers in this
          class return for missing parameters and for an unpaid order --
          confirm they are the intended codes here.
      :raises OSError: if either key file cannot be read.
      """
      response = ResponseObject()
      data = request.POST.dict()

      # The signature must not be part of the data that is verified.
      signature = data.pop('sign', None)
      if not signature:
          return response.json(444)

      print(json.dumps(data))
      print(signature)

      # verify
      with open(BASE_DIR + '/Controller/alipay_private_2048.pem') as key_file:
          app_private_key_string = key_file.read()
      with open(BASE_DIR + '/Controller/alipay_public_2048.pem') as key_file:
          alipay_public_key_string = key_file.read()

      alipay = AliPay(
          appid="2016092200569234",
          app_notify_url=None,  # the default notify path
          app_private_key_string=app_private_key_string,
          alipay_public_key_string=alipay_public_key_string,
          sign_type="RSA2",  # RSA or RSA2
          debug=False,  # False by default
      )

      verified = alipay.verify(data, signature)
      if verified and data.get("trade_status") in ("TRADE_SUCCESS", "TRADE_FINISHED"):
          print("trade succeed")
          return response.json(0, signature)

      # Unverified signature or a trade that did not succeed.
      return response.json(404)
  # Update: the resource itself changes.
  def put(self, request):
      """Handle PUT; placeholder that does no work.

      :param request: the incoming Django request (not used).
      :return: ``ResponseObject().json(0, '')``.
      """
      return ResponseObject().json(0, '')
  # Update: attributes of the resource change.
  def patch(self, request):
      """Handle PATCH; placeholder that does no work.

      Django's ``View.dispatch`` looks the handler up by the lower-cased
      HTTP method name, so a method spelled ``PATCH`` is never invoked and
      PATCH requests are answered with "405 Method Not Allowed". The
      handler is therefore defined as ``patch``.

      :param request: the incoming Django request (not used).
      :return: ``ResponseObject().json(0)``.
      """
      response = ResponseObject()
      return response.json(0)

  # Backward-compatible alias for code that calls the original upper-case name.
  PATCH = patch
  # Delete.
  def delete(self, request):
      """Handle DELETE; placeholder that does no work.

      :param request: the incoming Django request (not used).
      :return: ``ResponseObject().json(0)``.
      """
      return ResponseObject().json(0)
  def validation(self, request_dict, *args, **kwargs):
      """Placeholder validation hook; performs no checks.

      :param request_dict: request parameters (not used).
      :return: ``ResponseObject().json(0)``.
      """
      return ResponseObject().json(0)