Test.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
  5. @AUTHOR: ASJRD018
  6. @NAME: Ansjer
  7. @software: PyCharm
  8. @DATE: 2018/5/22 13:58
  9. @Version: python3.6
  10. @MODIFY DECORD:ansjer dev
  11. @file: Test.py
  12. @Contact: chanjunkai@163.com
  13. """
  14. from django.views.generic.base import View
  15. '''
  16. http://192.168.136.40:8077/Test
  17. '''
  18. from Object.ResponseObject import ResponseObject
  19. from Ansjer.config import BASE_DIR
  20. import json
  21. from alipay import AliPay
  22. import time
  23. # 测试接口sdk
  24. class Test(View):
  25. def get(self, request, *args, **kwargs):
  26. return self.do_get_putOss_url()
  27. # return self.do_gcm_push(request)
  28. def do_get_putOss_url(self):
  29. import oss2
  30. # 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
  31. auth = oss2.Auth('LTAIyMkGfEdogyL9', '71uIjpsqVOmF7DAITRyRuc259jHOjO')
  32. # Endpoint以杭州为例,其它Region请按实际情况填写。
  33. bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'cnvod1')
  34. # 设置此签名URL在60秒内有效。
  35. url = bucket.sign_url('PUT', 'wupengyangceshi.png', 7200)
  36. response = ResponseObject()
  37. return response.json(0,url)
  38. def do_gcm_push(self, request):
  39. import json
  40. import requests
  41. response = ResponseObject()
  42. rg_id = request.GET.get('rg_id', '')
  43. serverKey = request.GET.get('serverKey', '')
  44. if not rg_id or not serverKey:
  45. return response.json(444)
  46. data = {
  47. "received_at": "1",
  48. "msg": "1",
  49. "uid": "1",
  50. "alert": "1",
  51. "sound": "1",
  52. "event_time": "1",
  53. "event_type": "1",
  54. }
  55. json_data = {
  56. "collapse_key": "WhatYouWant",
  57. "data": data,
  58. "delay_while_idle": False,
  59. "time_to_live": 3600,
  60. "registration_ids": [rg_id]
  61. }
  62. url = 'https://android.googleapis.com/gcm/send'
  63. data = json.dumps(json_data).encode('utf-8')
  64. headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey}
  65. req = requests.post(url, data, headers=headers)
  66. response = ResponseObject()
  67. return response.json(0)
  68. def jgPush(self, request):
  69. response = ResponseObject()
  70. devToken = request.GET.get('devToken', '')
  71. app_key = request.GET.get('app_key', '')
  72. master_secret = request.GET.get('master_secret', '')
  73. import jpush as jpush
  74. # 此处换成各自的app_key和master_secret
  75. _jpush = jpush.JPush(app_key, master_secret)
  76. push = _jpush.create_push()
  77. # if you set the logging level to "DEBUG",it will show the debug logging.
  78. _jpush.set_logging("DEBUG")
  79. # push.audience = jpush.all_
  80. push.audience = jpush.registration_id(devToken)
  81. push.notification = jpush.notification(alert="hello python jpush api")
  82. push.platform = jpush.all_
  83. try:
  84. res = push.send()
  85. except Exception as e:
  86. print("Exception")
  87. return response.json(10, repr(e))
  88. return response.json(0)
  89. def post(self, request, *args, **kwargs):
  90. response = ResponseObject()
  91. data = request.POST.dict()
  92. signature = data["sign"]
  93. data.pop('sign')
  94. print(json.dumps(data))
  95. print(signature)
  96. # verify
  97. app_private_key_string = open(BASE_DIR + '/Controller/alipay_private_2048.pem').read()
  98. alipay_public_key_string = open(BASE_DIR + '/Controller/alipay_public_2048.pem').read()
  99. alipay = AliPay(
  100. appid="2016092200569234",
  101. app_notify_url=None, # the default notify path
  102. app_private_key_string=app_private_key_string,
  103. alipay_public_key_string=alipay_public_key_string,
  104. sign_type="RSA2", # RSA or RSA2
  105. debug=False # False by default
  106. )
  107. success = alipay.verify(data, signature)
  108. if success and data["trade_status"] in ("TRADE_SUCCESS", "TRADE_FINISHED"):
  109. print("trade succeed")
  110. return response.json(0, signature)
  111. # 修改 资源改变
  112. def put(self, request):
  113. response = ResponseObject()
  114. return response.json(0, '')
  115. # 修改 属性改变
  116. def PATCH(self, request):
  117. response = ResponseObject()
  118. return response.json(0)
  119. # 删除
  120. def delete(self, request):
  121. response = ResponseObject()
  122. return response.json(0)
  123. def validation(self, request_dict, *args, **kwargs):
  124. response = ResponseObject()
  125. return response.json(0)