123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved.
- @AUTHOR: ASJRD018
- @NAME: Ansjer
- @software: PyCharm
- @DATE: 2018/5/22 13:58
- @Version: python3.6
- @MODIFY RECORD:ansjer dev
- @file: Test.py
- @Contact: chanjunkai@163.com
- """
- from django.views.generic.base import View
- import os
- '''
- http://192.168.136.40:8077/Test
- '''
- from Object.ResponseObject import ResponseObject
- from Ansjer.config import BASE_DIR
- import json
- from alipay import AliPay
- import time
- import apns2
- # Test endpoints for exercising third-party SDKs
class Test(View):
    """Scratch view for manually exercising third-party SDK integrations.

    ``GET`` runs the APNs push check. The Alipay, OSS, GCM and JPush helpers
    are kept as methods so they can be wired into ``get`` when needed.
    ``POST`` verifies the signature of an Alipay notification.
    """

    @staticmethod
    def _read_text(path):
        """Return the text content of *path*, closing the file afterwards."""
        with open(path) as key_file:
            return key_file.read()

    @classmethod
    def _build_alipay(cls, appid, private_key_path, public_key_path):
        """Create an ``AliPay`` client (RSA2, non-debug) from two PEM key files."""
        return AliPay(
            appid=appid,
            app_notify_url=None,  # the default notify path
            app_private_key_string=cls._read_text(private_key_path),
            alipay_public_key_string=cls._read_text(public_key_path),
            sign_type="RSA2",  # RSA or RSA2
            debug=False,  # False by default
        )

    def get(self, request, *args, **kwargs):
        """Handle GET by running the APNs push check with the query parameters.

        Only one helper can answer a request. The alternatives
        (``do_get_putOss_url``, ``do_gcm_push``, ``do_alipay_query_status``)
        used to be listed after the first ``return``, where they could never
        run, so they are no longer called here.
        """
        return self.do_apns(request.GET)

    def do_alipay_query_status(self):
        """Query a fixed Alipay order and report whether it has been paid.

        Returns:
            ``response.json(0)`` when the trade status is ``TRADE_SUCCESS``,
            otherwise ``response.json(404)``.
        """
        response = ResponseObject()
        alipay = self._build_alipay(
            "2019041663958142",
            os.path.join(BASE_DIR, 'Ansjer/file/alipay/zosi_alipay_private_2048.pem'),
            os.path.join(BASE_DIR, 'Ansjer/file/alipay/zosi_alipay_public_2048.pem'),
        )
        # Check the order status.
        result = alipay.api_alipay_trade_query(out_trade_no="20190424085757859937")
        if result.get("trade_status", "") == "TRADE_SUCCESS":
            print(True)
            return response.json(0)
        print("not paid...")
        return response.json(404)

    def do_apns(self, request_dict):
        """Send a test APNs notification to the device token in *request_dict*.

        Args:
            request_dict: Mapping with a ``token_val`` entry (the device token).

        Returns:
            ``response.json(444)`` when the token is missing,
            ``response.json(0)`` when the push returns status 200,
            ``response.json(404, reason)`` for any other push status, and
            ``response.json(10, repr(error))`` when the push raises.
        """
        response = ResponseObject()
        token_val = request_dict.get('token_val', None)
        if not token_val:
            # Same code do_gcm_push uses for missing parameters.
            return response.json(444)
        pem_path = os.path.join(BASE_DIR, 'Ansjer/file/apns_pem/apns-dev-test.pem')
        print(pem_path)
        try:
            now_time = int(time.time())
            cli = apns2.APNSClient(mode="dev", client_cert=pem_path, password='111111')
            push_data = {"alert": "Motion ", "event_time": now_time, "event_type": '51', "msg": "",
                         "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR"}
            alert = apns2.PayloadAlert(body=json.dumps(push_data), title="title!")
            payload = apns2.Payload(alert=alert)
            notification = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW)
            res = cli.push(n=notification, device_token=token_val, topic='com.ansjer.loocamccloud')
        except Exception as e:
            # Any SDK/transport failure is reported to the caller, not raised.
            return response.json(10, repr(e))
        if res.status_code == 200:
            return response.json(0)
        return response.json(404, res.reason)

    def do_get_putOss_url(self, request_dict):
        """Return a signed OSS ``PUT`` URL for the object named in *request_dict*.

        Args:
            request_dict: Mapping with an ``obj_name`` entry (the object key).

        Returns:
            ``response.json(444)`` when ``obj_name`` is empty, otherwise
            ``response.json(0, url)`` with the signed URL.
        """
        import oss2

        response = ResponseObject()
        obj_name = request_dict.get('obj_name', '')
        if not obj_name:
            return response.json(444)
        # NOTE(review): hard-coded access key. An Alibaba Cloud primary-account
        # AccessKey can call every API; move it to configuration, rotate it,
        # and prefer a RAM account (https://ram.console.aliyun.com).
        auth = oss2.Auth('LTAIyMkGfEdogyL9', '71uIjpsqVOmF7DAITRyRuc259jHOjO')
        # The bucket 'apg' is addressed through the Shenzhen endpoint.
        bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg')
        # The signed URL stays valid for 7200 seconds.
        url = bucket.sign_url('PUT', obj_name, 7200)
        return response.json(0, url)

    def do_gcm_push(self, request):
        """Send a test GCM push to the registration id given in the query string.

        Reads ``rg_id`` and ``serverKey`` from ``request.GET``.

        Returns:
            ``response.json(444)`` when either parameter is missing,
            ``response.json(0)`` when GCM answers with status 200,
            ``response.json(404, reason)`` for any other status, and
            ``response.json(10, repr(error))`` when the HTTP request fails.
        """
        import requests

        response = ResponseObject()
        rg_id = request.GET.get('rg_id', '')
        serverKey = request.GET.get('serverKey', '')
        if not rg_id or not serverKey:
            return response.json(444)
        message = {
            "received_at": "1",
            "msg": "1",
            "uid": "1",
            "alert": "1",
            "sound": "1",
            "event_time": "1",
            "event_type": "1",
        }
        body = {
            "collapse_key": "WhatYouWant",
            "data": message,
            "delay_while_idle": False,
            "time_to_live": 3600,
            "registration_ids": [rg_id],
        }
        url = 'https://android.googleapis.com/gcm/send'
        headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey}
        try:
            # A timeout keeps a stalled connection from blocking the worker.
            res = requests.post(url, json.dumps(body).encode('utf-8'), headers=headers, timeout=10)
        except requests.RequestException as e:
            return response.json(10, repr(e))
        if res.status_code == 200:
            return response.json(0)
        return response.json(404, res.reason)

    def jgPush(self, request):
        """Send a test JPush notification to the device given in the query string.

        Reads ``devToken``, ``app_key`` and ``master_secret`` from
        ``request.GET``.

        Returns:
            ``response.json(444)`` when any parameter is missing,
            ``response.json(10, repr(error))`` when sending raises, otherwise
            ``response.json(0)``.
        """
        import jpush

        response = ResponseObject()
        devToken = request.GET.get('devToken', '')
        app_key = request.GET.get('app_key', '')
        master_secret = request.GET.get('master_secret', '')
        if not devToken or not app_key or not master_secret:
            return response.json(444)
        # Each caller supplies its own app_key and master_secret.
        _jpush = jpush.JPush(app_key, master_secret)
        push = _jpush.create_push()
        # With the logging level set to "DEBUG" the SDK prints debug logging.
        _jpush.set_logging("DEBUG")
        push.audience = jpush.registration_id(devToken)
        push.notification = jpush.notification(alert="hello python jpush api")
        push.platform = jpush.all_
        try:
            push.send()
        except Exception as e:
            print("Exception")
            return response.json(10, repr(e))
        return response.json(0)

    def post(self, request, *args, **kwargs):
        """Verify the signature of an Alipay notification posted as form data.

        The ``sign`` field is removed from the form data before verification.

        Returns:
            ``response.json(444)`` when ``sign`` is missing, otherwise
            ``response.json(0, signature)``.
        """
        response = ResponseObject()
        data = request.POST.dict()
        # The signature must be popped out of the data before verifying.
        signature = data.pop('sign', None)
        if signature is None:
            return response.json(444)
        print(json.dumps(data))
        print(signature)
        alipay = self._build_alipay(
            "2016092200569234",
            os.path.join(BASE_DIR, 'Controller/alipay_private_2048.pem'),
            os.path.join(BASE_DIR, 'Controller/alipay_public_2048.pem'),
        )
        success = alipay.verify(data, signature)
        if success and data.get("trade_status") in ("TRADE_SUCCESS", "TRADE_FINISHED"):
            print("trade succeed")
        # NOTE(review): the response is the same whether or not verification
        # succeeded; confirm that is intended for this test endpoint.
        return response.json(0, signature)

    # Modify: the resource is replaced.
    def put(self, request, *args, **kwargs):
        """Handle PUT; placeholder that returns ``response.json(0, '')``."""
        response = ResponseObject()
        return response.json(0, '')

    # Modify: attributes of the resource are changed.
    def patch(self, request, *args, **kwargs):
        """Handle PATCH; placeholder that returns ``response.json(0)``.

        Django's ``View.dispatch`` looks handlers up by the lower-cased HTTP
        method name, so the handler has to be called ``patch`` to be reachable.
        """
        response = ResponseObject()
        return response.json(0)

    # Kept so existing references to the old upper-case name keep working.
    PATCH = patch

    # Delete.
    def delete(self, request, *args, **kwargs):
        """Handle DELETE; placeholder that returns ``response.json(0)``."""
        response = ResponseObject()
        return response.json(0)

    def validation(self, request_dict, *args, **kwargs):
        """Placeholder validation hook; returns ``response.json(0)``."""
        response = ResponseObject()
        return response.json(0)
|