123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- # @Author : Rocky
- # @File : AlexaController.py
- # @Time : 2023/12/25 10:46
- import time
- import requests
- from django.views import View
- from Model.models import AlexaOauth
- from Object.ResponseObject import ResponseObject
- from Object.TokenObject import TokenObject
- from Ansjer.config import CONFIG_INFO, CONFIG_TEST, CONFIG_EUR
# Domain names
ALEXA_DOMAIN = 'smart.loocam2.com'
AMAZON_API_DOMAIN = 'api.amazon.com'
# Endpoint on the Alexa server that receives refreshed Amazon tokens.
UPDATE_TOKEN_URL = 'https://' + ALEXA_DOMAIN + '/appToApp/oa2/updateAmazonToken'

# Alexa loocam skill configuration
# https://developer.amazon.com/alexa/console/ask
# Skill stage: 'development' while in development, 'live' once published.
if CONFIG_INFO == CONFIG_TEST:
    LOOCAM_SKILL_STAGE = 'development'
else:
    LOOCAM_SKILL_STAGE = 'live'
LOOCAM_SKILL_ASIN = 'B0C94Q7H1L'
LOOCAM_SKILL_ID = 'amzn1.ask.skill.ff5a5074-7ec7-442b-979b-cb57095f7a94'
LOOCAM_CLIENT_ID = 'amzn1.application-oa2-client.98a01914518743e481d51115144dafb0'
# NOTE(review): the OAuth client secret is hard-coded in source; consider
# loading it from deployment configuration instead.
LOOCAM_CLIENT_SECRET = '43353cac67670aefd64a5f95309754ddd6bcfe8a087cc3cad1348b626f64b132'
class AppToAppView(View):
    """Django view implementing the Alexa app-to-app account-linking operations.

    The URL route supplies an ``operation`` keyword argument that selects the
    handler. ``updateToken`` is dispatched before the request token is
    checked; every other operation requires a valid ``Authorization`` header.
    """

    # Alexa API hosts tried, in order, when enabling the skill.
    ALEXA_API_ENDPOINTS = ('api.amazonalexa.com', 'api.eu.amazonalexa.com', 'api.fe.amazonalexa.com')

    def get(self, request, *args, **kwargs):
        """Handle GET requests; parameters are read from ``request.GET``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.GET, operation, request)

    def post(self, request, *args, **kwargs):
        """Handle POST requests; parameters are read from ``request.POST``."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        return self.validation(request.POST, operation, request)

    def validation(self, request_dict, operation, request):
        """Authenticate the request (where required) and dispatch to a handler.

        @param request_dict: request parameters (``request.GET`` or ``request.POST``)
        @param operation: operation name taken from the URL
        @param request: the Django request, used to read the Authorization header
        @return: the handler's response; code 414 for an unknown operation, or
                 the token error code when the token check fails
        """
        response = ResponseObject()
        # NOTE(review): this operation is served without any token check, and
        # it writes tokens for an arbitrary user_id - confirm it is protected
        # elsewhere (network ACL, gateway, ...).
        if operation == 'updateToken':  # update stored tokens
            return self.update_token(request_dict, response)

        token = TokenObject(request.META.get('HTTP_AUTHORIZATION'))
        if token.code != 0:
            return response.json(token.code)
        user_id = token.userID

        if operation == 'getAlexaAppURLAndLWAFallbackURL':  # Alexa app URL and LWA fallback URL
            return self.get_alexa_app_url_and_lwa_fallback_url(response)
        elif operation == 'accountLinkWithAmazonAuthorizationCode':  # link account via Amazon authorization code
            return self.account_link_with_amazon_authorization_code(user_id, request_dict, response)
        elif operation == 'getAccountLinkingAndSkillStatus':  # account-linking and skill status
            return self.get_account_linking_and_skill_status(user_id, response)
        elif operation == 'disableSkillAndUnlinkAccount':  # disable the skill and unlink the account
            return self.disable_skill_and_unlink_account(user_id, response)
        elif operation == 'getSkillPageURL':  # skill page URL (used for unlinking)
            return self.get_skill_page_url(response)
        elif operation == 'getAlexaAppUrl':  # URL that redirects to the Alexa app
            return self.get_alexa_app_url(user_id, request_dict, response)
        else:
            return response.json(414)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _error_response(response, exc):
        """Return the code-500 response carrying the failing line and exception repr."""
        return response.json(500, 'error_line:{}, error_msg:{}'.format(exc.__traceback__.tb_lineno, repr(exc)))

    @staticmethod
    def _ensure(condition, message):
        """Raise AssertionError(message) when ``condition`` is falsy.

        Used instead of the ``assert`` statement, which is removed when Python
        runs with ``-O``. The exception type is kept because
        ``get_account_linking_and_skill_status`` catches ``AssertionError``.
        """
        if not condition:
            raise AssertionError(message)

    @staticmethod
    def _parse_body(r):
        """Decode an HTTP response body into Python data without ``eval``.

        The body is parsed as JSON first. If that fails it is parsed as a
        Python literal, so bodies the previous ``eval``-based code accepted
        (e.g. single-quoted dicts) still work, but no code is ever executed.

        @param r: a ``requests`` response object
        @return: the decoded data
        """
        import ast  # local import: only needed on the fallback path

        try:
            return r.json()
        except ValueError:
            return ast.literal_eval(r.content.decode('utf-8'))

    @staticmethod
    def _save_tokens(user_id, access_token, refresh_token, now_time):
        """Update the user's AlexaOauth row with new tokens, creating it if absent."""
        matched = AlexaOauth.objects.filter(user_id=user_id).update(
            amazon_access_token=access_token,
            amazon_refresh_token=refresh_token,
            update_time=now_time)
        # update() returns the number of matched rows; zero means no record yet.
        if not matched:
            AlexaOauth.objects.create(
                user_id=user_id,
                amazon_access_token=access_token,
                amazon_refresh_token=refresh_token,
                create_time=now_time,
                update_time=now_time)

    # ------------------------------------------------------------------
    # Operation handlers
    # ------------------------------------------------------------------
    @staticmethod
    def update_token(request_dict, response):
        """Store the Amazon access/refresh tokens supplied in the request.

        @param request_dict: must contain ``user_id``, ``access_token`` and ``refresh_token``
        @param response: ResponseObject used to build the reply
        @return: code 0 on success, 444 when a parameter is missing, 500 on error
        """
        user_id = request_dict.get('user_id', None)
        amazon_access_token = request_dict.get('access_token', None)
        amazon_refresh_token = request_dict.get('refresh_token', None)
        if not all([user_id, amazon_access_token, amazon_refresh_token]):
            return response.json(444)

        now_time = int(time.time())
        try:
            # Persist the token data.
            AppToAppView._save_tokens(user_id, amazon_access_token, amazon_refresh_token, now_time)
            return response.json(0)
        except Exception as e:
            return AppToAppView._error_response(response, e)

    @staticmethod
    def get_alexa_app_url_and_lwa_fallback_url(response):
        """Return the Alexa app consent URL and the Login-with-Amazon fallback URL.

        @param response: ResponseObject used to build the reply
        @return: code 0 with ``alexa_app_url`` and ``lwa_fallback_url``
        """
        skill_stage = LOOCAM_SKILL_STAGE
        redirect_uri = 'https://{}'.format(ALEXA_DOMAIN)
        alexa_app_url = 'https://alexa.amazon.com/spa/skill-account-linking-consent?' \
                        'fragment=skill-account-linking-consent&client_id={}&' \
                        'scope=alexa::skills:account_linking&skill_stage={}&response_type=code&' \
                        'redirect_uri={}'.format(LOOCAM_CLIENT_ID, skill_stage, redirect_uri)
        lwa_fallback_url = 'https://www.amazon.com/ap/oa?' \
                           'client_id={}&scope=alexa::skills:account_linking&response_type=code&redirect_uri={}&'.\
            format(LOOCAM_CLIENT_ID, redirect_uri)
        res = {
            'alexa_app_url': alexa_app_url,
            'lwa_fallback_url': lwa_fallback_url
        }
        return response.json(0, res)

    @classmethod
    def account_link_with_amazon_authorization_code(cls, user_id, request_dict, response):
        """Exchange an Amazon authorization code for tokens and enable the skill.

        Steps: request tokens from Amazon, push them to the Alexa server,
        store them locally, fetch a user authorization code from the Alexa
        server, then try to enable the skill on each Alexa API endpoint.

        @param user_id: id of the authenticated user
        @param request_dict: must contain ``amazon_authorization_code``
        @param response: ResponseObject used to build the reply
        @return: code 0 (with the enablement body when an endpoint answers 201),
                 444 when the code is missing, 500 on error
        """
        amazon_authorization_code = request_dict.get('amazon_authorization_code', None)
        if not amazon_authorization_code:
            return response.json(444)

        now_time = int(time.time())
        # Request the Amazon access token.
        # https://developer.amazon.com/zh/docs/login-with-amazon/authorization-code-grant.html#access-token-request
        token_url = 'https://{}/auth/o2/token'.format(AMAZON_API_DOMAIN)
        redirect_uri = 'https://{}'.format(ALEXA_DOMAIN)
        token_request = {
            'grant_type': 'authorization_code',
            'code': amazon_authorization_code,
            'client_id': LOOCAM_CLIENT_ID,
            'client_secret': LOOCAM_CLIENT_SECRET,
            'redirect_uri': redirect_uri
        }
        try:
            r = requests.post(url=token_url, data=token_request, timeout=10)
            cls._ensure(r.status_code == 200, 'Amazon token request failed: HTTP {}'.format(r.status_code))
            token_data = cls._parse_body(r)
            cls._ensure(token_data.get('access_token'), 'Amazon token response has no access_token')
            cls._ensure(token_data.get('refresh_token'), 'Amazon token response has no refresh_token')
            amazon_access_token = token_data['access_token']
            amazon_refresh_token = token_data['refresh_token']

            # Push the tokens to the Alexa server.
            sync_payload = {
                'user_id': user_id,
                'access_token': amazon_access_token,
                'refresh_token': amazon_refresh_token
            }
            r = requests.post(url=UPDATE_TOKEN_URL, data=sync_payload, timeout=10)
            cls._ensure(r.status_code == 200, 'Alexa server token update failed: HTTP {}'.format(r.status_code))

            # Persist the token data.
            cls._save_tokens(user_id, amazon_access_token, amazon_refresh_token, now_time)

            res = cls.get_auth_code_and_token(user_id)
            user_authorization_code = res['res']['user_authorization_code']
            enablement_payload = {
                "stage": LOOCAM_SKILL_STAGE,
                "accountLinkRequest": {
                    "redirectUri": redirect_uri,
                    "authCode": user_authorization_code,
                    "type": "AUTH_CODE"
                }
            }

            # Request skill enablement.
            # https://developer.amazon.com/en-US/docs/alexa/smapi/skill-enablement.html
            headers = {
                'Content-Type': 'application/json',
                'Authorization': 'Bearer {}'.format(amazon_access_token)
            }
            for alexa_api_endpoint in cls.ALEXA_API_ENDPOINTS:
                url = 'https://{}/v1/users/~current/skills/{}/enablement'.format(alexa_api_endpoint, LOOCAM_SKILL_ID)
                r = requests.post(headers=headers, url=url, json=enablement_payload, timeout=30)
                if r.status_code == 201:
                    # Remember which endpoint accepted the user and mark as linked.
                    AlexaOauth.objects.filter(user_id=user_id).\
                        update(alexa_api_endpoint=alexa_api_endpoint, link_status=1)
                    return response.json(0, cls._parse_body(r))

            # NOTE(review): no endpoint answered 201, yet code 0 is still
            # returned (without data) - confirm clients expect this.
            return response.json(0)
        except Exception as e:
            return cls._error_response(response, e)

    @classmethod
    def get_account_linking_and_skill_status(cls, user_id, response):
        """Return the account-linking and skill status for the user.

        When the stored ``link_status`` is 1 the status is fetched from the
        Alexa API; if the token refresh raises ``AssertionError`` the stored
        status is reset to 0. Otherwise a NOT_LINKED / DISABLED body is returned.

        @param user_id: id of the authenticated user
        @param response: ResponseObject used to build the reply
        @return: code 0 with the status body, 500 on error
        """
        # Response body for the not-linked state.
        res_data = {
            'accountLink': {
                'status': 'NOT_LINKED'
            },
            'status': 'DISABLED'
        }
        try:
            alexa_oauth_qs = AlexaOauth.objects.filter(user_id=user_id).values('link_status')
            if alexa_oauth_qs.exists():
                link_status = alexa_oauth_qs[0]['link_status']
                # Stored status is "linked": ask the Alexa API for the real state.
                if link_status == 1:
                    try:
                        r = cls.refresh_access_token(user_id, 'get')
                    except AssertionError:
                        alexa_oauth_qs.update(link_status=0)
                    else:
                        # refresh_access_token returns None when the record is
                        # gone; keep the not-linked default in that case.
                        if r is not None:
                            res_data = cls._parse_body(r)
            return response.json(0, res_data)
        except Exception as e:
            return cls._error_response(response, e)

    @classmethod
    def disable_skill_and_unlink_account(cls, user_id, response):
        """Disable the skill through the Alexa API and mark the user as unlinked.

        @param user_id: id of the authenticated user
        @param response: ResponseObject used to build the reply
        @return: code 0 on success, 500 on error (including a non-2xx API reply)
        """
        try:
            r = cls.refresh_access_token(user_id, 'delete')
            if r is not None:
                # Any 2xx status code counts as success.
                cls._ensure(200 <= r.status_code < 300,
                            'Skill disablement failed: HTTP {}'.format(r.status_code))
            AlexaOauth.objects.filter(user_id=user_id).update(link_status=0)
            return response.json(0)
        except Exception as e:
            return cls._error_response(response, e)

    @staticmethod
    def get_skill_page_url(response):
        """Return the skill page URLs built from the skill ASIN.

        @param response: ResponseObject used to build the reply
        @return: code 0 with ``skill_page_url`` and ``lwa_page_url``
        """
        skill_page_url = 'https://alexa.amazon.com/spa/index.html#skills/dp/{}'.format(LOOCAM_SKILL_ASIN)
        lwa_page_url = 'https://www.amazon.com/dp/{}'.format(LOOCAM_SKILL_ASIN)
        res = {
            'skill_page_url': skill_page_url,
            'lwa_page_url': lwa_page_url
        }
        return response.json(0, res)

    @staticmethod
    def refresh_access_token(user_id, request_method):
        """Refresh the user's Amazon tokens, then call the skill-enablement API.

        @param user_id: id of the user whose stored refresh token is used
        @param request_method: 'get' or 'delete' - the HTTP method used on the
                               enablement endpoint
        @return: the ``requests`` response of the enablement call, or None when
                 ``request_method`` is unsupported or the user has no record
        @raise AssertionError: when the token refresh or the Alexa server
                               token update does not succeed
        """
        if request_method not in ['get', 'delete']:
            return
        alexa_oauth_qs = AlexaOauth.objects.filter(user_id=user_id).\
            values('alexa_api_endpoint', 'amazon_refresh_token')
        if not alexa_oauth_qs:
            return

        now_time = int(time.time())
        # Use the refresh token to obtain a new access token.
        # https://developer.amazon.com/zh/docs/login-with-amazon/authorization-code-grant.html#using-refresh-tokens
        alexa_api_endpoint = alexa_oauth_qs[0]['alexa_api_endpoint']
        amazon_refresh_token = alexa_oauth_qs[0]['amazon_refresh_token']
        token_url = 'https://{}/auth/o2/token'.format(AMAZON_API_DOMAIN)
        refresh_request = {
            'grant_type': 'refresh_token',
            'refresh_token': amazon_refresh_token,
            'client_id': LOOCAM_CLIENT_ID,
            'client_secret': LOOCAM_CLIENT_SECRET
        }
        r = requests.post(url=token_url, data=refresh_request, timeout=10)
        AppToAppView._ensure(r.status_code == 200, 'Amazon token refresh failed: HTTP {}'.format(r.status_code))
        token_data = AppToAppView._parse_body(r)
        AppToAppView._ensure(token_data.get('access_token'), 'Amazon token response has no access_token')
        AppToAppView._ensure(token_data.get('refresh_token'), 'Amazon token response has no refresh_token')
        new_access_token = token_data['access_token']
        new_refresh_token = token_data['refresh_token']

        # Push the new tokens to the Alexa server.
        sync_payload = {
            'user_id': user_id,
            'access_token': new_access_token,
            'refresh_token': new_refresh_token
        }
        r = requests.post(url=UPDATE_TOKEN_URL, data=sync_payload, timeout=10)
        AppToAppView._ensure(r.status_code == 200, 'Alexa server token update failed: HTTP {}'.format(r.status_code))

        alexa_oauth_qs.update(
            amazon_access_token=new_access_token, amazon_refresh_token=new_refresh_token, update_time=now_time)

        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer {}'.format(new_access_token)
        }
        url = 'https://{}/v1/users/~current/skills/{}/enablement'.format(alexa_api_endpoint, LOOCAM_SKILL_ID)
        if request_method == 'get':
            r = requests.get(headers=headers, url=url, timeout=30)
        elif request_method == 'delete':
            r = requests.delete(headers=headers, url=url, timeout=30)
        return r

    @classmethod
    def get_alexa_app_url(cls, user_id, request_dict, response):
        """Build the URL that redirects the user back to the Alexa app.

        @param user_id: id of the authenticated user
        @param request_dict: must contain ``state`` and ``redirect_uri``;
                             ``response_type`` must be 'code' or 'token' and
                             ``operate`` must be 'accept' or 'deny'
        @param response: ResponseObject used to build the reply
        @return: code 0 with ``redirect_uri``, 444 on invalid parameters, 500 on error
        """
        response_type = request_dict.get('response_type', None)
        operate = request_dict.get('operate', None)
        state = request_dict.get('state', None)
        redirect_uri = request_dict.get('redirect_uri', None)
        if not all([state, redirect_uri]) or response_type not in ['code', 'token'] or operate not in ['accept', 'deny']:
            return response.json(444)

        try:
            # NOTE(review): values are appended without URL-encoding and the
            # incoming redirect_uri is assumed to have no query string yet.
            redirect_uri += '?state={}'.format(state)
            if operate == 'accept':
                redirect_uri += '&source=app'
                res = cls.get_auth_code_and_token(user_id)
                if response_type == 'code':
                    # Append the user authorization code.
                    user_authorization_code = res['res']['user_authorization_code']
                    redirect_uri += '&code={}'.format(user_authorization_code)
                elif response_type == 'token':
                    # Append the token.
                    refresh_token = res['res']['refresh_token']
                    redirect_uri += '&token={}&token_type=Bearer&expiration_time=3600'.format(refresh_token)
                AlexaOauth.objects.filter(user_id=user_id).update(link_status=1)
            else:
                AlexaOauth.objects.filter(user_id=user_id).update(link_status=0)
                redirect_uri += '&error=access_denied&error_description=The%20user%20denied%20the%20request.%20'
            res = {
                'redirect_uri': redirect_uri
            }
            return response.json(0, res)
        except Exception as e:
            return cls._error_response(response, e)

    @staticmethod
    def get_auth_code_and_token(user_id):
        """
        Request an authorization code and token from the Alexa project API.
        @param user_id: user id
        @return: the decoded response body of the getAuthCodeAndToken endpoint
        @raise AssertionError: when the endpoint does not answer with HTTP 200
        """
        url = 'https://{}'.format(ALEXA_DOMAIN) + '/appToApp/oa2/getAuthCodeAndToken'
        region_code = 'EU' if CONFIG_INFO == CONFIG_EUR else 'US'
        params = {
            'user_id': user_id,
            'region_code': region_code
        }
        r = requests.get(url=url, params=params, timeout=10)
        AppToAppView._ensure(r.status_code == 200, 'getAuthCodeAndToken failed: HTTP {}'.format(r.status_code))
        return AppToAppView._parse_body(r)
|