# @Author : Rocky # @File : AlexaController.py # @Time : 2023/12/25 10:46 import time import requests from django.views import View from Model.models import AlexaOauth from Object.ResponseObject import ResponseObject from Object.TokenObject import TokenObject from Ansjer.config import CONFIG_INFO, CONFIG_TEST, CONFIG_EUR # 域名 ALEXA_DOMAIN = 'smart.loocam2.com' AMAZON_API_DOMAIN = 'api.amazon.com' UPDATE_TOKEN_URL = 'https://{}/appToApp/oa2/updateAmazonToken'.format(ALEXA_DOMAIN) # Alexa loocam skill配置信息 # https://developer.amazon.com/alexa/console/ask # 开发中: development, 已上线: live LOOCAM_SKILL_STAGE = 'development' if CONFIG_INFO == CONFIG_TEST else 'live' LOOCAM_SKILL_ASIN = 'B0C94Q7H1L' LOOCAM_SKILL_ID = 'amzn1.ask.skill.ff5a5074-7ec7-442b-979b-cb57095f7a94' LOOCAM_CLIENT_ID = 'amzn1.application-oa2-client.98a01914518743e481d51115144dafb0' LOOCAM_CLIENT_SECRET = '43353cac67670aefd64a5f95309754ddd6bcfe8a087cc3cad1348b626f64b132' class AppToAppView(View): def get(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.GET, operation, request) def post(self, request, *args, **kwargs): request.encoding = 'utf-8' operation = kwargs.get('operation') return self.validation(request.POST, operation, request) def validation(self, request_dict, operation, request): response = ResponseObject() if operation == 'updateToken': # 更新token return self.update_token(request_dict, response) token = TokenObject(request.META.get('HTTP_AUTHORIZATION')) if token.code != 0: return response.json(token.code) user_id = token.userID if operation == 'getAlexaAppURLAndLWAFallbackURL': # 获取Alexa App和LWA fallback URL return self.get_alexa_app_url_and_lwa_fallback_url(response) elif operation == 'accountLinkWithAmazonAuthorizationCode': # 通过亚马逊授权码连接账号 return self.account_link_with_amazon_authorization_code(user_id, request_dict, response) elif operation == 'getAccountLinkingAndSkillStatus': # 获取账号连接和skill状态 return self.get_account_linking_and_skill_status(user_id, response) elif operation == 'disableSkillAndUnlinkAccount': # 取消连接skill和账号 return self.disable_skill_and_unlink_account(user_id, response) elif operation == 'getSkillPageURL': # 获取skill页面URL(取消链接) return self.get_skill_page_url(response) elif operation == 'getAlexaAppUrl': # 获取重定向至Alexa app的url return self.get_alexa_app_url(user_id, request_dict, response) else: return response.json(414) @staticmethod def update_token(request_dict, response): user_id = request_dict.get('user_id', None) amazon_access_token = request_dict.get('access_token', None) amazon_refresh_token = request_dict.get('refresh_token', None) if not all([user_id, amazon_access_token, amazon_refresh_token]): return response.json(444) now_time = int(time.time()) try: # 保存令牌数据 alexa_oauth_qs = AlexaOauth.objects.filter(user_id=user_id) if alexa_oauth_qs.exists(): alexa_oauth_qs.update(amazon_access_token=amazon_access_token, amazon_refresh_token=amazon_refresh_token, update_time=now_time) else: AlexaOauth.objects.create(user_id=user_id, amazon_access_token=amazon_access_token, amazon_refresh_token=amazon_refresh_token, create_time=now_time, update_time=now_time) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def get_alexa_app_url_and_lwa_fallback_url(response): skill_stage = LOOCAM_SKILL_STAGE redirect_uri = 'https://{}'.format(ALEXA_DOMAIN) alexa_app_url = 'https://alexa.amazon.com/spa/skill-account-linking-consent?' \ 'fragment=skill-account-linking-consent&client_id={}&' \ 'scope=alexa::skills:account_linking&skill_stage={}&response_type=code&' \ 'redirect_uri={}'.format(LOOCAM_CLIENT_ID, skill_stage, redirect_uri) lwa_fallback_url = 'https://www.amazon.com/ap/oa?' \ 'client_id={}&scope=alexa::skills:account_linking&response_type=code&redirect_uri={}&'.\ format(LOOCAM_CLIENT_ID, redirect_uri) res = { 'alexa_app_url': alexa_app_url, 'lwa_fallback_url': lwa_fallback_url } return response.json(0, res) @classmethod def account_link_with_amazon_authorization_code(cls, user_id, request_dict, response): amazon_authorization_code = request_dict.get('amazon_authorization_code', None) if not amazon_authorization_code: return response.json(444) now_time = int(time.time()) # 获取亚马逊访问令牌 # https://developer.amazon.com/zh/docs/login-with-amazon/authorization-code-grant.html#access-token-request url = 'https://{}/auth/o2/token'.format(AMAZON_API_DOMAIN) redirect_uri = 'https://{}'.format(ALEXA_DOMAIN) data = { 'grant_type': 'authorization_code', 'code': amazon_authorization_code, 'client_id': LOOCAM_CLIENT_ID, 'client_secret': LOOCAM_CLIENT_SECRET, 'redirect_uri': redirect_uri } try: r = requests.post(url=url, data=data, timeout=10) assert r.status_code == 200 res_data = eval(r.content) assert res_data.get('access_token') assert res_data.get('refresh_token') amazon_access_token = res_data['access_token'] amazon_refresh_token = res_data['refresh_token'] # 更新Alexa服务器令牌 data = { 'user_id': user_id, 'access_token': amazon_access_token, 'refresh_token': amazon_refresh_token } r = requests.post(url=UPDATE_TOKEN_URL, data=data, timeout=10) assert r.status_code == 200 # 保存令牌数据 alexa_oauth_qs = AlexaOauth.objects.filter(user_id=user_id) if alexa_oauth_qs.exists(): alexa_oauth_qs.update(amazon_access_token=amazon_access_token, amazon_refresh_token=amazon_refresh_token, update_time=now_time) else: AlexaOauth.objects.create(user_id=user_id, amazon_access_token=amazon_access_token, amazon_refresh_token=amazon_refresh_token, create_time=now_time, update_time=now_time) res = cls.get_auth_code_and_token(user_id) user_authorization_code = res['res']['user_authorization_code'] data = { "stage": LOOCAM_SKILL_STAGE, "accountLinkRequest": { "redirectUri": redirect_uri, "authCode": user_authorization_code, "type": "AUTH_CODE" } } # 请求连接skill # https://developer.amazon.com/en-US/docs/alexa/smapi/skill-enablement.html headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer {}'.format(amazon_access_token) } alexa_api_endpoint_list = ['api.amazonalexa.com', 'api.eu.amazonalexa.com', 'api.fe.amazonalexa.com'] for alexa_api_endpoint in alexa_api_endpoint_list: url = 'https://{}/v1/users/~current/skills/{}/enablement'.format(alexa_api_endpoint, LOOCAM_SKILL_ID) r = requests.post(headers=headers, url=url, json=data, timeout=30) if r.status_code == 201: AlexaOauth.objects.filter(user_id=user_id).\ update(alexa_api_endpoint=alexa_api_endpoint, link_status=1) res_data = eval(r.content) return response.json(0, res_data) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @classmethod def get_account_linking_and_skill_status(cls, user_id, response): # 未连接状态响应数据 res_data = { 'accountLink': { 'status': 'NOT_LINKED' }, 'status': 'DISABLED' } try: alexa_oauth_qs = AlexaOauth.objects.filter(user_id=user_id).values('link_status') if alexa_oauth_qs.exists(): link_status = alexa_oauth_qs[0]['link_status'] # 连接状态为1,通过api获取状态 if link_status == 1: request_method = 'get' try: r = cls.refresh_access_token(user_id, request_method) except AssertionError: alexa_oauth_qs.update(link_status=0) else: res_data = eval(r.content) return response.json(0, res_data) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @classmethod def disable_skill_and_unlink_account(cls, user_id, response): request_method = 'delete' try: r = cls.refresh_access_token(user_id, request_method) if r is not None: # 2xx响应状态码为成功 assert str(r.status_code)[:1] == '2' AlexaOauth.objects.filter(user_id=user_id).update(link_status=0) return response.json(0) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def get_skill_page_url(response): skill_page_url = 'https://alexa.amazon.com/spa/index.html#skills/dp/{}'.format(LOOCAM_SKILL_ASIN) lwa_page_url = 'https://www.amazon.com/dp/{}'.format(LOOCAM_SKILL_ASIN) res = { 'skill_page_url': skill_page_url, 'lwa_page_url': lwa_page_url } return response.json(0, res) @staticmethod def refresh_access_token(user_id, request_method): if request_method not in ['get', 'delete']: return alexa_oauth_qs = AlexaOauth.objects.filter(user_id=user_id).\ values('alexa_api_endpoint', 'amazon_refresh_token') if not alexa_oauth_qs: return now_time = int(time.time()) # 使用刷新令牌获取新的访问令牌 # https://developer.amazon.com/zh/docs/login-with-amazon/authorization-code-grant.html#using-refresh-tokens alexa_api_endpoint = alexa_oauth_qs[0]['alexa_api_endpoint'] amazon_refresh_token = alexa_oauth_qs[0]['amazon_refresh_token'] url = 'https://{}/auth/o2/token'.format(AMAZON_API_DOMAIN) data = { 'grant_type': 'refresh_token', 'refresh_token': amazon_refresh_token, 'client_id': LOOCAM_CLIENT_ID, 'client_secret': LOOCAM_CLIENT_SECRET } r = requests.post(url=url, data=data, timeout=10) assert r.status_code == 200 res_data = eval(r.content) assert res_data.get('access_token') assert res_data.get('refresh_token') new_access_token = res_data['access_token'] new_refresh_token = res_data['refresh_token'] # 更新Alexa服务器令牌 data = { 'user_id': user_id, 'access_token': new_access_token, 'refresh_token': new_refresh_token } r = requests.post(url=UPDATE_TOKEN_URL, data=data, timeout=10) assert r.status_code == 200 alexa_oauth_qs.update( amazon_access_token=new_access_token, amazon_refresh_token=new_refresh_token, update_time=now_time) headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer {}'.format(new_access_token) } url = 'https://{}/v1/users/~current/skills/{}/enablement'.format(alexa_api_endpoint, LOOCAM_SKILL_ID) if request_method == 'get': r = requests.get(headers=headers, url=url, timeout=30) elif request_method == 'delete': r = requests.delete(headers=headers, url=url, timeout=30) return r @classmethod def get_alexa_app_url(cls, user_id, request_dict, response): response_type = request_dict.get('response_type', None) operate = request_dict.get('operate', None) state = request_dict.get('state', None) redirect_uri = request_dict.get('redirect_uri', None) if not all([state, redirect_uri]) or response_type not in ['code', 'token'] or operate not in ['accept', 'deny']: return response.json(444) try: redirect_uri += '?state={}'.format(state) if operate == 'accept': redirect_uri += '&source=app' res = cls.get_auth_code_and_token(user_id) if response_type == 'code': # 获取用户授权码 user_authorization_code = res['res']['user_authorization_code'] redirect_uri += '&code={}'.format(user_authorization_code) elif response_type == 'token': # 获取令牌 refresh_token = res['res']['refresh_token'] redirect_uri += '&token={}&token_type=Bearer&expiration_time=3600'.format(refresh_token) AlexaOauth.objects.filter(user_id=user_id).update(link_status=1) else: AlexaOauth.objects.filter(user_id=user_id).update(link_status=0) redirect_uri += '&error=access_denied&error_description=The%20user%20denied%20the%20request.%20' res = { 'redirect_uri': redirect_uri } return response.json(0, res) except Exception as e: return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e))) @staticmethod def get_auth_code_and_token(user_id): """ 请求Alexa项目接口获取验证码和令牌 @param user_id: 用户id @return: """ base_url = 'https://{}'.format(ALEXA_DOMAIN) url = base_url + '/appToApp/oa2/getAuthCodeAndToken' region_code = 'EU' if CONFIG_INFO != CONFIG_EUR: region_code = 'US' params = { 'user_id': user_id, 'region_code': region_code } r = requests.get(url=url, params=params, timeout=10) assert r.status_code == 200 return eval(r.content)