Test.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
  5. @AUTHOR: ASJRD018
  6. @NAME: Ansjer
  7. @software: PyCharm
  8. @DATE: 2018/5/22 13:58
  9. @Version: python3.6
  10. @MODIFY RECORD: ansjer dev
  11. @file: Test.py
  12. @Contact: chanjunkai@163.com
  13. """
  14. from django.views.generic.base import View
  15. import os
  16. '''
  17. http://192.168.136.40:8077/Test
  18. '''
  19. from Object.ResponseObject import ResponseObject
  20. from Ansjer.config import BASE_DIR
  21. import json
  22. from alipay import AliPay
  23. import time
  24. import apns2
  25. # Test endpoints for exercising third-party SDKs
  26. class Test(View):
  27. def get(self, request, *args, **kwargs):
  28. request_dict = request.GET
  29. return self.do_apns(request_dict)
  30. # return self.do_get_putOss_url(request.GET)
  31. # return self.do_gcm_push(request)
  32. def do_apns(self,request_dict):
  33. token_val = request_dict.get('token_val',None)
  34. pem_path = os.path.join(BASE_DIR,'Ansjer/file/apns_pem/apns-dev.pem')
  35. print(pem_path)
  36. response = ResponseObject()
  37. # apns_config = {
  38. # 'appbundleId': {'pem_path': 'xxxx', 'topic': 'topic', 'password': 'password'}
  39. # }
  40. try:
  41. import apns2
  42. now_time = int(time.time())
  43. cli = apns2.APNSClient(mode="prod", client_cert=pem_path)
  44. push_data = {"alert": "Motion ", "event_time": now_time, "event_type": '51', "msg": "",
  45. "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR"}
  46. alert = apns2.PayloadAlert(body=json.dumps(push_data), title="title!")
  47. payload = apns2.Payload(alert=alert)
  48. n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
  49. res = cli.push(n=n, device_token=token_val,topic='com.ansjer.zccloud')
  50. if res.status_code == 200:
  51. return response.json(0)
  52. else:
  53. return response.json(404, res.reason)
  54. except Exception as e:
  55. return response.json(10, repr(e))
  56. def do_get_putOss_url(self, request_dict):
  57. import oss2
  58. obj_name = request_dict.get('obj_name','')
  59. # 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
  60. auth = oss2.Auth('LTAIyMkGfEdogyL9', '71uIjpsqVOmF7DAITRyRuc259jHOjO')
  61. # Endpoint以杭州为例,其它Region请按实际情况填写。
  62. bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
  63. # 设置此签名URL在60秒内有效。
  64. url = bucket.sign_url('PUT', obj_name, 7200)
  65. response = ResponseObject()
  66. return response.json(0, url)
  67. def do_gcm_push(self, request):
  68. import json
  69. import requests
  70. response = ResponseObject()
  71. rg_id = request.GET.get('rg_id', '')
  72. serverKey = request.GET.get('serverKey', '')
  73. if not rg_id or not serverKey:
  74. return response.json(444)
  75. data = {
  76. "received_at": "1",
  77. "msg": "1",
  78. "uid": "1",
  79. "alert": "1",
  80. "sound": "1",
  81. "event_time": "1",
  82. "event_type": "1",
  83. }
  84. json_data = {
  85. "collapse_key": "WhatYouWant",
  86. "data": data,
  87. "delay_while_idle": False,
  88. "time_to_live": 3600,
  89. "registration_ids": [rg_id]
  90. }
  91. url = 'https://android.googleapis.com/gcm/send'
  92. data = json.dumps(json_data).encode('utf-8')
  93. headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey}
  94. req = requests.post(url, data, headers=headers)
  95. response = ResponseObject()
  96. return response.json(0)
  97. def jgPush(self, request):
  98. response = ResponseObject()
  99. devToken = request.GET.get('devToken', '')
  100. app_key = request.GET.get('app_key', '')
  101. master_secret = request.GET.get('master_secret', '')
  102. import jpush as jpush
  103. # 此处换成各自的app_key和master_secret
  104. _jpush = jpush.JPush(app_key, master_secret)
  105. push = _jpush.create_push()
  106. # if you set the logging level to "DEBUG",it will show the debug logging.
  107. _jpush.set_logging("DEBUG")
  108. # push.audience = jpush.all_
  109. push.audience = jpush.registration_id(devToken)
  110. push.notification = jpush.notification(alert="hello python jpush api")
  111. push.platform = jpush.all_
  112. try:
  113. res = push.send()
  114. except Exception as e:
  115. print("Exception")
  116. return response.json(10, repr(e))
  117. return response.json(0)
  118. def post(self, request, *args, **kwargs):
  119. response = ResponseObject()
  120. data = request.POST.dict()
  121. signature = data["sign"]
  122. data.pop('sign')
  123. print(json.dumps(data))
  124. print(signature)
  125. # verify
  126. app_private_key_string = open(BASE_DIR + '/Controller/alipay_private_2048.pem').read()
  127. alipay_public_key_string = open(BASE_DIR + '/Controller/alipay_public_2048.pem').read()
  128. alipay = AliPay(
  129. appid="2016092200569234",
  130. app_notify_url=None, # the default notify path
  131. app_private_key_string=app_private_key_string,
  132. alipay_public_key_string=alipay_public_key_string,
  133. sign_type="RSA2", # RSA or RSA2
  134. debug=False # False by default
  135. )
  136. success = alipay.verify(data, signature)
  137. if success and data["trade_status"] in ("TRADE_SUCCESS", "TRADE_FINISHED"):
  138. print("trade succeed")
  139. return response.json(0, signature)
  140. # 修改 资源改变
  141. def put(self, request):
  142. response = ResponseObject()
  143. return response.json(0, '')
  144. # 修改 属性改变
  145. def PATCH(self, request):
  146. response = ResponseObject()
  147. return response.json(0)
  148. # 删除
  149. def delete(self, request):
  150. response = ResponseObject()
  151. return response.json(0)
  152. def validation(self, request_dict, *args, **kwargs):
  153. response = ResponseObject()
  154. return response.json(0)