Преглед на файлове

对接aws中的DynamoDB数据库,完成创建表,和增删改查所对应的接口

pzb преди 6 години
родител
ревизия
7fd1ad26ee
променени са 2 файла, в които са добавени 381 реда и са изтрити 131 реда
  1. 380 130
      Controller/UserBrandController.py
  2. 1 1
      Service/CommonService.py

+ 380 - 130
Controller/UserBrandController.py

@@ -31,6 +31,307 @@ http://192.168.136.39:8000/userbrandinfo/queryAllByAdmin?token=test&page=1&line=
 
 http://192.168.136.39:8000/userbrandinfo/queryAll?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VySUQiOiIxNTQzOTA5MDUwNDEzMTM4MDAxMzgwMDAiLCJsYW5nIjoiY24iLCJ1c2VyIjoiMTM4MDAxMzgwMDEiLCJtX2NvZGUiOiIxMjM0MTMyNDMyMTQiLCJleHAiOjE1NTk4OTY4NTd9.nhK3VSghSGjyXKjel4woz7R_3bhjgqQDlX-ypYsklNU&page=1&line=5
 '''
+# coding:utf-8
+from boto3 import Session
+from botocore.exceptions import ClientError
+from boto3.dynamodb.conditions import Key, Attr
+import logging
+import json
+
+logger = logging.getLogger(__name__)
+
+class MyserviceDynamodb(object):
+    def __init__(self, **kwargs):
+        self.region = 'us-west-1'
+        self.access_key = 'AKIA2E67UIMD4PZTYKYD'
+        self.secret_key = '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'
+        self.session = self.__session()
+
+    def __session(self):
+        try:
+            session = Session(aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key,region_name=self.region)
+        except:
+            print("Failed to connect session in region{0}".format(self.region))
+        return session
+    # 创建user_brand_all表
+    def user_brand_all_table_create(self, table_name):
+        dynamodb = self.session.resource('dynamodb')
+        try:
+            table = dynamodb.create_table(
+                TableName=table_name,
+                KeySchema=[
+                    {
+                        'AttributeName': 'userID',
+                        'KeyType': 'HASH'
+                    },
+                    {
+                        'AttributeName': 'addTime',
+                        'KeyType': 'RANGE'
+                    }
+                ],
+                AttributeDefinitions=[
+                    {
+                        'AttributeName': 'userID',
+                        'AttributeType': 'S'
+                    },
+                    {
+                        'AttributeName': 'addTime',
+                        'AttributeType': 'N'
+                    },
+
+                ],
+                ProvisionedThroughput={
+                    'ReadCapacityUnits': 5,
+                    'WriteCapacityUnits': 5,
+                }
+            )
+        except Exception:
+            print ('已经创建该表了')
+
+    # 创建user_brand表
+    def table_create(self,table_name):
+        dynamodb = self.session.resource('dynamodb')
+        try:
+            table = dynamodb.create_table(
+                TableName=table_name,
+                KeySchema=[
+                    {
+                        'AttributeName': 'userID',
+                        'KeyType': 'HASH'
+                    },
+                    {
+                        'AttributeName': 'username',
+                        'KeyType': 'RANGE'
+                    }
+                ],
+                AttributeDefinitions=[
+                    {
+                        'AttributeName': 'userID',
+                        'AttributeType': 'S'
+                    },
+                    {
+                        'AttributeName': 'username',
+                        'AttributeType': 'S'
+                    },
+
+                ],
+                ProvisionedThroughput={
+                    'ReadCapacityUnits': 5,
+                    'WriteCapacityUnits': 5,
+                }
+            )
+        except Exception:
+            print ('已经创建该表了')
+
+    # 添加user_brand_all表数据
+    def user_brand_all_item_put(self, table_name,user_id, add_time,username,device_supplier,device_model,os_type,os_version,country,province,city,area,street,longitude,latitude,app_id,status_all,ip):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        table.put_item(
+            Item={
+                'userID': user_id,
+                'addTime': add_time,
+                'username': username,
+                'deviceSupplier': device_supplier,
+                'deviceModel': device_model,
+                'osType': os_type,
+                'osVersion': os_version,
+                'country' : country,
+                'province': province,
+                'city': city,
+                'area': area,
+                'street': street,
+                'longitude': longitude,
+                'latitude': latitude,
+                'appId': app_id,
+                'status_all': status_all,
+                'ip': ip,
+            }
+        )
+        print ('添加数据成功!')
+
+    # 添加user_brand表数据
+    def user_brand_item_put(self, table_name, user_id, username,add_time, device_supplier, device_model, os_type,
+                                os_version, country, province, city, area, street, longitude, latitude, app_id,
+                                status_all, ip):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        table.put_item(
+            Item={
+                'userID': user_id,
+                'username': username,
+                'addTime': add_time,
+                'deviceSupplier': device_supplier,
+                'deviceModel': device_model,
+                'osType': os_type,
+                'osVersion': os_version,
+                'country': country,
+                'province': province,
+                'city': city,
+                'area': area,
+                'street': street,
+                'longitude': longitude,
+                'latitude': latitude,
+                'appId': app_id,
+                'status_all': status_all,
+                'ip': ip,
+            }
+        )
+        print ('添加数据成功!')
+
+    # page分页数,line条数,table_name表名称,last_evaluated查询开始值,page_value记录分页开始值
+    def get_page_line(self, page, line, table_name, last_evaluated,page_value):
+        dynamodb = self.session.resource('dynamodb')
+        if not dynamodb:
+            raise DynamodbConnectionError("Failed to get resource for dynamodb!")
+        table = dynamodb.Table(table_name)
+        last_evaluated_key = last_evaluated
+        try:
+            if last_evaluated_key is None:
+                response = table.scan(
+                    Limit=line,
+                )
+                page_value.append(last_evaluated_key)
+            else:
+                # 构建分页的起点,传入下一页面的起点,这是由主键来控制的,last_evaluated_key的值就是本表中的uid
+                response = table.scan(
+                    Limit=line,
+                    ExclusiveStartKey=last_evaluated_key)
+            try:
+                last_evaluated_key = response['LastEvaluatedKey']
+                # 如果有知就追加在该值的后面
+                page_value.append(last_evaluated_key)
+                my.get_page_line(page, line, table_name, last_evaluated_key,page_value)
+            except Exception:
+                return page_value
+        except Exception as e:
+            logger.error("Failed to get table {0}, error".format(table_name, e))
+        return page_value
+
+    def item_get(self,line,table_name,last_evaluated_key):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        if last_evaluated_key is None:
+            response = table.scan(
+                Limit=line,
+            )
+        else:
+            # 构建分页的起点,传入下一页面的起点,这是由主键来控制的,last_evaluated_key的值就是本表中的uid
+            response = table.scan(
+                Limit=line,
+                ExclusiveStartKey=last_evaluated_key)
+        return response
+
+    def item_get_count(self,table_name):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        response = table.scan(
+        )
+        return len(response['Items'])
+
+    def item_get_brand(self,table_name):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        response = table.scan()
+        response = response['Items']
+        return response
+
+    def put_item(self, table, item_dict=None):
+        try:
+            response = table.put_item(Item=item_dict)
+        except Exception as e:
+            logger.error("Failed to put item in to {0}:error{1}".format(table,e))
+        return response
+
+    def get_item(self, table_name,queryname ,username):
+        dynamodb = self.session.resource('dynamodb')
+        if not dynamodb:
+            raise DynamodbConnectionError("Failed to get resource for dynamodb!")
+        try:
+            table = dynamodb.Table(table_name)
+            response = table.scan(
+                FilterExpression=Attr(queryname).eq(username)
+            )
+            items = response['Items']
+        except Exception as e:
+            logger.error("Failed to get table {0}, error".format(table_name, e))
+        return items
+
+    def update_table(self, table_name, user_id,username, add_time, deviceSupplier, deviceModel, osType, osVersion, country, province, city, area, street, longitude , latitude,appId, ip,status_all
+                    ):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        try:
+            response = table.update_item(
+                    Key = {
+                        'userID': user_id,
+                        'username':username
+                    },UpdateExpression = "SET addTime = :add_time, deviceSupplier= :deviceSupplier, deviceModel = :deviceModel, osType = :osType, osVersion = :osVersion, country = :country, province = :province, city = :city, area= :area, street= :street, longitude= :longitude, latitude= :latitude, appId= :appId, ip= :ip, status_all= :status_all",
+
+                    ExpressionAttributeValues={
+                        ':add_time': add_time,
+                        ':deviceSupplier': deviceSupplier,
+                        ':deviceModel': deviceModel,
+                        ':osType':osType,
+                        ':osVersion':osVersion,
+                        ':country':country,
+                        ':province':province,
+                        ':city': city,
+                        ':area': area,
+                        ':street': street,
+                        ':longitude': longitude,
+                        ':latitude': latitude,
+                        ':appId': appId,
+                        ':ip': ip,
+                        ':status_all': status_all,
+                    },ReturnValues="UPDATED_NEW")
+        except ClientError as e:
+            if e.response['Error']['Code'] == "ConditionalCheckFailedException":
+                logger.error(e.response['Error']['Message'])
+            else:
+                print('Failed update the dynamodb by event_id,not Failed Conditional')
+        else:
+            print ('修改成功')
+
+
+    def table_delete(self, table_name):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        table.delete()
+        print ('删除表成功')
+
+    def item_delete(self, table_name,user_id,user_name,add_time):
+        dynamodb = self.session.resource('dynamodb')
+        table = dynamodb.Table(table_name)
+        try:
+            print (str(table_name))
+            if str(table_name) == str('user_brand'):
+                table.delete_item(
+                    Key={
+                        'userID': user_id,
+                        'username': user_name,
+
+                    }
+                )
+            else:
+                table.delete_item(
+                    Key={
+                        'userID': user_id,
+                        'addTime': int(add_time),
+                    }
+                )
+            return 'ok'
+        except Exception:
+            logger.error("Failed to put item in to {0}:error{1}".format(table))
+            return 'no'
+
+my = MyserviceDynamodb()
+# print(my.table_delete('user_brand'))
+# print(my.table_delete('user_brand_all'))
+my.table_create('user_brand')
+my.user_brand_all_table_create('user_brand_all')
+# print(my.item_put('user_brand'))
+# table_value = my.get_table('user_brand')
 
 
 class UserBrandInfo(View):
@@ -82,10 +383,6 @@ class UserBrandInfo(View):
         else:
             return response.json(309)
             # 获取外网IP
-
-
-
-
     # http://192.168.136.39:8000/userbrandinfo?operation=add&token=test&deviceSupplier=小米&deviceModel=HM NOTE 1TD&osType=WEB&osVersion=4.0.0
     def add_info(self, request_dict, userID,response):
         deviceSupplier = request_dict.get('deviceSupplier', None)
@@ -100,14 +397,16 @@ class UserBrandInfo(View):
         longitude = request_dict.get('longitude', None)
         latitude = request_dict.get('latitude', None)
         appId = request_dict.get('appId', None)
+        if city is None:
+            city='无'
         if area is None:
-            area=''
+            area=''
         if street is None:
-            street=''
+            street=''
         if longitude is None:
-            longitude=''
+            longitude=''
         if latitude is None:
-            latitude=''
+            latitude=''
         param_area = CommonService.get_param_flag(data=[country,province,city])
         must_fill_in = CommonService.get_param_flag(data=[appId,deviceSupplier,deviceModel,osType,osVersion])
         if must_fill_in is False:
@@ -127,85 +426,86 @@ class UserBrandInfo(View):
             latitude = latitude
         else:
             status = 0
-            print (self.clientIP)
             jsonData = CommonService.getIpIpInfo(ip=self.clientIP,lang='CN')
             country = jsonData['country_name']
             province = jsonData['region_name']
             city = jsonData['city_name']
-            area = ''
-            street = ''
+            area = ''
+            street = ''
             longitude = jsonData['longitude']
             latitude = jsonData['latitude']
+            if country == '局域网':
+                city = '无'
+                longitude = '无'
+                latitude = '无'
+
         param_flag = CommonService.get_param_flag(data=[deviceSupplier, deviceModel,osType,osVersion])
         if param_flag is True:
             try:
-                user_brand = User_Brand(
-                    userID=Device_User.objects.get(userID=userID),
-                    deviceSupplier=deviceSupplier,
-                    deviceModel=deviceModel,
-                    osType=osType,
-                    osVersion=osVersion,
-                    ip=self.clientIP,
-                    addTime = int(time.time()),
-                    status = status,
-                    country = country,
-                    province = province,
-                    city = city,
-                    appId=appId,
-                    area = area,
-                    street = street,
-                    longitude = longitude,
-                    latitude = latitude
-                )
-                user_brand.save()
+                addTime = int(time.time())
+                username = Device_User.objects.filter(userID = userID).values('userID','username','NickName')
+                username = username[0]['username']
+                # 添加user_brand_all表信息
+                my.user_brand_all_item_put('user_brand_all', userID, addTime, username, deviceSupplier, deviceModel, osType, osVersion,
+                                country, province, city, area, street, longitude , latitude, appId, status,
+                                        self.clientIP)
+                # 添加user_brand表信息
+                my.user_brand_item_put('user_brand', userID, username, addTime, deviceSupplier, deviceModel,
+                            osType, osVersion,
+                            country, province, city, area, street, longitude, latitude, appId, status,
+                            self.clientIP)
             except Exception:
                 errorInfo = traceback.format_exc()
-                print(errorInfo)
                 return response.json(424, {'details': errorInfo})
             else:
-                print(type(user_brand.addTime))
-                return response.json(0,{'id':user_brand.id})
+                return response.json(0,{'OK':int(time.time())})
         else:
             # 参数错误
              return response.json(444)
 
 
-
-
+    # 查询每个用户的最新登录数据表
     def query_info(self, request_dict, userID,response):
         page = int(request_dict.get('page', None))
         line = int(request_dict.get('line', None))
         username = request_dict.get('username',None)
-
         param_flag = CommonService.get_param_flag(data=[page, line])
         if param_flag is True:
             check_perm = ModelService.check_perm(userID=userID,permID=30)
+            page_value=[]
             if check_perm is True:
                 if username is None or username is '':
-                    user_brand_queryset = User_Brand.objects.all().order_by( '-id')
+                    table_Limit_value = my.get_page_line(page,line,'user_brand',None,page_value)
+                    # 页面减1
+                    page = page-1
+                    if page < len(table_Limit_value):
+                        # 分页后查询
+                        table_value = my.item_get(line,'user_brand',table_Limit_value[page])
+                        table_value = table_value['Items']
+                        count = my.item_get_count('user_brand')
+                        return response.json(0, {'datas': table_value, 'count': count})
                 else:
-                    user_brand_queryset = User_Brand.objects.filter(userID__username=username).order_by('-id')
-                if user_brand_queryset.exists():
-                    count = user_brand_queryset.count()
-                    res = user_brand_queryset[(page - 1) * line:page * line]
-                    send_json = CommonService.qs_to_dict(res)
-                    for k, v in enumerate(send_json["datas"]):
-                            username = ModelService.get_user_name(userID=send_json["datas"][k]['fields']['userID'])
-                            send_json["datas"][k]['fields']['username']=username
-                    send_json['count'] = count
-                    return response.json(0, send_json)
-                return response.json(0, {'datas': [], 'count': 0})
+                    # 查询
+                    table_value = my.get_item('user_brand', 'username' ,username)
+                    return response.json(0, {'datas': table_value, 'count': len(table_value)})
             else:
                 return response.json(404)
         else:
             return response.json(444)
     def delete_by_admin(self, request_dict, userID,response):
-        id_list = request_dict.getlist('id', None)
-        param_flag = CommonService.get_param_flag(data=[id_list])
+        id = request_dict.getlist('id', None)
+        username = request_dict.getlist('username', None)
+        add_time = request_dict.getlist('add_time', None)
+        table_name =  request_dict.getlist('table_name', None)
+        param_flag = CommonService.get_param_flag(data=[id,username])
+        print (table_name[0])
         if param_flag is True:
             check_perm = ModelService.check_perm(userID=userID, permID=10)
             if check_perm is True:
-                is_delete = User_Brand.objects.filter(id__in=id_list).delete()
+                if str(table_name[0])==str('user_brand'):
+                    is_delete = my.item_delete('user_brand', id[0],username[0], add_time[0])
+                else:
+                    is_delete = my.item_delete('user_brand_all', id[0],add_time[0], add_time[0])
                 return response.json(0, {'delete_count': is_delete[0]})
             else:
                 return response.json(404)
@@ -219,26 +519,20 @@ class UserBrandInfo(View):
         param_flag = CommonService.get_param_flag(data=[page, line])
         if param_flag is True:
             check_perm = ModelService.check_perm(userID=userID,permID=30)
+            page_value = []
             if check_perm is True:
                 if username is None or username is '':
-                # 按照用户去重复查询
-                # 分页
-                    user_brand_queryset = User_Brand.objects.all().order_by('userID').values_list('userID', flat=True).distinct()[(page - 1) * line:page * line]
+                    table_Limit_value = my.get_page_line(page, line, 'user_brand_all', None, page_value)
+                    page = page - 1
+                    if page < len(table_Limit_value):
+                        table_value = my.item_get(line, 'user_brand_all', table_Limit_value[page])
+                        table_value = table_value['Items']
+                        count = my.item_get_count('user_brand_all')
+                        return response.json(0, {'datas': table_value, 'count': count})
                 else:
-                    # 分页
-                    user_brand_queryset = User_Brand.objects.filter(userID__username=username).order_by('userID').values_list( 'userID',flat=True).distinct()[(page - 1) * line:page * line]
-                send_jsons=[]
-                counts=0
-                for i in user_brand_queryset:
-                    counts=counts+1
-                    user_brand_querysetlast = User_Brand.objects.filter(userID=i).order_by('-addTime')[:1]
-                    user_brand_querysetlast = CommonService.qs_to_dict(user_brand_querysetlast)
-                    username = ModelService.get_user_name(userID=user_brand_querysetlast["datas"][0]['fields']['userID'])
-                    user_brand_querysetlast["datas"][0]['fields']['username']=username
-                    send_jsons.append(user_brand_querysetlast["datas"][0])
-                # 按照加入的日期排序
-                send_jsons = sorted(send_jsons, key=lambda x:x['fields']['addTime'], reverse=True)
-                return response.json(0, {'datas': send_jsons, 'count': counts})
+                    # 查询
+                    table_value = my.get_item('user_brand_all', 'username', username)
+                    return response.json(0, {'datas': table_value, 'count': len(table_value)})
             else:
                 return response.json(404)
         else:
@@ -247,53 +541,17 @@ class UserBrandInfo(View):
     # 品牌统计的接口
     def query_deviceSupplier_info(self, request_dict, userID,response):
         check_perm = ModelService.check_perm(userID=userID,permID=30)
-        print ('---------------------------------------------------------')
         if check_perm is True:
-            # 按照用户去重复查询
-            user_brand_queryset = User_Brand.objects.all().order_by('userID').values_list('userID', flat=True).distinct()
-            send_jsons=[]
-            counts=0
-            for i in user_brand_queryset:
-                counts=counts+1
-                user_brand_querysetlast = User_Brand.objects.filter(userID=i).order_by('-addTime')[:1]
-                user_brand_querysetlast = CommonService.qs_to_dict(user_brand_querysetlast)
-                username = ModelService.get_user_name(userID=user_brand_querysetlast["datas"][0]['fields']['userID'])
-                user_brand_querysetlast["datas"][0]['fields']['username']=username
-                send_jsons.append(user_brand_querysetlast["datas"][0])
-            deviceSupplier=[]
-            for k, v in enumerate(send_jsons):
-                deviceSupplier.append(v['fields']['deviceSupplier'])
-            # user_brand_queryset = User_Brand.objects.all().order_by('userID').values_list('userID', flat=True).distinct()
-            # send_jsons=[]
-            # counts=0
-            # for i in user_brand_queryset:
-            #     counts=counts+1
-            #     user_brand_querysetlast = User_Brand.objects.filter(userID=i).order_by('-addTime')[:1]
-            #     user_brand_querysetlast = CommonService.qs_to_dict(user_brand_querysetlast)
-            #     username = ModelService.get_user_name(userID=user_brand_querysetlast["datas"][0]['fields']['userID'])
-            #     user_brand_querysetlast["datas"][0]['fields']['username']=username
-            #     send_jsons.append(user_brand_querysetlast["datas"][0])
-            # deviceSupplier=[]
-            # for k, v in enumerate(send_jsons):
-            #     deviceSupplier.append(v['fields']['deviceSupplier'])
-            # deviceSupplier = Counter(deviceSupplier)
-            # return response.json(0, {'datas': deviceSupplier,'counts':counts})
-
-            ub_qs = User_Brand.objects.filter().values('userID','deviceSupplier').order_by('userID').order_by('userID').distinct()
-            count = ub_qs.count()
-            print (ub_qs)
-            print ('---------------------------------------------------------')
-            deviceSupplier = []
-            ub_ql = CommonService.qs_to_list(ub_qs)
-
-            for v in ub_qs:
-                deviceSupplier.append(v['deviceSupplier'])
-                print (v)
-            deviceSupplier = Counter(deviceSupplier)
-            return response.json(0, {'datas': deviceSupplier,'counts':counts})
-            print(deviceSupplier)
-            print ('---------------------------------------------------------')
-            return response.json(0, {'datas': deviceSupplier, 'counts': count})
+            table_value = my.item_get_brand('user_brand')
+            send_jsons = []
+            counts = 0
+            for i in table_value:
+                send_jsons.append(table_value[counts]['deviceSupplier'])
+                counts = counts + 1
+            send_jsons = Counter(send_jsons)
+
+            return response.json(0, {'datas': send_jsons,'counts':counts})
+
         else:
             return response.json(404)
     # 区域统计的接口
@@ -302,22 +560,14 @@ class UserBrandInfo(View):
         district = request_dict.get('district', None)
         if check_perm is True:
             # 按照用户去重复查询
-            user_brand_queryset = User_Brand.objects.all().order_by('userID').values_list('userID', flat=True).distinct()
-            send_jsons=[]
-            counts=0
-            for i in user_brand_queryset:
-                counts=counts+1
-                user_brand_querysetlast = User_Brand.objects.filter(userID=i).order_by('-addTime')[:1]
-                user_brand_querysetlast = CommonService.qs_to_dict(user_brand_querysetlast)
-                username = ModelService.get_user_name(userID=user_brand_querysetlast["datas"][0]['fields']['userID'])
-                user_brand_querysetlast["datas"][0]['fields']['username']=username
-                send_jsons.append(user_brand_querysetlast["datas"][0])
-            deviceSupplier=[]
-            for k, v in enumerate(send_jsons):
-                deviceSupplier.append(v['fields'][district])
-            deviceSupplier = Counter(deviceSupplier)
-            return response.json(0, {'datas': deviceSupplier,'counts':counts})
-
+            table_value = my.item_get_brand('user_brand')
+            send_jsons = []
+            counts = 0
+            for  i in table_value:
+                send_jsons.append(table_value[counts][district])
+                counts = counts + 1
+            send_jsons = Counter(send_jsons)
+            return response.json(0, {'datas': send_jsons, 'counts': counts})
         else:
             return response.json(404)
 

+ 1 - 1
Service/CommonService.py

@@ -50,7 +50,7 @@ class CommonService:
 
     @staticmethod
     def get_param_flag(data=[]):
-        print(data)
+        # print(data)
         flag = True
         for v in data:
             if v is None: