#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ @Copyright (C) ansjer cop Video Technology Co.,Ltd.All rights reserved. @AUTHOR: ASJRD018 @NAME: Ansjer @software: PyCharm @DATE: 2018/5/22 13:58 @Version: python3.6 @MODIFY DECORD:ansjer dev @file: Test.py @Contact: chanjunkai@163.com """ from django.views.generic.base import View import os ''' http://192.168.136.40:8077/Test ''' from Object.ResponseObject import ResponseObject from Ansjer.config import BASE_DIR import json from alipay import AliPay import time from django.http import JsonResponse # 测试接口sdk class Test(View): def get(self, request, *args, **kwargs): request_dict = request.GET test_push_type = request_dict.get('test_push_type') if test_push_type == 'jpush': return self.jgPush(request) elif test_push_type == 'fcm': return self.do_fcm_push(request) elif test_push_type == 's3sts': return self.do_get_s3_sts() return self.do_gcm_push(request) return self.do_alipay_query_status() def do_get_s3_sts(self): import boto3 REGION_NAME = 'us-east-1' # e.g import json boto3_sts = boto3.client( 'sts', aws_access_key_id='AKIA2E67UIMD45Y3HL53', aws_secret_access_key='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', region_name=REGION_NAME ) Policy = {"Version": "2012-10-17", "Statement": [ {"Effect": "Allow", "Action": "s3:*", "Resource": ["arn:aws:s3:::azvod1/*"]}]} response = boto3_sts.get_federation_token( Name='chanjunkai@166.com', Policy=json.dumps(Policy), DurationSeconds=7200 ) response['Credentials']['bucket_name'] = 'azvod1' return JsonResponse(status=200, data=response) def do_get_aws_kinesis_vidoe(self): import boto3 REGION_NAME = 'us-east-1' # e.g import json sts = boto3.client( 'sts', aws_access_key_id='AKIA2E67UIMD45Y3HL53', aws_secret_access_key='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', region_name=REGION_NAME ) Policy = {"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": "kinesisvideo:*", "Resource": ["*"]}]} credentials = sts.get_federation_token( Name='chanjunkai@163.com', # or any unique text related to user Policy=json.dumps(Policy), DurationSeconds=36000, ) print(credentials) access_key_id = credentials['Credentials']['AccessKeyId'] session_token = credentials['Credentials']['SessionToken'] secret_access_key = credentials['Credentials']['SecretAccessKey'] response = ResponseObject() res = { 'access_key_id':access_key_id, 'secret_access_key':secret_access_key, 'session_token':session_token, } return response.json(0,res) def do_alipay_query_status(self): response = ResponseObject() # app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_private_2048.pem').read() # alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/alipay_public_2048.pem').read() app_private_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_private_2048.pem').read() alipay_public_key_string = open(BASE_DIR + '/Ansjer/file/alipay/zosi_alipay_public_2048.pem').read() alipay = AliPay( # appid="2016092200569234", appid="2019041663958142", app_notify_url=None, # the default notify path app_private_key_string=app_private_key_string, alipay_public_key_string=alipay_public_key_string, sign_type="RSA2", # RSA or RSA2 debug=False # False by default ) # check order status print("now sleep 3s") # time.sleep(3) result = alipay.api_alipay_trade_query(out_trade_no="20190424085757859937") if result.get("trade_status", "") == "TRADE_SUCCESS": paid = True print(paid) return response.json(0) else: print("not paid...") return response.json(404) def do_fcm_push(self, request): rg_id = request.GET.get('rg_id', '') serverKey = request.GET.get('serverKey', '') push_type = request.GET.get('push_type', None) # Send to single device. from pyfcm import FCMNotification # OR initialize with proxies # proxy_dict = { # "http": "http://127.0.0.1", # "https": "http://127.0.0.1", # } # push_service = FCMNotification(api_key="", proxy_dict=proxy_dict) push_service = FCMNotification(api_key=serverKey) # Your api-key can be gotten from: https://console.firebase.google.com/project//settings/cloudmessaging registration_id = rg_id message_title = "Zosi Smart(xxxx 007)" now_time = int(time.time()) data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "", "received_at": now_time, "sound": "sound.aif", "uid": "98UXAA8BRPA35VAL111A", "zpush": "1"} # message_body = json.dumps(data) message_body = '警告:Motion Channel:1 日期:{tt}'.format( tt=str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))) print(message_body) from var_dump import var_dump if push_type == '1': var_dump('1111111') result = push_service.single_device_data_message(registration_id=registration_id, data_message=data) elif push_type == '2': var_dump('22222222') result = push_service.notify_single_device(registration_id=registration_id, message_title=message_title, message_body=message_body, data_message=data) else: var_dump('333333') result = push_service.notify_single_device(registration_id=registration_id, message_title=message_title, message_body=message_body) from var_dump import var_dump var_dump(result) response = ResponseObject() return response.json(0, result) def do_get_putOss_url(self, request_dict): import oss2 obj_name = request_dict.get('obj_name', '') # 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。 auth = oss2.Auth('LTAIyMkGfEdogyL9', '71uIjpsqVOmF7DAITRyRuc259jHOjO') # Endpoint以杭州为例,其它Region请按实际情况填写。 bucket = oss2.Bucket(auth, 'oss-cn-shenzhen.aliyuncs.com', 'apg') # 设置此签名URL在60秒内有效。 url = bucket.sign_url('PUT', obj_name, 7200) response = ResponseObject() return response.json(0, url) def do_gcm_push(self, request): import json import requests response = ResponseObject() rg_id = request.GET.get('rg_id', '') serverKey = request.GET.get('serverKey', '') if not rg_id or not serverKey: return response.json(444) now_time = int(time.time()) data = {"alert": "Motion ", "event_time": now_time, "event_type": "51", "msg": "", "received_at": now_time, "sound": "sound.aif", "uid": "XFDJUHUIOKJHYTGSFFDR", "zpush": "1"} json_data = { "collapse_key": "WhatYouWant", "data": data, "delay_while_idle": False, "time_to_live": 3600, "registration_ids": [rg_id] } url = 'https://android.googleapis.com/gcm/send' data = json.dumps(json_data).encode('utf-8') headers = {'Content-Type': 'application/json', 'Authorization': 'key=%s' % serverKey} req = requests.post(url, data, headers=headers) response = ResponseObject() return response.json(0) def jgPush(self, request): response = ResponseObject() devToken = request.GET.get('devToken', '') app_key = request.GET.get('app_key', '') master_secret = request.GET.get('master_secret', '') import jpush as jpush # 此处换成各自的app_key和master_secret _jpush = jpush.JPush(app_key, master_secret) push = _jpush.create_push() # if you set the logging level to "DEBUG",it will show the debug logging. _jpush.set_logging("DEBUG") # push.audience = jpush.all_ n_time = int(time.time()) event_type = 51 push.audience = jpush.registration_id(devToken) push_data = {"alert": "Motion ", "event_time": n_time, "event_type": event_type, "msg": "", "received_at": n_time, "sound": "sound.aif", "uid": 'xoxoxoxoxoxoxoxoxoxo', "zpush": "1"} android = jpush.android(alert="Hello, Android msg", priority=1, style=1, alert_type=1, big_text='ssssssssssssssssss', extras=push_data,title='fffff') push.notification = jpush.notification(android=android) push.platform = jpush.all_ try: push.send() except Exception as e: print("Exception") return response.json(10, repr(e)) return response.json(0) def post(self, request, *args, **kwargs): response = ResponseObject() data = request.POST.dict() signature = data["sign"] data.pop('sign') print(json.dumps(data)) print(signature) # verify app_private_key_string = open(BASE_DIR + '/Controller/alipay_private_2048.pem').read() alipay_public_key_string = open(BASE_DIR + '/Controller/alipay_public_2048.pem').read() alipay = AliPay( appid="2016092200569234", app_notify_url=None, # the default notify path app_private_key_string=app_private_key_string, alipay_public_key_string=alipay_public_key_string, sign_type="RSA2", # RSA or RSA2 debug=False # False by default ) success = alipay.verify(data, signature) if success and data["trade_status"] in ("TRADE_SUCCESS", "TRADE_FINISHED"): print("trade succeed") return response.json(0, signature) # 修改 资源改变 def put(self, request): response = ResponseObject() return response.json(0, '') # 修改 属性改变 def PATCH(self, request): response = ResponseObject() return response.json(0) # 删除 def delete(self, request): response = ResponseObject() return response.json(0) def validation(self, request_dict, *args, **kwargs): response = ResponseObject() return response.json(0)