Ver código fonte

新增国内鼎芯电信API

zhangdongming 1 ano atrás
pai
commit
1af43bab73

+ 5 - 0
Ansjer/config.py

@@ -495,6 +495,11 @@ DX_TECH_APP_KEY = '19b92a9a5e56429e97f271855ce731b2'
 # 鼎芯接口secret
 DX_TECH_SECRET = '6627548e7cfe4044add7ea72ae3391e0'
 
+# 中国电信
+CT_USER_ID = 'C8EQeMCNNroW0J5jTc3r0NVHU6Ii2NpU'
+CT_PASSWORD = 'Pngt1r1S1cRouEyw'
+CT_SECRET_KEY = 'i7CYm8s2T'
+
 
 # amazon电子邮件
 SES_SENDER = 'rdpublic@ansjer.com'  # 邮箱名

+ 27 - 5
Controller/UnicomCombo/UnicomComboController.py

@@ -26,6 +26,7 @@ from Object.EIoTClubObject import EIoTClubObject
 from Object.Enums.WXOperatorEnum import WXOperatorEnum
 from Object.RedisObject import RedisObject
 from Object.ResponseObject import ResponseObject
+from Object.TelecomObject import TelecomObject
 from Object.TokenObject import TokenObject
 from Object.UnicomObject import UnicomObjeect
 from Object.WXTechObject import WXTechObject
@@ -382,13 +383,14 @@ class UnicomComboView(View):
                 return response.json(0)
             params = {'iccid': iccid, 'serial_no': serial_no, 'updated_time': n_time,
                       'created_time': n_time, 'main_card': sim}
-            if sim == 0:
+            if sim == 0:  # 1:贴片卡,0:拔插卡
                 if cls.is_dingxin_iot(iccid):  # 鼎芯物联卡
                     params['card_type'] = 5
                     params['status'] = 2
                     UnicomDeviceInfo.objects.create(**params)
                     cls.create_operation_log('unicom/api/device-bind',
                                              ip, request_dict, '4G序列号{}新绑定鼎芯{}'.format(serial_no, iccid))
+                    return response.json(0)
                 return response.json(0, '外置卡不保存相关信息{}'.format(serial_no))
 
             if cls.is_unicom_sim(iccid):  # 联通卡
@@ -398,6 +400,11 @@ class UnicomComboView(View):
                                          ip, request_dict,
                                          '4G序列号{}绑定{},testFlowPackage{}'.format(serial_no, iccid, result))
                 return response.json(0)
+            elif cls.is_telecom_sim(iccid):  # 鼎芯电信
+                params['card_type'] = 3
+                params['status'] = 2
+                UnicomDeviceInfo.objects.create(**params)
+                return response.json(0)
             elif cls.is_dingxin_iot(iccid):  # 鼎芯物联卡
                 params['card_type'] = 5  # 国际
                 params['status'] = 2
@@ -412,10 +419,25 @@ class UnicomComboView(View):
                 logger.info('--->设备请求绑定{}验证失败'.format(iccid))
                 return response.json(173)
         except Exception as e:
-            print(e)
-            ex = traceback.format_exc()
-            logger.info('UnicomComboView.iccid_bind_serial_no error{}'.format(ex))
-            return response.json(177, repr(e))
+            LOGGER.info('*****UnicomComboView.iccid_bind_serial_no:serial_number:{}, errLine:{}, errMsg:{}'
+                        .format(serial_no, e.__traceback__.tb_lineno, repr(e)))
+            return response.json(177, 'error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+
+    @classmethod
+    def is_telecom_sim(cls, iccid):
+        """
+        判断是否电信SIM卡
+        @param iccid: iccid
+        @return: 是否鼎芯电信卡
+        """
+        try:
+            telecom = TelecomObject()
+            result = telecom.query_card_main_status(iccid[0:19])
+            return result['resultCode'] == '0'
+        except Exception as e:
+            LOGGER.info('*****UnicomComboView.is_telecom_sim*****error_line:{}, error_msg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+            return False
 
     @classmethod
     def is_unicom_sim(cls, iccid):

+ 426 - 0
Object/TelecomObject.py

@@ -0,0 +1,426 @@
+# -*- encoding: utf-8 -*-
+"""
+@File    : TelecomObject.py
+@Time    : 2024/1/11 10:40
+@Author  : stephen
+@Email   : zhangdongming@asj6.wecom.work
+@Software: PyCharm
+"""
+import json
+
+import requests
+import xmltodict
+
+from Ansjer.config import CT_USER_ID, CT_PASSWORD, CT_SECRET_KEY, LOGGER
+from Object.utils import LocalDateTimeUtil
+from Object.utils.DesUtils import DesUtils
+
+
+class TelecomObject:
+    def __init__(self):
+        self.user_id = CT_USER_ID
+        self.password = CT_PASSWORD
+        self.secret_key = CT_SECRET_KEY
+        self.key1 = CT_SECRET_KEY[0:3]
+        self.key2 = CT_SECRET_KEY[3:6]
+        self.key3 = CT_SECRET_KEY[6:9]
+        self.url = 'http://api.ct10649.com:9001/m2m_ec/query.do'
+        self.session = requests.Session()
+
+    def get_password_enc(self):
+        return DesUtils.str_enc(self.password, self.key1, self.key2, self.key3)
+
+    def get_sign(self, arr):
+        return DesUtils.str_enc(DesUtils.natural_ordering(arr), self.key1, self.key2, self.key3)
+
+    def get_required_arr(self, iccid, method):
+        return [iccid, self.user_id, self.password, method]
+
+    def get_params_dict_by_iccid(self, method, iccid, arr):
+        return {'method': method, 'iccid': iccid,
+                'user_id': self.user_id,
+                'passWord': self.get_password_enc(),
+                'sign': self.get_sign(arr)}
+
+    def get_params_dict_by_access_number(self, method, access_number, arr):
+        return {'method': method, 'access_number': access_number,
+                'user_id': self.user_id,
+                'passWord': self.get_password_enc(),
+                'sign': self.get_sign(arr)}
+
+    def query_card_main_status(self, iccid):
+        """
+        卡主状态查询接口
+
+        :param iccid: 卡的ICCID号。
+        :return: 包含卡状态的结果字典。
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+        """
+        try:
+            if not iccid:
+                raise ValueError("error ICCID不能为空")
+            method = 'queryCardMainStatus'
+            arr = self.get_required_arr(iccid, method)
+            # 密码加密
+            re_params = self.get_params_dict_by_iccid(method, iccid, arr)
+
+            response = self.session.get(self.url, params=re_params)
+            if response.status_code != 200:
+                LOGGER(f"*****TelecomObject.query_card_main_status error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            result = response.json()
+            LOGGER.info(f'*****TelecomObject.query_card_main_status****iccid:{iccid},result:{result}')
+            return result
+
+        except Exception as e:
+            LOGGER.info('***TelecomObject.query_card_main_status:errLine:{}, errMsg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+
+            return None
+
+    def query_total_usage_by_date(self, access_number):
+        """
+        总使用量查询接口(时间段) 不可跨月查询
+
+        :param access_number: 11位接入号码
+        :return: 套餐使用量查询结果
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+
+        """
+        try:
+            if not access_number:
+                raise ValueError("error access_number不能为空")
+            start_date = LocalDateTimeUtil.get_first_day_of_month()
+            end_date = LocalDateTimeUtil.get_last_day_of_month()
+            method = 'queryTotalUsageByDate'
+            arr = [method, self.user_id, access_number, self.password, start_date, end_date]
+            re_params = self.get_params_dict_by_access_number(method, access_number, arr)
+            re_params['startDate'] = start_date
+            re_params['endDate'] = end_date
+
+            response = self.session.get(self.url, params=re_params)
+            if response.status_code != 200:
+                LOGGER.info(f"*****TelecomObject.query_total_usage_by_date error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            msg = response.text
+            if not msg:
+                return None
+
+            # 检查响应类型
+            content_type = response.headers.get('Content-Type', '')
+            if 'application/xml' in content_type or 'text/xml' in content_type:
+                result = xmltodict.parse(msg)
+            elif 'application/json' in content_type or 'text/json' in content_type:
+                result = json.loads(msg)
+                LOGGER.info(f"***TelecomObject.query_total_usage_by_date查询流量异常{access_number},error{result}")
+                return None
+            else:
+                return None
+
+            return result
+
+        except Exception as e:
+            LOGGER.info('***TelecomObject.query_total_usage_by_date:errLine:{}, errMsg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+            return None
+
+    def disabled_number(self, access_number, orderTypeId):
+        """
+        停机保号/复机/测试期去激活
+
+        :param access_number: 11位接入号码
+        :param orderTypeId: 19表示停机保号,20表示停机保号后复机,21表示测试期去激活,22表示测试期去激活后回到测试激活。
+        :return: 流量总使用量查询(时间段)
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+
+        """
+        try:
+            if not access_number:
+                raise ValueError("*****TelecomObject.disabled_number error****access_number不能为空")
+            method = 'disabledNumber'
+            arr = [method, self.user_id, access_number, self.password, orderTypeId, '']
+            re_params = self.get_params_dict_by_access_number(method, access_number, arr)
+            re_params['orderTypeId'] = orderTypeId
+            re_params['acctCd'] = ''
+            response = self.session.get(self.url, params=re_params)
+            if response.status_code != 200:
+                LOGGER.info(f"*****TelecomObject.disabled_number error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            msg = response.text
+            if not msg:
+                return None
+
+            # 检查响应类型
+            content_type = response.headers.get('Content-Type', '')
+            if 'application/xml' in content_type or 'text/xml' in content_type:
+                result = xmltodict.parse(msg)
+            elif 'application/json' in content_type or 'text/json' in content_type:
+                result = json.loads(msg)
+                LOGGER.info(f"***TelecomObject.disabled_number停机/复机异常{access_number},error{result}")
+                return None
+            else:
+                return None
+
+            return result
+
+        except Exception as e:
+            LOGGER.info('***TelecomObject.disabled_number:errLine:{}, errMsg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+            return None
+
+    def single_cut_net(self, access_number, action):
+        """
+        单独添加/恢复断网接口
+
+        :param access_number: 11位接入号码
+        :param action: ADD:单独添加断网;DEL:单独恢复断网
+        :return: 单独添加/恢复断网结果
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+
+        """
+        try:
+            if not access_number:
+                raise ValueError("*****TelecomObject.single_cut_net***error:access_number不能为空")
+            method = 'singleCutNet'
+            arr = [method, self.user_id, access_number, self.password, action]
+            re_params = self.get_params_dict_by_access_number(method, access_number, arr)
+            re_params['action'] = action
+            response = self.session.get(self.url, params=re_params)
+
+            if response.status_code != 200:
+                LOGGER.info(f"*****TelecomObject.query_card_main_status error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            msg = response.text
+            if not msg:
+                return None
+
+            # 检查响应类型
+            content_type = response.headers.get('Content-Type', '')
+            if 'application/xml' in content_type or 'text/xml' in content_type:
+                result = xmltodict.parse(msg)
+            elif 'application/json' in content_type or 'text/json' in content_type:
+                result = json.loads(msg)
+                LOGGER.info(f"***TelecomObject.query_card_main_status停机/复机异常{access_number},error{result}")
+                return None
+            else:
+                return None
+
+            return result
+
+        except Exception as e:
+            LOGGER.info('***TelecomObject.single_cut_net:errLine:{}, errMsg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+            return None
+
+    def get_telephone(self, iccid):
+        """
+        接入号码查询接口
+
+        :param iccid: 卡的ICCID号。
+        :return: 接入号码查询结果
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+
+        """
+        try:
+            if not iccid:
+                raise ValueError("*****TelecomObject.get_telephone error****iccid不能为空")
+            method = 'getTelephone'
+            arr = [method, self.user_id, iccid, self.password]
+            re_params = self.get_params_dict_by_iccid(method, iccid, arr)
+            response = self.session.get(self.url, params=re_params)
+
+            if response.status_code != 200:
+                LOGGER.info(f"*****TelecomObject.get_telephone error HTTP请求失败,状态码: {response.status_code}")
+                return None
+            msg = response.text
+            if not msg:
+                return None
+
+            # 检查响应类型
+            return json.loads(msg)
+
+        except Exception as e:
+            LOGGER.info('***TelecomObject.get_telephone:errLine:{}, errMsg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+            return None
+
+    def query_traffic(self, iccid, need_dtl='0'):
+        """
+        表示流量总使用量查询(当月)
+
+        :param iccid: 卡的ICCID号。
+        :param need_dtl: 0
+        :return: 表示流量总使用量查询(当月)。
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+
+        """
+        try:
+            if not iccid:
+                raise ValueError("*****TelecomObject.query_traffic error****ICCID不能为空")
+            method = 'queryTraffic'
+            arr = [method, self.user_id, iccid, self.password]
+            re_params = self.get_params_dict_by_iccid(method, iccid, arr)
+            re_params['needDtl'] = need_dtl
+            response = self.session.get(self.url, params=re_params)
+            if response.status_code != 200:
+                LOGGER.info(
+                    f"*****TelecomObject.query_traffic error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            msg = response.text
+            if not msg:
+                return None
+
+            # 检查响应类型
+            content_type = response.headers.get('Content-Type', '')
+            if 'application/xml' in content_type or 'text/xml' in content_type:
+                result = xmltodict.parse(msg)
+            elif 'application/json' in content_type or 'text/json' in content_type:
+                result = json.loads(msg)
+                LOGGER.info(f"***TelecomObject.query_traffic查询流量异常{iccid},error{result}")
+                return None
+            else:
+                return None
+
+            return result
+
+        except Exception as e:
+            LOGGER.info(
+                '***TelecomObject.query_traffic:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno, repr(e)))
+            return None
+
+    def query_package_batch_flow(self, iccid, month_date):
+        """
+        套餐使用量批量查询接口
+
+        :param iccid: 卡的ICCID号。
+        :param month_date: 例如20160501,表示查询2016年5月份的套餐;如果不填,则为查询实时套餐
+        :return: 套餐使用量查询结果
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+
+        """
+        try:
+            if not iccid:
+                raise ValueError("*****TelecomObject.query_package_batch_flow error****ICCID不能为空")
+            method = 'queryPakagePlus'
+            arr = [method, self.user_id, iccid, self.password]
+            re_params = self.get_params_dict_by_access_number(method, iccid, arr)
+            response = self.session.get(self.url, params=re_params)
+            if response.status_code != 200:
+                LOGGER.info(f"*****TelecomObject.query_package_batch_flow error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            msg = response.text
+            if not msg:
+                return None
+
+            # 检查响应类型
+            content_type = response.headers.get('Content-Type', '')
+            if 'application/xml' in content_type or 'text/xml' in content_type:
+                result = xmltodict.parse(msg)
+            elif 'application/json' in content_type or 'text/json' in content_type:
+                result = json.loads(msg)
+                LOGGER.info(f"***TelecomObject.query_package_batch_flow查询流量异常{iccid},error{result}")
+                return None
+            else:
+                LOGGER.info("***TelecomObject.query_package_batch_flow无法识别的响应类型: {}".format(content_type))
+                return None
+
+            return result
+
+        except Exception as e:
+            LOGGER.info(
+                '***TelecomObject.query_package_batch_flow:errLine:{}, errMsg:{}'.format(e.__traceback__.tb_lineno,
+                                                                                         repr(e)))
+            return None
+
+    def query_traffic_by_date(self, iccid):
+        """
+        流量总使用量查询(时间段) 会被限流 成功则返回XML
+
+        :param iccid: 卡的ICCID号。
+        :return: 流量总使用量查询(时间段)
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+
+        """
+        try:
+            if not iccid:
+                raise ValueError("*****TelecomObject.query_traffic_by_date error****ICCID不能为空")
+            method = 'queryTrafficByDate'
+            arr = [method, self.user_id, iccid, self.password]
+            re_params = self.get_params_dict_by_iccid(method, iccid, arr)
+            re_params['startDate'] = '20240101'
+            re_params['endDate'] = '20240115'
+            re_params['needDtl'] = '0'
+            response = self.session.get(self.url, params=re_params)
+            if response.status_code != 200:
+                LOGGER.info(f"*****TelecomObject.query_traffic_by_date error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            msg = response.text
+            if not msg:
+                return None
+
+            # 检查响应类型
+            content_type = response.headers.get('Content-Type', '')
+            if 'application/xml' in content_type or 'text/xml' in content_type:
+                result = xmltodict.parse(msg)
+            elif 'application/json' in content_type or 'text/json' in content_type:
+                result = json.loads(msg)
+                LOGGER.info(f"***TelecomObject.query_traffic_by_date 查询流量异常{iccid},error{result}")
+                return None
+            else:
+                LOGGER.info("***TelecomObject.query_traffic_by_date 无法识别的响应类型: {}".format(content_type))
+                return None
+
+            return result
+
+        except Exception as e:
+            LOGGER.info('***TelecomObject.query_traffic_by_date:errLine:{}, errMsg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+            return None
+
+    def query_product_information(self, access_number):
+        """
+        产品资料查询接口
+        :param access_number: 接入号码access_number。
+        :return: SIM卡的产品资料,包含基础信息、套餐、状态及 断网类型等
+        :raises ValueError: 如果响应为空或HTTP请求失败。
+        """
+        try:
+            if not access_number:
+                raise ValueError("*****TelecomObject.query_product_information error****access_number不能为空")
+            method = 'prodInstQuery'
+            arr = [method, self.user_id, access_number, self.password]
+            re_params = self.get_params_dict_by_access_number(method, access_number, arr)
+            response = self.session.get(self.url, params=re_params)
+
+            if response.status_code != 200:
+                LOGGER.info(f"*****TelecomObject.query_product_information error HTTP请求失败,状态码: {response.status_code}")
+                return None
+
+            msg = response.text
+            if not msg:
+                return None
+
+            content_type = response.headers.get('Content-Type', '')
+            if 'application/xml' in content_type or 'text/xml' in content_type:
+                result = xmltodict.parse(msg)
+            elif 'application/json' in content_type or 'text/json' in content_type:
+                result = json.loads(msg)
+                LOGGER.info(f"***TelecomObject.query_product_information 查询产品资料异常:{access_number},error{result}")
+                return None
+            else:
+                LOGGER.info("***TelecomObject.query_product_information 无法识别的响应类型: {}".format(content_type))
+                return None
+            return result
+
+        except Exception as e:
+            LOGGER.info('***TelecomObject.query_product_information:errLine:{}, errMsg:{}'
+                        .format(e.__traceback__.tb_lineno, repr(e)))
+            return None
+

+ 305 - 0
Object/utils/DesUtils.py

@@ -0,0 +1,305 @@
+# -*- encoding: utf-8 -*-
+"""
+@File    : DesUtils.py
+@Time    : 2024/1/11 10:23
+@Author  : stephen
+@Email   : zhangdongming@asj6.wecom.work
+@Software: PyCharm
+"""
+
+
+class DesUtils:
+
+    @staticmethod
+    def str_enc(data, first_key, second_key, third_key):
+        enc_data = ""
+        if first_key:
+            first_key_bt = DesUtils.get_key_bytes(first_key)
+        if second_key:
+            second_key_bt = DesUtils.get_key_bytes(second_key)
+        if third_key:
+            third_key_bt = DesUtils.get_key_bytes(third_key)
+
+        for i in range(0, len(data), 4):
+            temp_data = data[i:i + 4].ljust(4, '\0')
+            temp_byte = DesUtils.str_to_bt(temp_data)
+
+            if first_key:
+                for key_bt in first_key_bt:
+                    temp_byte = DesUtils.des_enc(temp_byte, key_bt)
+            if second_key:
+                for key_bt in second_key_bt:
+                    temp_byte = DesUtils.des_enc(temp_byte, key_bt)
+            if third_key:
+                for key_bt in third_key_bt:
+                    temp_byte = DesUtils.des_enc(temp_byte, key_bt)
+
+            enc_data += DesUtils.bt64_to_hex(temp_byte)
+
+        return enc_data
+
+    @staticmethod
+    def str_dec(data, first_key, second_key, third_key):
+        dec_str = ""
+        if first_key:
+            first_key_bt = DesUtils.get_key_bytes(first_key)
+        if second_key:
+            second_key_bt = DesUtils.get_key_bytes(second_key)
+        if third_key:
+            third_key_bt = DesUtils.get_key_bytes(third_key)
+
+        for i in range(0, len(data), 16):
+            temp_data = data[i:i + 16]
+            str_byte = DesUtils.hex_to_bt64(temp_data)
+            int_byte = [int(bit) for bit in str_byte]
+
+            if third_key:
+                for key_bt in reversed(third_key_bt):
+                    int_byte = DesUtils.des_dec(int_byte, key_bt)
+            if second_key:
+                for key_bt in reversed(second_key_bt):
+                    int_byte = DesUtils.des_dec(int_byte, key_bt)
+            if first_key:
+                for key_bt in reversed(first_key_bt):
+                    int_byte = DesUtils.des_dec(int_byte, key_bt)
+
+            dec_str += DesUtils.byte_to_string(int_byte)
+
+        return dec_str
+
+    @staticmethod
+    def get_key_bytes(key):
+        key_bytes = []
+        length = len(key)
+        for i in range(0, length, 4):
+            key_bytes.append(DesUtils.str_to_bt(key[i:i + 4]))
+        return key_bytes
+
+    @staticmethod
+    def str_to_bt(s):
+        bt = [0] * 64
+        for i, char in enumerate(s.ljust(4, '\0')):
+            k = ord(char)
+            for j in range(16):
+                bt[i * 16 + j] = (k >> (15 - j)) & 1
+        return bt
+
+    @staticmethod
+    def bt4_to_hex(binary):
+        return format(int(binary, 2), 'X')
+
+    @staticmethod
+    def hex_to_bt4(hex_str):
+        return format(int(hex_str, 16), '04b')
+
+    @staticmethod
+    def byte_to_string(byte_data):
+        return ''.join(chr(int(''.join(map(str, byte_data[i:i + 16])), 2)) for i in range(0, 64, 16)).rstrip('\0')
+
+    @staticmethod
+    def bt64_to_hex(byte_data):
+        return ''.join(DesUtils.bt4_to_hex(''.join(map(str, byte_data[i:i + 4]))) for i in range(0, 64, 4))
+
+    @staticmethod
+    def hex_to_bt64(hex_str):
+        return ''.join(DesUtils.hex_to_bt4(hex_str[i]) for i in range(16))
+
+    @staticmethod
+    # 加密
+    def des_enc(data_byte, key_byte):
+        keys = DesUtils.generate_keys(key_byte)
+        ip_byte = DesUtils.init_permute(data_byte)
+        ip_left = ip_byte[:32]
+        ip_right = ip_byte[32:]
+
+        for i in range(16):
+            temp_left = ip_left[:]
+            ip_left = ip_right[:]
+            temp_right = DesUtils.xor(
+                DesUtils.p_permute(DesUtils.s_box_permute(DesUtils.xor(DesUtils.expand_permute(ip_right), keys[i]))),
+                temp_left)
+            ip_right = temp_right[:]
+
+        final_data = ip_right + ip_left
+        return DesUtils.finally_permute(final_data)
+
+    @staticmethod
+    # 解密
+    def des_dec(data_byte, key_byte):
+        keys = DesUtils.generate_keys(key_byte)
+        ip_byte = DesUtils.init_permute(data_byte)
+        ip_left = ip_byte[:32]
+        ip_right = ip_byte[32:]
+
+        for i in range(15, -1, -1):
+            temp_left = ip_left[:]
+            ip_left = ip_right[:]
+            temp_right = DesUtils.xor(
+                DesUtils.p_permute(DesUtils.s_box_permute(DesUtils.xor(DesUtils.expand_permute(ip_right), keys[i]))),
+                temp_left)
+            ip_right = temp_right[:]
+
+        final_data = ip_right + ip_left
+        return DesUtils.finally_permute(final_data)
+
+    @staticmethod
+    def init_permute(original_data):
+        ip_byte = [0] * 64
+        for i in range(4):
+            for j in range(8):
+                ip_byte[i * 8 + j] = original_data[(7 - j) * 8 + i * 2 + 1]
+                ip_byte[i * 8 + j + 32] = original_data[(7 - j) * 8 + i * 2]
+        return ip_byte
+
+    @staticmethod
+    def expand_permute(right_data):
+        ep_byte = [0] * 48
+        for i in range(8):
+            ep_byte[i * 6 + 0] = right_data[31] if i == 0 else right_data[i * 4 - 1]
+            ep_byte[i * 6 + 1] = right_data[i * 4 + 0]
+            ep_byte[i * 6 + 2] = right_data[i * 4 + 1]
+            ep_byte[i * 6 + 3] = right_data[i * 4 + 2]
+            ep_byte[i * 6 + 4] = right_data[i * 4 + 3]
+            ep_byte[i * 6 + 5] = right_data[0] if i == 7 else right_data[i * 4 + 4]
+        return ep_byte
+
+    @staticmethod
+    def xor(byte_one, byte_two):
+        return [b1 ^ b2 for b1, b2 in zip(byte_one, byte_two)]
+
+    @staticmethod
+    def s_box_permute(expand_byte):
+        s_box_byte = [0] * 32
+        s_boxes = [
+            # S1
+            [
+                [14, 4, 13, 1, 2, 15, 11, 8, 3, 10, 6, 12, 5, 9, 0, 7],
+                [0, 15, 7, 4, 14, 2, 13, 1, 10, 6, 12, 11, 9, 5, 3, 8],
+                [4, 1, 14, 8, 13, 6, 2, 11, 15, 12, 9, 7, 3, 10, 5, 0],
+                [15, 12, 8, 2, 4, 9, 1, 7, 5, 11, 3, 14, 10, 0, 6, 13]
+            ],
+            # S2
+            [
+                [15, 1, 8, 14, 6, 11, 3, 4, 9, 7, 2, 13, 12, 0, 5, 10],
+                [3, 13, 4, 7, 15, 2, 8, 14, 12, 0, 1, 10, 6, 9, 11, 5],
+                [0, 14, 7, 11, 10, 4, 13, 1, 5, 8, 12, 6, 9, 3, 2, 15],
+                [13, 8, 10, 1, 3, 15, 4, 2, 11, 6, 7, 12, 0, 5, 14, 9],
+            ],
+            # S3
+            [
+                [10, 0, 9, 14, 6, 3, 15, 5, 1, 13, 12, 7, 11, 4, 2, 8],
+                [13, 7, 0, 9, 3, 4, 6, 10, 2, 8, 5, 14, 12, 11, 15, 1],
+                [13, 6, 4, 9, 8, 15, 3, 0, 11, 1, 2, 12, 5, 10, 14, 7],
+                [1, 10, 13, 0, 6, 9, 8, 7, 4, 15, 14, 3, 11, 5, 2, 12],
+            ],
+            # S4
+            [
+                [7, 13, 14, 3, 0, 6, 9, 10, 1, 2, 8, 5, 11, 12, 4, 15],
+                [13, 8, 11, 5, 6, 15, 0, 3, 4, 7, 2, 12, 1, 10, 14, 9],
+                [10, 6, 9, 0, 12, 11, 7, 13, 15, 1, 3, 14, 5, 2, 8, 4],
+                [3, 15, 0, 6, 10, 1, 13, 8, 9, 4, 5, 11, 12, 7, 2, 14],
+            ],  # Similar structure as S1
+            # S5
+            [
+                [2, 12, 4, 1, 7, 10, 11, 6, 8, 5, 3, 15, 13, 0, 14, 9],
+                [14, 11, 2, 12, 4, 7, 13, 1, 5, 0, 15, 10, 3, 9, 8, 6],
+                [4, 2, 1, 11, 10, 13, 7, 8, 15, 9, 12, 5, 6, 3, 0, 14],
+                [11, 8, 12, 7, 1, 14, 2, 13, 6, 15, 0, 9, 10, 4, 5, 3],
+            ],
+            # S6
+            [
+                [12, 1, 10, 15, 9, 2, 6, 8, 0, 13, 3, 4, 14, 7, 5, 11],
+                [10, 15, 4, 2, 7, 12, 9, 5, 6, 1, 13, 14, 0, 11, 3, 8],
+                [9, 14, 15, 5, 2, 8, 12, 3, 7, 0, 4, 10, 1, 13, 11, 6],
+                [4, 3, 2, 12, 9, 5, 15, 10, 11, 14, 1, 7, 6, 0, 8, 13],
+            ],
+            # S7
+            [
+                [4, 11, 2, 14, 15, 0, 8, 13, 3, 12, 9, 7, 5, 10, 6, 1],
+                [13, 0, 11, 7, 4, 9, 1, 10, 14, 3, 5, 12, 2, 15, 8, 6],
+                [1, 4, 11, 13, 12, 3, 7, 14, 10, 15, 6, 8, 0, 5, 9, 2],
+                [6, 11, 13, 8, 1, 4, 10, 7, 9, 5, 0, 15, 14, 2, 3, 12]
+            ],
+            # S8
+            [
+                [13, 2, 8, 4, 6, 15, 11, 1, 10, 9, 3, 14, 5, 0, 12, 7],
+                [1, 15, 13, 8, 10, 3, 7, 4, 12, 5, 6, 11, 0, 14, 9, 2],
+                [7, 11, 4, 1, 9, 12, 14, 2, 0, 6, 10, 13, 15, 3, 5, 8],
+                [2, 1, 14, 7, 4, 10, 8, 13, 15, 12, 9, 0, 3, 5, 6, 11],
+            ]
+        ]
+
+        for m in range(8):
+            i = expand_byte[m * 6] * 2 + expand_byte[m * 6 + 5]
+            j = (expand_byte[m * 6 + 1] * 8 + expand_byte[m * 6 + 2] * 4 +
+                 expand_byte[m * 6 + 3] * 2 + expand_byte[m * 6 + 4])
+            s_box_value = s_boxes[m][i][j]
+            binary = format(s_box_value, '04b')
+            for n in range(4):
+                s_box_byte[m * 4 + n] = int(binary[n])
+
+        return s_box_byte
+
+    @staticmethod
+    def p_permute(s_box_byte):
+        p_box_permute = [0] * 32
+        positions = [15, 6, 19, 20, 28, 11, 27, 16, 0, 14, 22, 25, 4, 17, 30, 9,
+                     1, 7, 23, 13, 31, 26, 2, 8, 18, 12, 29, 5, 21, 10, 3, 24]
+        for i, pos in enumerate(positions):
+            p_box_permute[i] = s_box_byte[pos]
+        return p_box_permute
+
+    @staticmethod
+    def finally_permute(end_byte):
+        fp_byte = [0] * 64
+        positions = [39, 7, 47, 15, 55, 23, 63, 31, 38, 6, 46, 14, 54, 22, 62, 30,
+                     37, 5, 45, 13, 53, 21, 61, 29, 36, 4, 44, 12, 52, 20, 60, 28,
+                     35, 3, 43, 11, 51, 19, 59, 27, 34, 2, 42, 10, 50, 18, 58, 26,
+                     33, 1, 41, 9, 49, 17, 57, 25, 32, 0, 40, 8, 48, 16, 56, 24]
+        for i, pos in enumerate(positions):
+            fp_byte[i] = end_byte[pos]
+        return fp_byte
+
+    @staticmethod
+    def get_box_binary(i):
+        return format(i, '04b')
+
+    @staticmethod
+    def generate_keys(key_byte):
+        key = [0] * 56
+        keys = [[0] * 48 for _ in range(16)]
+        loop = [1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1]
+
+        for i in range(7):
+            for j, k in zip(range(8), reversed(range(8))):
+                key[i * 8 + j] = key_byte[8 * k + i]
+
+        for i in range(16):
+            for j in range(loop[i]):
+                temp_left = key[0]
+                temp_right = key[28]
+                for k in range(27):
+                    key[k] = key[k + 1]
+                    key[28 + k] = key[29 + k]
+                key[27] = temp_left
+                key[55] = temp_right
+
+            temp_key = [
+                key[13], key[16], key[10], key[23], key[0], key[4], key[2], key[27],
+                key[14], key[5], key[20], key[9], key[22], key[18], key[11], key[3],
+                key[25], key[7], key[15], key[6], key[26], key[19], key[12], key[1],
+                key[40], key[51], key[30], key[36], key[46], key[54], key[29], key[39],
+                key[50], key[44], key[32], key[47], key[43], key[48], key[38], key[55],
+                key[33], key[52], key[45], key[41], key[49], key[35], key[28], key[31]
+            ]
+
+            keys[i] = temp_key
+
+        return keys
+
+    @staticmethod
+    def natural_ordering(str_list):
+        if str_list and len(str_list) > 0:
+            str_list.sort()
+            return ",".join(str_list)
+        return ""

+ 21 - 0
Object/utils/LocalDateTimeUtil.py

@@ -318,3 +318,24 @@ def get_date_time(timestamp, timezone_offset):
 
     # 转换为日期时间对象
     return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone(offset))
+
+
+def get_first_day_of_month(strformat="%Y%m%d"):
+    """
+    获取当月第一天日期
+    @param strformat:
+    @return: 返回当月第一天日期
+    """
+    today = datetime.date.today()
+    return datetime.date(today.year, today.month, 1).strftime(strformat)
+
+
+def get_last_day_of_month(strformat="%Y%m%d"):
+    """
+    获取当月最后一天日期
+    @param strformat: 日期格式化
+    @return: 返回当月最后一天日期
+    """
+    today = datetime.date.today()
+    last_day = calendar.monthrange(today.year, today.month)[1]
+    return datetime.date(today.year, today.month, last_day).strftime(strformat)