Prechádzať zdrojové kódy

加密获取用户信息接口

linhaohong 11 mesiacov pred
rodič
commit
82572cb438

+ 2 - 0
Ansjer/urls.py

@@ -29,6 +29,7 @@ from Controller.CustomCustomer import CustomCustomerController
 from Controller.MessagePush import EquipmentMessagePush
 from Controller.Surveys import CloudStorageController
 from Controller.UserDevice import UserSubscriptionController
+from Controller.CampaignController import AdDepartmentController
 from Controller.SensorGateway import SensorGatewayController, EquipmentFamilyController
 from django.urls import include
 
@@ -381,6 +382,7 @@ urlpatterns = [
     re_path('account/changeAccountInfo/(?P<operation>.*)$', UserController.ChangeAccountInfoView.as_view()),
     re_path('deviceCommon/(?P<operation>.*)$', DeviceCommonController.DeviceCommonView.as_view()),
     re_path('customCustomer/(?P<operation>.*)$', CustomCustomerController.CustomCustomerView.as_view()),
+    re_path('adDepartment/(?P<operation>.*)$', AdDepartmentController.AdDepartmentView.as_view()),
 
 
     # 后台界面接口 -----------------------------------------------------

+ 115 - 0
Controller/CampaignController/AdDepartmentController.py

@@ -0,0 +1,115 @@
+from collections import defaultdict
+
+from django.http import JsonResponse
+from django.views import View
+
+from Model.models import Device_User, Device_Info
+from Object.HMACValidatorObject import HMACValidatorObject
+
+
+class AdDepartmentView(View):
+    def get(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        operation = kwargs.get('operation')
+        request_dict = request.GET
+        return self.validation(request, request_dict, operation)
+
+    def post(self, request, *args, **kwargs):
+        request.encoding = 'utf-8'
+        operation = kwargs.get('operation')
+        request_dict = request.POST
+        return self.validation(request, request_dict, operation)
+
+    def validation(self, request, request_dict, operation):
+        if operation == 'getUserList':
+            return self.get_user_list(request_dict)
+        else:
+            return JsonResponse({'code': 400, 'msg': 'operation not found'})
+
+    @staticmethod
+    def get_user_list(request_dict):
+        # 手机号或者邮箱查询
+        phone = request_dict.get('phone', '')
+        email = request_dict.get('email', '')
+        provided_signature = request_dict.get('signature', '')
+        pageNo = int(request_dict.get('pageNo', 1))  # 默认第一页
+        pageSize = int(request_dict.get('pageSize', 10))  # 默认每页10条
+        timestamp = request_dict.get('timestamp', None)
+
+        # 判断时间戳是否存在
+        if not all([provided_signature, timestamp]):
+            return JsonResponse(status=400, data={"error": "缺少关键参数"})
+
+        # 生成用于签名的数据
+        data = f"phone={phone}&email={email}&timestamp={timestamp}"
+
+        # 创建HMAC验证器对象
+        validator = HMACValidatorObject()
+
+        # 验证签名是否正确
+        is_signature_valid = validator.verify(data, provided_signature)
+
+        # 验证时间戳是否在有效范围内,防止重放攻击
+        is_timestamp_valid = validator.validate_timestamp(timestamp)
+
+        # 如果签名和时间戳均有效
+        if is_signature_valid and is_timestamp_valid:
+            # 获取所有用户查询集
+            device_user_qs = Device_User.objects.all()
+
+            # 条件查询:手机号
+            if phone:
+                device_user_qs = device_user_qs.filter(phone__icontains=phone)
+
+            # 条件查询:邮箱
+            if email:
+                device_user_qs = device_user_qs.filter(userEmail__icontains=email)
+
+            total = device_user_qs.count()
+
+            if int(pageSize) > 200:
+                pageSize = 200
+
+            # 分页处理
+            start_index = (pageNo - 1) * pageSize
+            end_index = start_index + pageSize
+            paginated_users = device_user_qs[start_index:end_index]
+
+            # 使用 prefetch_related 预加载设备信息,避免 N+1 查询
+            paginated_users = paginated_users.prefetch_related('device_info_set')
+
+            # 构造返回的用户信息列表
+            user_info_list = []
+            for user in paginated_users:
+                device_list = [
+                    {
+                        "uid": device.UID,
+                        "serialNumber": device.serial_number,
+                        "addTime": device.data_joined,
+                        "isShared": device.isShare,
+                        "isPrimaryUser": user.userID == device.primaryUserID,
+                    }
+                    for device in user.device_info_set.all()  # 使用预加载的设备信息
+                ]
+
+                user_info_list.append({
+                    "username": user.username,
+                    "email": user.userEmail,
+                    "phone": user.phone,
+                    "registrationTime": user.data_joined,
+                    "devices": device_list
+                })
+
+            # 构造最终的返回数据
+            user_info = {
+                "total": total,  # 总数
+                "pageNo": pageNo,
+                "pageSize": pageSize,
+                "users": user_info_list
+            }
+
+            return JsonResponse(status=200, data=user_info)
+
+        # 如果签名或时间戳验证失败
+        return JsonResponse(status=400, data={"error": "认证失败"})
+

+ 58 - 0
Object/HMACValidatorObject.py

@@ -0,0 +1,58 @@
+import hmac
+import hashlib
+import base64
+from datetime import datetime
+
+class HMACValidatorObject:
+    def __init__(self):
+        """
+        初始化HMAC签名与验签类。
+        """
+        secret = "4f92e6aa7e231ea8b9c"
+        self.secret = secret.encode('utf-8')
+
+    def sign(self, data):
+        """
+        使用共享密钥对数据进行签名。
+        :param data: 需要签名的字符串数据
+        :return: 返回签名的base64编码字符串
+        """
+        signature = hmac.new(self.secret, data.encode('utf-8'), hashlib.sha256).digest()
+        return base64.b64encode(signature).decode('utf-8')
+
+    def verify(self, data, provided_signature):
+        """
+        使用共享密钥对签名进行验证。
+        :param data: 原始字符串数据
+        :param provided_signature: 请求中提供的签名 (base64编码)
+        :return: 验签结果,True表示签名有效,False表示签名无效
+        """
+        expected_signature = self.sign(data)
+        return hmac.compare_digest(expected_signature, provided_signature)
+
+    @staticmethod
+    def generate_timestamp():
+        """
+        生成当前时间戳,用于防止重放攻击。
+        :return: 当前时间的ISO格式字符串
+        """
+        return datetime.utcnow().isoformat()
+
+    @staticmethod
+    def validate_timestamp(timestamp, tolerance_seconds=300):
+        """
+        校验时间戳是否在允许的范围内。
+        :param timestamp: Unix时间戳 (秒级)
+        :param tolerance_seconds: 容许的时间误差,单位为秒 (默认5分钟)
+        :return: 时间戳是否在有效范围内
+        """
+        try:
+            # 将Unix时间戳转为datetime对象
+            request_time = datetime.utcfromtimestamp(int(timestamp))
+            current_time = datetime.utcnow()
+            # 计算时间差(秒级)
+            delta = (current_time - request_time).total_seconds()
+            # 判断时间差是否在容许范围内
+            return abs(delta) <= tolerance_seconds
+        except ValueError:
+            return False