| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 | #!/usr/bin/env python3# -*- coding: utf-8 -*-import jsonimport timeimport urllibimport uuidimport boto3import threadingimport loggingfrom boto3.session import Sessionfrom django.http import JsonResponse, HttpResponseRedirect, HttpResponsefrom django.views.generic.base import Viewfrom Model.models import Device_Info, Role, MenuModelfrom Object.ResponseObject import ResponseObjectfrom Object.TokenObject import TokenObjectfrom Object.UidTokenObject import UidTokenObjectfrom Service.CommonService import CommonServicefrom django.db.models import Q, Ffrom time import strftimefrom Ansjer.config import BASE_DIRfrom Object import MergePicimport osimport base64class TestServeView(View):    def get(self, request, *args, **kwargs):        request.encoding = 'utf-8'        operation = kwargs.get('operation')        print("--------------------get")        return self.validation(request.GET, request, operation)    def post(self, request, *args, **kwargs):        request.encoding = 'utf-8'        operation = kwargs.get('operation')        print("--------------------post")        return self.validation(request.POST, request, operation)    def validation(self, request_dict, request, operation):        language = request_dict.get('language', 'en')        response = ResponseObject(language, 'pc')        if operation == '??':            return 0        else:            tko = TokenObject(                request.META.get('HTTP_AUTHORIZATION'),                returntpye='pc')            # if tko.code != 0:            #     return response.json(tko.code)            response.lang = tko.lang            userID = tko.userID            if operation == 'AItest':                return self.AItest(userID, request, request_dict, response)            if operation == 'doEdit':                return self.doEdit(userID, request_dict, response)            if operation == 'doDelete':                return self.doDelete(userID, request_dict, response)            else:                return response.json(404)    def AItest(self, userID, request, request_dict, response):        # print("--------------request_dict")        # print(request_dict)        file_post_one = request_dict.get('file_one', None)        file_post = file_post_one.replace(' ','+')        # file_decode = base64.b64decode(file_post)        file_post_two = request_dict.get('file_two', None)        file_post_two = file_post_two.replace(' ', '+')        # file_decode_two = base64.b64decode(file_post_two)        now_time = int(time.time())        dir_path = os.path.join(BASE_DIR, 'static/ai/')        if not os.path.exists(dir_path):            os.makedirs(dir_path)        file_path_one = dir_path + 'one'+ str(now_time) + '.jpeg'        file_path_two = dir_path + 'two'+ str(now_time) + '.jpeg'        file_list = ['one','two']        for index in file_list:            file_path = dir_path + index + str(now_time) + '.jpeg'            with open(file_path, 'wb') as f:                # file_byte = file_post.encode('utf-8')                f.write(file_decode)        return HttpResponse("seccess")        file2 = request.FILES.get('file_one', None)        file3 = request.FILES.get('file_two', None)        file4 = request.FILES.get('file_three', None)        print('--------------------------file')        print(file)        print('===========================post_file')        print(file_post)        post_file_list = [file, file2, file3, file4]        file_list = []        for index in range(len(post_file_list)):            if post_file_list[index]:                file_list.append(post_file_list[index])        del post_file_list        if len(file_list) > 1:            merge = []            now_time = int(time.time())            dir_path = os.path.join(BASE_DIR, 'static/', str(now_time))            if not os.path.exists(dir_path):                os.makedirs(dir_path)            for item in file_list:                if hasattr(item, 'name'):                    file_path = os.path.join(dir_path, item.name)                    with open(file_path, 'wb') as f:                        for c in item.chunks():                            f.write(c)                            merge.append(file_path)            image_size = 500  # 每张小图片的大小            image_colnum = 2  # 合并成一张图后,一行有几个小图            MergePic.merge_images(dir_path, image_size, image_colnum)            # return HttpResponse(dir_path + '.jpg')            photo = open(dir_path + '.jpg', 'rb')        else:            photo = file_list[0]        maxLabels = int(request_dict.get('maxLabels', 10))        minConfidence = float(request_dict.get('minConfidence', 30))        try:            client = boto3.client(                'rekognition',                aws_access_key_id='AKIA2E67UIMD6JD6TN3J',                aws_secret_access_key='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',                region_name='us-east-1')            # doc:            # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rekognition.html#Rekognition.Client.detect_labels            rekognition_res = client.detect_labels(                Image={                    'Bytes': photo.read()},                MaxLabels=maxLabels,                MinConfidence=minConfidence)            print('rekognition_res: ', rekognition_res)            return response.json(0, {'rekognition_res': rekognition_res})        except Exception as e:            print(e)            return response.json(500, repr(e))        files1 = request.FILES.get('image1')        files2 = request.FILES.get('image2')        files3 = request.FILES.get('image3')        files4 = request.FILES.get('image4')        file_list = [files1,files2,files3,files4]        merge = []        now_time = int(time.time())        dir_path = os.path.join(BASE_DIR,'static/', str(now_time))        if not os.path.exists(dir_path):            os.makedirs(dir_path)        for item in file_list:            if hasattr(item,'name'):                file_path = os.path.join(dir_path,item.name)                with open(file_path, 'wb') as f:                    for c in item.chunks():                        f.write(c)                        merge.append(file_path)        image_size = 500  # 每张小图片的大小        image_colnum = 2  # 合并成一张图后,一行有几个小图        MergePic.merge_images(dir_path, image_size, image_colnum)        files = open(dir_path+'.jpg','rb')        # return HttpResponse(files.read())        labels = int(request_dict.get('labels',5))        minConfidence = int(request_dict.get('minConfidence',99))        if not files:            return HttpResponse('请上传图片!!!!')        client = boto3.client('rekognition', aws_access_key_id='AKIA2E67UIMD6JD6TN3J',aws_secret_access_key='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',region_name='us-east-1')        # image = open('E:/photo/a615fa40b8c476bab0f6eeb332e62a5a-1000.jpg', "rb")        response = client.detect_labels(Image={'Bytes':files.read()},MaxLabels=labels,MinConfidence=minConfidence)        # for obj in response['Labels']:        #     exit(obj)        #     if obj['Name'] == 'Person':        #         jsonstr = json.dumps(obj)        return HttpResponse(json.dumps(response, ensure_ascii=False),                            content_type="application/json,charset=utf-8")
 |