#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: Ansjer @software: PyCharm @DATE: 2018/5/22 13:58 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: Test.py @Contact: chanjunkai@163.com """ from django.views.generic.base import View import os ''' http://192.168.136.40:8077/Test ''' from Object.ResponseObject import ResponseObject from Ansjer.config import BASE_DIR import json from alipay import AliPay import time import apns2 # 测试接口sdk class Test(View): def get(self, request, *args, **kwargs): request_dict = request.GET return self.do_apns(request_dict) # return self.do_get_putOss_url(request.GET) # from django.http import JsonResponse # return JsonResponse(status=200,data={ # 'code': 173, # 'msg': 'data is not exist'}) return self.do_gcm_push(request) return self.do_alipay_query_status() def do_alipay_query_status(self): response = ResponseObject() # app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_private_2048.pem').read() # alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_public_2048.pem').read() app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_private_2048.pem').read() alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_public_2048.pem').read() alipay = AliPay( # appid="2016092200569234", appid="2019041663958142", app_notify_url=None, # the default notify path app_private_key_string=app_private_key_string, alipay_public_key_string=alipay_public_key_string, sign_type="RSA2", # RSA or RSA2 debug=False # False by default ) # check order status print("now sleep 3s") # time.sleep(3) result = alipay.api_alipay_trade_query(out_trade_no="20190424085757859937") if result.get("trade_status", "") == "TRADE_SUCCESS": paid = True print(paid) return response.json(0) else: print("not paid...") return response.json(404) def do_apns(self,request_dict): token_val = request_dict.get('token_val',None) pem_path = os.path.join(BASE_DIR, 'Ansjer/file/apns_pem/apns-dev-test.pem') print(pem_path) response = ResponseObject() try: import apns2 now_time = int(time.time()) # cli = apns2.APNSClient(mode="dev", client_cert="/your/path.pem") # alert = apns2.PayloadAlert(body="body!", title="title!") # payload = apns2.Payload(alert=alert) # n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW) # response = cli.push(n=n, device_token="your_token") # assert response.status_code == 200, response.reason # assert response.apns_id # cli = apns2.APNSClient(mode="prod", client_cert=pem_path) cli = apns2.APNSClient(mode="dev", client_cert=pem_path,password='111111') push_data = {"alert": "Motion ", "event_time": now_time, "event_type": '51', "msg": "", "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR"} alert = apns2.PayloadAlert(body=json.dumps(push_data), title="title!") payload = apns2.Payload(alert=alert) n = apns2.Notification(payload=payload, priority=apns2.PRIORITY_LOW) res = cli.push(n=n, device_token=token_val,topic='com.ansjer.loocamccloud') if res.status_code == 200: return response.json(0) else: return response.json(404, res.reason) except Exception as e: return response.json(10, repr(e)) def do_get_putOss_url(self, request_dict): import oss2 obj_name = request_dict.get('obj_name','') # 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。 auth = oss2.Auth('LTAIyMkGfEdogyL9', '71uIjpsqVOmF7DAITRyRuc259jHOjO') # Endpoint以杭州为例,其它Region请按实际情况填写。 bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg') # 设置此签名URL在60秒内有效。 url = bucket.sign_url('PUT', obj_name, 7200) response = ResponseObject() return response.json(0, url) def do_gcm_push(self, request): import json import requests response = ResponseObject() rg_id = request.GET.get('rg_id', '') serverKey = request.GET.get('serverKey', '') if not rg_id or not serverKey: return response.json(444) data = { "received_at": "1", "msg": "1", "uid": "1", "alert": "1", "sound": "1", "event_time": "1", "event_type": "1", } json_data = { "collapse_key": "WhatYouWant", "data": data, "delay_while_idle": False, "time_to_live": 3600, "registration_ids": [rg_id] } url = 'https://android.googleapis.com/gcm/send' data = json.dumps(json_data).encode('utf-8') headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey} req = requests.post(url, data, headers=headers) response = ResponseObject() return response.json(0) def jgPush(self, request): response = ResponseObject() devToken = request.GET.get('devToken', '') app_key = request.GET.get('app_key', '') master_secret = request.GET.get('master_secret', '') import jpush as jpush # 此处换成各自的app_key和master_secret _jpush = jpush.JPush(app_key, master_secret) push = _jpush.create_push() # if you set the logging level to "DEBUG",it will show the debug logging. _jpush.set_logging("DEBUG") # push.audience = jpush.all_ push.audience = jpush.registration_id(devToken) push.notification = jpush.notification(alert="hello python jpush api") push.platform = jpush.all_ try: res = push.send() except Exception as e: print("Exception") return response.json(10, repr(e)) return response.json(0) def post(self, request, *args, **kwargs): response = ResponseObject() data = request.POST.dict() signature = data["sign"] data.pop('sign') print(json.dumps(data)) print(signature) # verify app_private_key_string = open(BASE_DIR + '/Controller/alipay_private_2048.pem').read() alipay_public_key_string = open(BASE_DIR + '/Controller/alipay_public_2048.pem').read() alipay = AliPay( appid="2016092200569234", app_notify_url=None, # the default notify path app_private_key_string=app_private_key_string, alipay_public_key_string=alipay_public_key_string, sign_type="RSA2", # RSA or RSA2 debug=False # False by default ) success = alipay.verify(data, signature) if success and data["trade_status"] in ("TRADE_SUCCESS", "TRADE_FINISHED"): print("trade succeed") return response.json(0, signature) # 修改 资源改变 def put(self, request): response = ResponseObject() return response.json(0, '') # 修改 属性改变 def PATCH(self, request): response = ResponseObject() return response.json(0) # 删除 def delete(self, request): response = ResponseObject() return response.json(0) def validation(self, request_dict, *args, **kwargs): response = ResponseObject() return response.json(0)