Bladeren bron

优化群发功能采用线程池方式

zhangdongming 11 maanden geleden
bovenliggende
commit
7533580f89
1 gewijzigde bestanden met toevoegingen van 47 en 25 verwijderingen
  1. 47 25
      AdminController/UserManageController.py

+ 47 - 25
AdminController/UserManageController.py

@@ -1,5 +1,7 @@
 import datetime
 import time
+from concurrent.futures import ThreadPoolExecutor
+
 import oss2
 import requests
 from django.contrib.auth.hashers import make_password, check_password  # 对密码加密模块
@@ -717,31 +719,20 @@ class UserManagement(View):
             return response.json(444)
 
         try:
-            with transaction.atomic():
-                # SysMassModel表创建群发消息记录数据
-                nowTime = int(time.time())
-                sender_id = Device_User.objects.filter(username='13800138001').values('userID').first()['userID']
-                SysMassModel.objects.create(
-                    sender_id=sender_id,
-                    lang=lang,
-                    platform=platform,
-                    recever=recever,
-                    msg=msg,
-                    addTime=nowTime,
-                    updTime=nowTime,
-                )
-                # 根据UserExModel表的userID群发消息给用户
-                region_list = lang.split(',')
-                appBundleId_list = recever.split(',')
-                userID_list = UserExModel.objects.filter(appBundleId__in=appBundleId_list, region__in=region_list) \
-                    .values_list('userID_id', flat=True)
-                sys_msg_list = []
-                for userID in userID_list:
-                    sys_msg_list.append(SysMsgModel(userID_id=userID, msg=msg, addTime=nowTime, updTime=nowTime))
-                    if len(sys_msg_list) > 5000:
-                        SysMsgModel.objects.bulk_create(sys_msg_list)
-                        sys_msg_list = []
-                SysMsgModel.objects.bulk_create(sys_msg_list)
+            # SysMassModel表创建群发消息记录数据
+            now_time = int(time.time())
+            sender_id = Device_User.objects.filter(username='13800138001').values('userID').first()['userID']
+            SysMassModel.objects.create(
+                sender_id=sender_id,
+                lang=lang,
+                platform=platform,
+                recever=recever,
+                msg=msg,
+                addTime=now_time,
+                updTime=now_time,
+            )
+            # 查询并处理数据
+            self.process_sys_messages(lang, recever, msg, now_time)
             return response.json(0)
         except Exception as e:
             print(e)
@@ -1212,3 +1203,34 @@ class UserManagement(View):
     @staticmethod
     def del_custom_customer(request_dict, response):
         pass
+
+    def process_sys_messages(self, lang, recever, msg, now_time):
+        """分页查询数据并分批保存系统消息"""
+        region_list = lang.split(',')
+        appBundleId_list = recever.split(',')
+
+        # 查询符合条件的用户ID
+        user_queryset = UserExModel.objects.filter(appBundleId__in=appBundleId_list,
+                                                   region__in=region_list).values_list('userID_id', flat=True)
+        paginator = Paginator(user_queryset, 10000)  # 每次查询10000条
+
+        # 创建线程池用于并发保存
+        with ThreadPoolExecutor(max_workers=5) as executor:
+            for page_num in paginator.page_range:
+                user_ids = paginator.page(page_num).object_list
+                # 提交保存任务到线程池
+                executor.submit(self.save_sys_messages_in_batches, user_ids, msg, now_time)
+
+    def save_sys_messages_in_batches(self, user_ids, msg, now_time):
+        """分批保存系统消息,每次最多5000条"""
+        sys_msg_list = []
+        for user_id in user_ids:
+            sys_msg_list.append(SysMsgModel(userID_id=user_id, msg=msg, addTime=now_time, updTime=now_time))
+
+            if len(sys_msg_list) >= 5000:
+                SysMsgModel.objects.bulk_create(sys_msg_list)
+                sys_msg_list = []
+
+        # 保存剩余的消息
+        if sys_msg_list:
+            SysMsgModel.objects.bulk_create(sys_msg_list)