"""Shopify Multipass token generation and the Django view that drives Shopify login."""
import base64
import concurrent.futures
import hashlib
import hmac
import json
import os
import re
from datetime import datetime

import pytz
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from django.contrib.auth.hashers import check_password, make_password
from django.db.models import Q, F
from django.views import View

from Ansjer.config import SHOPIFY_CONFIG, CONFIG_INFO, CONFIG_EUR, CONFIG_US
from Controller.CheckUserData import DataValid
from Model.models import Device_User, CountryModel, LanguageModel, CountryLanguageModel
from Object.RedisObject import RedisObject
from Object.ResponseObject import ResponseObject
from Service.CommonService import CommonService

# Upper bound (seconds) for every outbound HTTP call made from this module, so
# that an unresponsive remote cannot block a worker thread forever.
_HTTP_TIMEOUT_SECONDS = 10

# One pattern per character category: upper-case, lower-case, digit, special.
_PASSWORD_CATEGORY_PATTERNS = (
    re.compile(r"[A-Z]"),
    re.compile(r"[a-z]"),
    re.compile(r"[0-9]"),
    re.compile(r"[!@#$%^&*()_+\-=\[\]{}|;:'\",.<>?/]"),
)

# Storefront base URLs for stores whose host is not "www.zositech.<store>".
_STORE_BASE_URLS = {
    "eu": "https://eu.zositech.com",
    "uk": "https://www.zositech.co.uk",
    "us": "https://www.zositech.com",
}


def _password_rejected(data_valid, password):
    """Return True when *password* must be refused with response code 109.

    :param data_valid: a ``DataValid`` instance providing ``password_validate``.
    :param password: the plain-text candidate password.
    """
    re_flag = data_valid.password_validate(password)
    categories = sum(1 for pattern in _PASSWORD_CATEGORY_PATTERNS if pattern.search(password))
    # NOTE(review): the original comment says "must contain at least two
    # character categories", but this condition only rejects passwords that
    # fail ``password_validate`` AND contain MORE than two categories. The
    # logic is preserved as-is; confirm the intended rule before changing it.
    return re_flag is not True and categories > 2


def _authcode_error(stored_code, authcode):
    """Compare a submitted code with the one read from Redis.

    :return: ``120`` when ``stored_code`` is ``False`` (the calling code treats
        that as an expired code), ``121`` when the codes differ, otherwise ``None``.
    """
    if stored_code is False:
        return 120
    if authcode != stored_code:
        return 121
    return None


class ShopifyMultipass:
    """Helpers for Shopify Multipass tokens and Shopify Admin API look-ups."""

    # Time zone used for the token timestamp, keyed by lower-case country code.
    _TIMEZONE_BY_COUNTRY = {
        "us": "America/New_York",  # United States (New York)
        "jp": "Asia/Tokyo",  # Japan
        "de": "Europe/Berlin",  # Germany
        "uk": "Europe/London",  # United Kingdom
        "gb": "Europe/London",  # United Kingdom
        "eu": "Europe/Paris",  # EU (defaults to Paris)
    }
    # Used for every country code that has no entry above.
    _DEFAULT_TIMEZONE = "UTC"

    @staticmethod
    def generate_multipass_token(secret, customer_data):
        """Encrypt and sign *customer_data* and return the Base64 Multipass token.

        :param secret: the store's Multipass secret (str).
        :param customer_data: JSON-serialisable dict describing the customer.
        :return: URL-safe Base64 text of ``iv + ciphertext + signature``.
        """
        # Step 1: serialise the customer data as JSON.
        json_data = json.dumps(customer_data)

        # Step 2: derive the encryption and signing keys from the secret.
        hash_digest = hashlib.sha256(secret.encode()).digest()
        encryption_key = hash_digest[:16]  # 128-bit encryption key
        signature_key = hash_digest[16:32]  # 128-bit signing key

        # Step 3: AES-CBC encrypt the JSON with a random initialisation vector.
        iv = os.urandom(16)
        cipher = AES.new(encryption_key, AES.MODE_CBC, iv)
        ciphertext = cipher.encrypt(pad(json_data.encode(), AES.block_size))

        # Step 4: sign the IV plus ciphertext with HMAC-SHA256.
        data_to_sign = iv + ciphertext
        signature = hmac.new(signature_key, data_to_sign, hashlib.sha256).digest()

        # Step 5: URL-safe Base64 encode everything.
        return base64.urlsafe_b64encode(data_to_sign + signature).decode()

    @staticmethod
    def search_customer_by_email(store_name, access_token, email):
        """Query a store's Admin API customer search for *email*.

        :param store_name: the ``<store_name>.myshopify.com`` sub-domain.
        :param access_token: value sent as ``X-Shopify-Access-Token``.
        :param email: e-mail address to search for.
        :return: the decoded JSON body of the response.
        :raises requests.RequestException: on network failure or timeout.
        """
        url = f"https://{store_name}.myshopify.com/admin/api/2024-10/customers/search.json"
        params = {
            "query": f"email:{email}"
        }
        headers = {
            "X-Shopify-Access-Token": access_token,
        }
        response = requests.get(url, headers=headers, params=params, timeout=_HTTP_TIMEOUT_SECONDS)
        return response.json()

    @staticmethod
    def get_timezone_by_country(iso2: str):
        """Return the current time in the country's zone as an ISO 8601 string.

        Despite its name this returns a timestamp such as
        ``2024-01-01T12:00:00+09:00``, not a time-zone name.

        :param iso2: country/store code (case-insensitive). Codes without a
            mapped zone fall back to UTC instead of raising ``KeyError``.
        """
        zone_name = ShopifyMultipass._TIMEZONE_BY_COUNTRY.get(
            iso2.lower(), ShopifyMultipass._DEFAULT_TIMEZONE
        )
        now = datetime.now(pytz.timezone(zone_name))
        timestamp = now.strftime('%Y-%m-%dT%H:%M:%S%z')
        # ``%z`` yields "+HHMM"; insert the colon to get "+HH:MM".
        return timestamp[:-2] + ':' + timestamp[-2:]


class ShopifyView(View):
    """Dispatches ``shopify/<operation>`` requests to the handlers below."""

    # URL ``operation`` value -> name of the handler method.
    _OPERATIONS = {
        'shopifyLogin': 'shopify_login',
        'shopifyRegister': 'shopify_register',
        # Look up the account registration status (called by the peer server).
        'searchCustomer': 'search_customer',
        # Account check endpoint used by the official website.
        'searchAccount': 'search_account',
        'getCountryDomainList': 'get_country_domain_list',
        # Forgotten-password reset.
        'shopifyChangePassword': 'shopify_change_password',
        'verifyAuthcode': 'verify_authcode',
    }

    def get(self, request, *args, **kwargs):
        """Handle GET: read parameters from the query string."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.GET
        return self.validation(request, request_dict, operation)

    def post(self, request, *args, **kwargs):
        """Handle POST: read parameters from the form body."""
        request.encoding = 'utf-8'
        operation = kwargs.get('operation')
        request_dict = request.POST
        return self.validation(request, request_dict, operation)

    def validation(self, request, request_dict, operation):
        """Route *operation* to its handler; unknown operations answer 414."""
        language = request_dict.get('language', 'cn')
        response = ResponseObject(language, "pc")
        handler_name = self._OPERATIONS.get(operation)
        if handler_name is None:
            return response.json(414)
        return getattr(self, handler_name)(request_dict, response)

    def shopify_login(self, request_dict, response):
        """Check the credentials and answer with a Multipass login URL.

        Response codes: 444 missing parameter or no URL could be built,
        104 unknown user, 111 wrong password, 0 success (payload is the URL).
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        account_iso2 = request_dict.get("accountCountry", None)
        shopify_country = request_dict.get("shopifyCountry", "")

        if not all([email, password, account_iso2]):
            return response.json(444)

        user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
        # Only the stored hash is needed, so fetch just that column.
        stored_hashes = list(user_qs.values_list('password', flat=True)[:1])
        if not stored_hashes:
            return response.json(104)
        if not check_password(password, stored_hashes[0]):
            return response.json(111)

        redirect_url = self.shopify_login_url(shopify_country, account_iso2, email)
        if redirect_url:
            return response.json(0, redirect_url)
        return response.json(444)

    def shopify_register(self, request_dict, response):
        """Create a ``Device_User`` and answer with a Multipass login URL.

        Response codes: 444 missing parameter, 105 invalid e-mail,
        109 password refused, 120/121 auth-code expired/wrong,
        103 account already exists, 173 unknown country code, 0 success.
        """
        email = request_dict.get("email", None)
        country_code = request_dict.get("countryCode", None)
        password = request_dict.get("password", None)
        authcode = request_dict.get("authCode", None)

        if not all([email, password, authcode, country_code]):
            return response.json(444)

        data_valid = DataValid()
        if data_valid.email_validate(email) is not True:
            return response.json(105)
        if _password_rejected(data_valid, password):
            return response.json(109)

        reds = RedisObject()
        code_key = email + '_identifyingCode'
        code_error = _authcode_error(reds.get_data(key=code_key), authcode)
        if code_error is not None:
            return response.json(code_error)

        if Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists():
            return response.json(103)

        # Create the user.
        password = make_password(password)
        new_userID = CommonService.getUserID(μs=False, setOTAID=True)
        country_code = country_code.upper()
        region_country = CountryModel.objects.filter(
            country_code=country_code
        ).values_list("id", flat=True).first()
        if region_country is None:
            return response.json(173)

        user_data = {
            "username": email,
            "NickName": email,
            "userEmail": email,
            "password": password,
            "userID": new_userID,
            "is_active": True,
            "user_isValid": True,
            "region_country": region_country,
            "region_status": 1
        }
        Device_User.objects.create(**user_data)
        reds.del_data(key=code_key)

        redirect_url = self.shopify_login_url("", country_code, email)
        return response.json(0, redirect_url)

    def search_account(self, request_dict, response):
        """Report where *email* is registered: app servers, a Shopify store, or nowhere.

        ``accountStatus`` in the payload is 3 when an app account exists (with
        the per-region login endpoints), 2 when only a Shopify customer exists
        (with the storefront login URL), and 1 when neither exists.
        """
        email = request_dict.get("email")
        if not email:
            return response.json(444)

        store_configs = [
            ("us", SHOPIFY_CONFIG["us_store_name"], SHOPIFY_CONFIG["us_token"]),
            ("eu", SHOPIFY_CONFIG["eu_store_name"], SHOPIFY_CONFIG["eu_token"]),
            ("de", SHOPIFY_CONFIG["de_store_name"], SHOPIFY_CONFIG["de_token"]),
            ("uk", SHOPIFY_CONFIG["uk_store_name"], SHOPIFY_CONFIG["uk_token"]),
            ("jp", SHOPIFY_CONFIG["jp_store_name"], SHOPIFY_CONFIG["jp_token"]),
        ]
        app_login_urls = (
            ("us", "https://www.dvema.com/shopify/shopifyLogin"),
            ("eu", "https://api.zositeche.com/shopify/shopifyLogin"),
        )
        # NOTE(review): the JP storefront host here (www.zosi.jp) differs from
        # the one used for Multipass URLs in shopify_login_url
        # (www.zositech.jp) — confirm which one is correct.
        store_login_urls = {
            "eu": "https://eu.zositech.com/account/login",
            "jp": "https://www.zosi.jp/account/login",
            "de": "https://www.zositech.de/account/login",
            "uk": "https://www.zositech.co.uk/account/login",
        }

        try:
            # Query every store concurrently.
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = {
                    country: executor.submit(
                        ShopifyMultipass.search_customer_by_email, store_name, token, email
                    )
                    for country, store_name, token in store_configs
                }
                shopify_results = {country: future.result() for country, future in futures.items()}

            # Pick the first store, in configuration order, that knows the
            # customer, so the answer does not depend on thread timing.
            shopify_country = next(
                (country for country, _, _ in store_configs if shopify_results[country]["customers"]),
                ""
            )

            account_country = self.call_search_customer(email)
            account_region_list = [
                {
                    "url": login_url,
                    "accountCountry": account_country[region].lower(),
                    "shopifyCountry": shopify_country
                }
                for region, login_url in app_login_urls
                if account_country.get(region)
            ]
            if account_region_list:
                return response.json(0, {"accountStatus": 3, "accountRegionList": account_region_list})

            if shopify_country:
                url = store_login_urls.get(shopify_country, "https://www.zositech.com/account/login")
                return response.json(0, {"accountStatus": 2, "url": url})

            # The literal below is a placeholder meaning "registration link".
            url = "注册链接"
            return response.json(0, {"accountStatus": 1, "url": url})
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def call_search_customer(email):
        """Ask both regional servers' ``searchCustomer`` endpoint about *email*.

        :return: ``{"us": <data or None>, "eu": <data or None>}``. A region maps
            to ``None`` when the request fails, times out, returns a non-JSON
            body, or returns an empty-string ``data`` field.
        """
        urls = {
            "us": "https://www.dvema.com/shopify/searchCustomer",
            "eu": "https://api.zositeche.com/shopify/searchCustomer"
        }
        params = {"email": email}

        def fetch_customer(region, url):
            try:
                response = requests.get(url=url, params=params, timeout=_HTTP_TIMEOUT_SECONDS)
                response.raise_for_status()  # treat HTTP error statuses as failures
                customer_country = response.json()["data"]
            except (requests.RequestException, ValueError):
                # ValueError: a non-JSON body on requests versions whose JSON
                # decode error is not a RequestException subclass.
                return region, None
            if customer_country == "":
                return region, None
            return region, customer_country

        customer_region = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            pending = [executor.submit(fetch_customer, region, url) for region, url in urls.items()]
            for future in concurrent.futures.as_completed(pending):
                region, customer_country = future.result()
                customer_region[region] = customer_country
        return customer_region

    @staticmethod
    def search_customer(request_dict, response):
        """Answer with the country code of the account registered under *email*.

        Response codes: 444 missing e-mail, 104 unknown user, 0 success
        (payload is the stored country code, or "uk"/"us" depending on which
        server configuration is active when none is stored), 173 otherwise.
        """
        email = request_dict.get("email")
        # Without this guard a missing e-mail would filter on NULL columns.
        if not email:
            return response.json(444)

        user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
        if not user_qs.exists():
            return response.json(104)

        user_region_id = user_qs.values_list('region_country', flat=True).first()
        country_code = CountryModel.objects.filter(id=user_region_id).values_list("country_code", flat=True).first()
        if country_code:
            return response.json(0, country_code)
        if CONFIG_INFO == CONFIG_EUR:
            return response.json(0, "uk")
        if CONFIG_INFO == CONFIG_US:
            return response.json(0, "us")
        return response.json(173)

    @staticmethod
    def get_country_domain_list(request_dict, response):
        """List countries (name, code, API domain) localised for ``lang``.

        Response codes: 444 missing time-stamp parameters, 13 time-stamp token
        rejected, 0 success, 500 on any exception (including an unknown ``lang``).
        """
        lang = request_dict.get('lang', 'en')
        time_stamp = request_dict.get('time_stamp', None)
        time_stamp_token = request_dict.get('time_stamp_token', None)

        if not all([time_stamp, time_stamp_token]):
            return response.json(444)

        try:
            # Verify the time-stamp token.
            if not CommonService.check_time_stamp_token(time_stamp_token, time_stamp):
                return response.json(13)

            # Raises IndexError (reported as 500 below) when ``lang`` is unknown.
            language = LanguageModel.objects.filter(lang=lang)[0]
            country_qs = CountryLanguageModel.objects.filter(language_id=language.id)
            country_qs = country_qs.annotate(api=F('country__region__zosi_api'), country_code=F('country__country_code'))
            country_qs = country_qs.values('country_id', 'country_name', 'country_code', 'api').order_by('country_name')
            return response.json(0, list(country_qs))
        except Exception as e:
            print(e)
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def shopify_change_password(request_dict, response):
        """Reset the password of the account registered under *email*.

        Response codes: 444 missing parameter, 105 invalid e-mail,
        109 password refused, 120/121 reset-code expired/wrong,
        173 unknown user, 0 success, 500 on any exception.
        """
        email = request_dict.get("email", None)
        password = request_dict.get("password", None)
        authcode = request_dict.get("authCode", None)

        if not all([email, password, authcode]):
            return response.json(444)

        try:
            data_valid = DataValid()
            if data_valid.email_validate(email) is not True:
                return response.json(105)
            if _password_rejected(data_valid, password):
                return response.json(109)

            reds = RedisObject()
            code_key = email + '_forgetPwdResetCode'
            code_error = _authcode_error(reds.get_data(key=code_key), authcode)
            if code_error is not None:
                return response.json(code_error)

            user_qs = Device_User.objects.filter(Q(username=email) | Q(userEmail=email))
            if not user_qs.exists():
                return response.json(173)

            user_qs.update(password=make_password(password))
            reds.del_data(key=code_key)
            return response.json(0)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def verify_authcode(request_dict, response):
        """Check a verification code before registering or changing a password.

        ``codeType`` 1 checks the registration code (174 if the account already
        exists); ``codeType`` 2 checks the password-reset code (173 if the
        account does not exist). Other response codes: 444 missing/unknown
        parameter, 120/121 code expired/wrong, 0 success, 500 on any exception.
        """
        email = request_dict.get("email", None)
        authcode = request_dict.get("authCode", None)
        code_type = request_dict.get("codeType", None)

        if not all([email, authcode, code_type]):
            return response.json(444)

        try:
            code_type = int(code_type)
            if code_type not in (1, 2):
                return response.json(444)

            account_exists = Device_User.objects.filter(Q(username=email) | Q(userEmail=email)).exists()
            if code_type == 1:
                reds_key = "_identifyingCode"
                if account_exists:
                    return response.json(174)
            else:
                reds_key = "_forgetPwdResetCode"
                if not account_exists:
                    return response.json(173)

            reds = RedisObject()
            code_error = _authcode_error(reds.get_data(key=email + reds_key), authcode)
            if code_error is not None:
                return response.json(code_error)
            return response.json(0)
        except Exception as e:
            return response.json(500, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))

    @staticmethod
    def shopify_login_url(shopify_domain: str, iso2: str, email: str):
        """Build the Multipass login URL for *email*.

        :param shopify_domain: store code chosen by the caller ("" to derive
            the store from *iso2* and the server configuration).
        :param iso2: the account's country code.
        :param email: customer e-mail placed in the token.
        :return: the login URL, or "" when no store applies or when
            *shopify_domain* names a store with no configured secret.
        """
        iso2 = iso2.lower()
        shopify_domain = shopify_domain.lower()

        # Decide which store's secret and storefront to use.
        if shopify_domain:
            store = shopify_domain
        elif iso2 in ("de", "jp"):
            store = iso2
        elif iso2 == "gb":
            store = "uk"
        elif CONFIG_INFO == CONFIG_EUR:
            store = "eu"
        elif CONFIG_INFO == CONFIG_US:
            store = "us"
        else:
            return ""

        secret_key = f"{store}_multipass_secret"
        # ``shopify_domain`` comes from the request; an unknown value must not
        # crash the view. A missing secret for a server-chosen store is a
        # configuration error and still raises KeyError below.
        if shopify_domain and secret_key not in SHOPIFY_CONFIG:
            return ""
        multipass_secret = SHOPIFY_CONFIG[secret_key]

        customer_data = {
            "email": email,
            "created_at": ShopifyMultipass.get_timezone_by_country(shopify_domain or iso2),
        }
        token = ShopifyMultipass.generate_multipass_token(multipass_secret, customer_data)
        base_url = _STORE_BASE_URLS.get(store, f"https://www.zositech.{store}")
        return f"{base_url}/account/login/multipass/{token}"