Test.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
  5. @AUTHOR: ASJRD018
  6. @NAME: Ansjer
  7. @software: PyCharm
  8. @DATE: 2018/5/22 13:58
  9. @Version: python3.6
  10. @MODIFY RECORD:ansjer dev
  11. @file: Test.py
  12. @Contact: chanjunkai@163.com
  13. """
  14. from django.views.generic.base import View
  15. import os
  16. '''
  17. http://192.168.136.40:8077/Test
  18. '''
  19. from Object.ResponseObject import ResponseObject
  20. from Ansjer.config import BASE_DIR
  21. import json
  22. from alipay import AliPay
  23. import time
  24. import apns2
  25. # 测试接口sdk
  26. class Test(View):
  27. def get(self, request, *args, **kwargs):
  28. request_dict = request.GET
  29. # return self.do_apns(request_dict)
  30. # return self.do_get_putOss_url(request.GET)
  31. # from django.http import JsonResponse
  32. # return JsonResponse(status=200,data={
  33. # 'code': 173,
  34. # 'msg': 'data is not exist'})
  35. return self.do_fcm_push(request)
  36. return self.do_gcm_push(request)
  37. return self.do_alipay_query_status()
  38. def do_alipay_query_status(self):
  39. response = ResponseObject()
  40. # app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_private_2048.pem').read()
  41. # alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_public_2048.pem').read()
  42. app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_private_2048.pem').read()
  43. alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_public_2048.pem').read()
  44. alipay = AliPay(
  45. # appid="2016092200569234",
  46. appid="2019041663958142",
  47. app_notify_url=None, # the default notify path
  48. app_private_key_string=app_private_key_string,
  49. alipay_public_key_string=alipay_public_key_string,
  50. sign_type="RSA2", # RSA or RSA2
  51. debug=False # False by default
  52. )
  53. # check order status
  54. print("now sleep 3s")
  55. # time.sleep(3)
  56. result = alipay.api_alipay_trade_query(out_trade_no="20190424085757859937")
  57. if result.get("trade_status", "") == "TRADE_SUCCESS":
  58. paid = True
  59. print(paid)
  60. return response.json(0)
  61. else:
  62. print("not paid...")
  63. return response.json(404)
  64. def do_fcm_push(self, request):
  65. rg_id = request.GET.get('rg_id', '')
  66. serverKey = request.GET.get('serverKey', '')
  67. # Send to single device.
  68. from pyfcm import FCMNotification
  69. push_service = FCMNotification(api_key="<api-key>")
  70. # OR initialize with proxies
  71. # proxy_dict = {
  72. # "http": "http://127.0.0.1",
  73. # "https": "http://127.0.0.1",
  74. # }
  75. # push_service = FCMNotification(api_key="<api-key>", proxy_dict=proxy_dict)
  76. push_service = FCMNotification(api_key=serverKey)
  77. # Your api-key can be gotten from: https://console.firebase.google.com/project/<project-name>/settings/cloudmessaging
  78. registration_id = rg_id
  79. message_title = "Zosi Smart(Camera 007)"
  80. now_time = int(time.time())
  81. data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "",
  82. "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR", "zpush": "1"}
  83. # message_body = json.dumps(data)
  84. message_body = '警告:Motion Channel:1 日期:{tt}'.format(tt=str(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))))
  85. result = push_service.notify_single_device(registration_id=registration_id, message_title=message_title,
  86. message_body=message_body,data_message=data)
  87. from var_dump import var_dump
  88. var_dump(result)
  89. response = ResponseObject()
  90. return response.json(0, result)
  91. def do_apns(self, request_dict):
  92. token_val = request_dict.get('token_val', None)
  93. pem_path = os.path.join(BASE_DIR, 'Ansjer/file/apns_pem/apns-dev-test.pem')
  94. print(pem_path)
  95. response = ResponseObject()
  96. try:
  97. import apns2
  98. now_time = int(time.time())
  99. # cli = apns2.APNSClient(mode="dev", client_cert="/your/path.pem")
  100. # alert = apns2.PayloadAlert(body="body!", title="title!")
  101. # payload = apns2.Payload(alert=alert)
  102. # n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
  103. # response = cli.push(n=n, device_token="your_token")
  104. # assert response.status_code == 200, response.reason
  105. # assert response.apns_id
  106. # cli = apns2.APNSClient(mode="prod", client_cert=pem_path)
  107. cli = apns2.APNSClient(mode="dev", client_cert=pem_path, password='111111')
  108. push_data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "",
  109. "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR", "zpush": "1"}
  110. alert = apns2.PayloadAlert(body=json.dumps(push_data), title="title!")
  111. payload = apns2.Payload(alert=alert)
  112. n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
  113. res = cli.push(n=n, device_token=token_val, topic='com.ansjer.loocamccloud')
  114. if res.status_code == 200:
  115. return response.json(0)
  116. else:
  117. return response.json(404, res.reason)
  118. except Exception as e:
  119. return response.json(10, repr(e))
  120. def do_get_putOss_url(self, request_dict):
  121. import oss2
  122. obj_name = request_dict.get('obj_name', '')
  123. # 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
  124. auth = oss2.Auth('LTAIyMkGfEdogyL9', '71uIjpsqVOmF7DAITRyRuc259jHOjO')
  125. # Endpoint以杭州为例,其它Region请按实际情况填写。
  126. bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
  127. # 设置此签名URL在60秒内有效。
  128. url = bucket.sign_url('PUT', obj_name, 7200)
  129. response = ResponseObject()
  130. return response.json(0, url)
  131. def do_gcm_push(self, request):
  132. import json
  133. import requests
  134. response = ResponseObject()
  135. rg_id = request.GET.get('rg_id', '')
  136. serverKey = request.GET.get('serverKey', '')
  137. if not rg_id or not serverKey:
  138. return response.json(444)
  139. now_time = int(time.time())
  140. data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "",
  141. "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR", "zpush": "1"}
  142. json_data = {
  143. "collapse_key": "WhatYouWant",
  144. "data": data,
  145. "delay_while_idle": False,
  146. "time_to_live": 3600,
  147. "registration_ids": [rg_id]
  148. }
  149. url = 'https://android.googleapis.com/gcm/send'
  150. data = json.dumps(json_data).encode('utf-8')
  151. headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey}
  152. req = requests.post(url, data, headers=headers)
  153. response = ResponseObject()
  154. return response.json(0)
  155. def jgPush(self, request):
  156. response = ResponseObject()
  157. devToken = request.GET.get('devToken', '')
  158. app_key = request.GET.get('app_key', '')
  159. master_secret = request.GET.get('master_secret', '')
  160. import jpush as jpush
  161. # 此处换成各自的app_key和master_secret
  162. _jpush = jpush.JPush(app_key, master_secret)
  163. push = _jpush.create_push()
  164. # if you set the logging level to "DEBUG",it will show the debug logging.
  165. _jpush.set_logging("DEBUG")
  166. # push.audience = jpush.all_
  167. push.audience = jpush.registration_id(devToken)
  168. push.notification = jpush.notification(alert="hello python jpush api")
  169. push.platform = jpush.all_
  170. try:
  171. res = push.send()
  172. except Exception as e:
  173. print("Exception")
  174. return response.json(10, repr(e))
  175. return response.json(0)
  176. def post(self, request, *args, **kwargs):
  177. response = ResponseObject()
  178. data = request.POST.dict()
  179. signature = data["sign"]
  180. data.pop('sign')
  181. print(json.dumps(data))
  182. print(signature)
  183. # verify
  184. app_private_key_string = open(BASE_DIR + '/Controller/alipay_private_2048.pem').read()
  185. alipay_public_key_string = open(BASE_DIR + '/Controller/alipay_public_2048.pem').read()
  186. alipay = AliPay(
  187. appid="2016092200569234",
  188. app_notify_url=None, # the default notify path
  189. app_private_key_string=app_private_key_string,
  190. alipay_public_key_string=alipay_public_key_string,
  191. sign_type="RSA2", # RSA or RSA2
  192. debug=False # False by default
  193. )
  194. success = alipay.verify(data, signature)
  195. if success and data["trade_status"] in ("TRADE_SUCCESS", "TRADE_FINISHED"):
  196. print("trade succeed")
  197. return response.json(0, signature)
  198. # 修改 资源改变
  199. def put(self, request):
  200. response = ResponseObject()
  201. return response.json(0, '')
  202. # 修改 属性改变
  203. def PATCH(self, request):
  204. response = ResponseObject()
  205. return response.json(0)
  206. # 删除
  207. def delete(self, request):
  208. response = ResponseObject()
  209. return response.json(0)
  210. def validation(self, request_dict, *args, **kwargs):
  211. response = ResponseObject()
  212. return response.json(0)