# @Author : Rocky
# @File : IPWeatherObject.py
# @Time : 2023/8/16 8:56
import datetime
import json

import geoip2.webservice
import requests

from Model.models import IPAddr, OpenWeatherMapCallCount
from Object.AWS.S3Email import S3Email
from Object.RedisObject import RedisObject
from Service.CommonService import CommonService
from Ansjer.config import LOGGER, CONFIG_INFO

# Timeout (seconds) applied to every outbound HTTP request in this module so
# that a stalled third-party service cannot block the caller indefinitely.
_REQUEST_TIMEOUT = 10


def _localized_name(names):
    """Return the zh-CN name if present and non-empty, else the English one, else ''."""
    return names.get('zh-CN') or names.get('en') or ''


class IPQuery:
    """
    Aliyun marketplace IP address lookup.
    https://market.aliyun.com/products/57002003/cmapi021970.html?spm=5176.2020520132.recommend-card.dbottom-suggestion.33e17218GYjWDt#sku=yuncode15970000020
    """

    def __init__(self, ip):
        """
        Query the lookup service for ``ip`` and store/update the result in IPAddr.

        On success the attributes ``district``, ``city``, ``region`` and
        ``country_id`` are populated; otherwise they stay empty strings.

        @param ip: IP address string appended to the query URL as ``ip=<ip>``
        """
        # NOTE(review): credential is hard-coded in source; consider moving it
        # to configuration.
        self.appcode = 'd7d63b34b1d54214be446608a57ff0a2'
        self.host = 'https://c2ba.api.huachen.cn'
        self.path = '/ip'
        self.district = ''  # district
        self.city = ''  # city
        self.region = ''  # province / state
        self.country_id = ''

        param = 'ip=' + ip
        url = self.host + self.path + '?' + param
        headers = {'Authorization': 'APPCODE ' + self.appcode}
        res = requests.get(url=url, headers=headers, timeout=_REQUEST_TIMEOUT)
        if res.status_code != 200:
            return

        # Parse the body as JSON. The previous eval() executed the response as
        # Python code (unsafe) and failed on JSON literals such as null/true.
        res_data = json.loads(res.content.decode('utf-8'))
        if res_data['ret'] != 200:
            return

        # JSON null becomes None; normalise to '' so the comparisons below and
        # the stored values stay strings.
        ip_data = res_data['data']
        district = ip_data['district'] or ''
        city = ip_data['city'] or ''
        region = ip_data['region'] or ''
        country_id = ip_data['country_id'] or ''

        # For mainland China, a non-empty city name gets the '市' suffix.
        if country_id == 'CN' and city != '':
            city += '市'

        # Update the existing non-GeoIP2 record for this ip, or create one.
        ip_addr_qs = IPAddr.objects.filter(ip=ip, is_geoip2=False)
        if ip_addr_qs.exists():
            ip_addr_qs.update(district=district, city=city, region=region, country_code=country_id)
        else:
            IPAddr.objects.create(ip=ip, district=district, city=city, region=region, country_code=country_id)

        self.city = city
        self.region = region
        # Fall back to the city when the service returns no district.
        if district != '':
            self.district = district
        elif city != '':
            self.district = city
        self.country_id = country_id


class WeatherInfo:
    """
    Aliyun marketplace Moji weather service.
    https://market.aliyun.com/products/57096001/cmapi013828.html?spm=5176.2020520132.101.19.2b8f7218NuiGPd#sku=yuncode782800000
    """

    def __init__(self, city_id):
        """
        @param city_id: value sent to the service as the ``cityId`` form field
        """
        # NOTE(review): credential is hard-coded in source; consider moving it
        # to configuration.
        self.appcode = 'd7d63b34b1d54214be446608a57ff0a2'
        self.headers = {
            'Authorization': 'APPCODE ' + self.appcode,
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        self.city_id = city_id

    def _post(self, url):
        """POST the city id to ``url``; return the ``data`` payload or None on failure."""
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as in the original; confirm whether it is still required.
        response = requests.post(
            url,
            headers=self.headers,
            data={'cityId': self.city_id},
            verify=False,
            timeout=_REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        result = response.json()
        if result['code'] != 0:
            return None
        return result['data']

    def get_city_weather(self):
        """
        Fetch the current weather condition for the city.

        @return: (temp, humidity) on success, (None, None) otherwise
        """
        payload = self._post('https://aliv18.data.moji.com/whapi/json/alicityweather/condition')
        if payload is None:
            return None, None
        condition = payload['condition']
        return condition['temp'], condition['humidity']

    def get_city_24_weather(self):
        """
        Fetch the 24-hour forecast for the city.

        @return: the ``hourly`` list from the response, or None on failure
        """
        payload = self._post('https://aliv18.data.moji.com/whapi/json/alicityweather/forecast24hours')
        if payload is None:
            return None
        return payload['hourly']


class GeoIP2:
    """
    MaxMind GeoIP2 lookup for overseas IPs; also stores the IP information.
    https://www.maxmind.com/
    """

    def __init__(self, ip):
        """
        Look up ``ip`` through the GeoIP2 web service and create an IPAddr row.

        Any failure is logged and swallowed (best effort).

        @param ip: IP address to resolve
        """
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration.
        self.account_id = 938644
        self.license_key = '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'
        self.license_key_sandbox = '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'
        try:
            with geoip2.webservice.Client(self.account_id, self.license_key) as client:
                # You can also use `client.city` or `client.insights`
                # `client.insights` is not available to GeoLite2 users
                response = client.city(ip)

            # Latitude / longitude rounded to two decimal places.
            lat = round(response.location.latitude, 2)
            lon = round(response.location.longitude, 2)

            # Chinese name preferred, English as fallback.
            city = _localized_name(response.city.names)

            # Some IPs resolve without any subdivision; leave the region empty
            # instead of failing the whole lookup with IndexError.
            region = ''
            if response.subdivisions:
                region = _localized_name(response.subdivisions[0].names)

            # Store the IP information.
            ip_addr_data = {
                'ip': ip,
                'lat': lat,
                'lon': lon,
                'city': city,
                'region': region,
                'country_code': response.country.iso_code,
                'is_geoip2': True,
            }
            IPAddr.objects.create(**ip_addr_data)
        except Exception as e:
            # Best effort: record the failure (error level, like Findip) and continue.
            LOGGER.error('GeoIP2解析ip异常:error_line:{}, error_msg:{}'.format(e.__traceback__.tb_lineno, repr(e)))


class OpenWeatherMap:
    """
    OpenWeatherMap weather service for overseas locations.
    https://openweathermap.org/
    """

    def __init__(self, lat, lon):
        """
        @param lat: latitude
        @param lon: longitude
        """
        # NOTE(review): credential is hard-coded in source; consider moving it
        # to configuration.
        self.appid = '7a6cd7dfeb034ededa451ed575788857'
        self.lat = lat  # latitude
        self.lon = lon  # longitude

    def get_weather(self):
        """
        Return the weather from the cache, or query the current weather and
        cache it.

        The cache key contains the timestamp of the current hour, and entries
        expire after 3600 seconds.

        @return: temp, humidity
        """
        # Truncate the current time to the hour to build the cache key.
        now_time = datetime.datetime.today().replace(minute=0, second=0, microsecond=0)
        str_time = now_time.strftime('%Y-%m-%d %H:%M:%S')
        time_stamp = CommonService.str_to_timestamp(str_time)
        key = 'weather:lat:{}_lon:{}_time_stamp:{}'.format(self.lat, self.lon, time_stamp)

        redis_obj = RedisObject()
        weather = redis_obj.get_data(key)
        if weather:
            temp, humidity = weather.split('/')
            return temp, humidity

        # Cache miss: str_time[:7] is the 'YYYY-MM' used for call counting.
        temp, humidity = self.get_current_weather(str_time[:7])
        if temp is not None and humidity is not None:
            redis_obj.set_ex_data(key, '{}/{}'.format(temp, humidity), 3600)
        return temp, humidity

    def get_current_weather(self, month):
        """
        Query the current weather for this object's coordinates and record the
        API call in OpenWeatherMapCallCount.

        @param month: year-month string used as the call-count key
        @return: (temp, humidity) as integer strings, or (None, None) on failure
        """
        url = 'https://api.openweathermap.org/data/2.5/weather'
        params = {
            'lat': self.lat,
            'lon': self.lon,
            'appid': self.appid,
            'units': 'metric',  # metric units, temperature in Celsius
        }
        res = requests.get(url=url, params=params, timeout=_REQUEST_TIMEOUT)

        self._record_call(month)

        if res.status_code != 200:
            return None, None
        # Parse as JSON rather than eval(): eval executed the response body as
        # Python code and failed on JSON literals such as null/true/false.
        try:
            res_data = res.json()
        except ValueError:
            return None, None
        if res_data['cod'] != 200:
            return None, None
        temp = str(int(res_data['main']['temp']))
        humidity = str(int(res_data['main']['humidity']))
        return temp, humidity

    @staticmethod
    def _record_call(month):
        """Increment the monthly call counter; email a warning above the threshold."""
        call_count_qs = OpenWeatherMapCallCount.objects.filter(month=month).values('count')
        if not call_count_qs.exists():
            OpenWeatherMapCallCount.objects.create(month=month)
            return

        # NOTE(review): read-modify-write is not atomic; concurrent calls may
        # lose increments.
        count = call_count_qs[0]['count'] + 1
        call_count_qs.update(count=count)

        # Send an email reminder once the call count exceeds 750,000.
        warning_count = 750000
        if count <= warning_count:
            return
        redis_obj = RedisObject()
        key = 'open_weather_map_call_count_warning_time_limit'
        if redis_obj.get_data(key):
            return
        # Limit the reminder to once per day.
        redis_obj.set_data(key, 1, 60 * 60 * 24)
        subject = '邮件提醒'
        data = '{}服open weather调用次数大于{}'.format(CONFIG_INFO, warning_count)
        S3Email().send_email(subject, data, 'servers@ansjer.com')


class Findip:
    """
    findip lookup for overseas IPs; also stores the IP information.
    https://www.findip.net/Main/Index
    """

    def __init__(self, ip):
        """
        Resolve ``ip`` through the findip API and create an IPAddr row.

        Any failure is logged and swallowed (best effort); the attributes then
        keep their defaults.

        @param ip: IP address to resolve
        """
        # NOTE(review): credential is hard-coded in source; consider moving it
        # to configuration.
        self.token = 'bb51156441ec401caec0476864ec2b93'
        self.ip = ip
        self.lat = 0.0
        self.lon = 0.0
        self.city = ''
        self.region = ''
        self.country_code = ''
        try:
            LOGGER.info('FindIP解析ip:{}'.format(self.ip))
            # Call the findip API to fill in the attributes above.
            self._get_ip_info()
            # Store the IP information.
            ip_addr_data = {
                'ip': self.ip,
                'lat': self.lat,
                'lon': self.lon,
                'city': self.city,
                'region': self.region,
                'country_code': self.country_code,
                'is_geoip2': True,  # flag originally described as "resolved by GeoIP2"
            }
            LOGGER.info('FindIP解析ip成功:{}'.format(ip_addr_data))
            IPAddr.objects.create(**ip_addr_data)
        except Exception as e:
            LOGGER.error('FindIP解析ip异常:error_line:{}, error_msg:{}'.format(
                e.__traceback__.tb_lineno, repr(e)
            ))

    def _get_ip_info(self):
        """
        Call the findip API and parse the result into this object's attributes.

        @raise Exception: on a non-200 status, a request failure, or an
            undecodable JSON body
        """
        url = f"https://api.findip.net/{self.ip}/?token={self.token}"
        try:
            response = requests.get(url, timeout=_REQUEST_TIMEOUT)
            if response.status_code != 200:
                raise Exception(f"API请求失败,状态码: {response.status_code}")
            data = response.json()
        except json.JSONDecodeError as jde:
            # Checked before RequestException: newer requests versions raise a
            # JSON error that is also a RequestException subclass.
            raise Exception(f"JSON解析异常: {str(jde)}") from jde
        except requests.RequestException as req_err:
            raise Exception(f"请求异常: {str(req_err)}") from req_err
        self._parse_response(data)

    def _parse_response(self, data):
        """
        Extract the needed fields from the decoded API response.

        @param data: decoded JSON object returned by the API
        """
        # Latitude / longitude rounded to two decimal places.
        location = data.get('location')
        if location:
            self.lat = round(location.get('latitude', 0.0), 2)
            self.lon = round(location.get('longitude', 0.0), 2)

        # Country code.
        country = data.get('country')
        if country and 'iso_code' in country:
            self.country_code = country['iso_code']

        # City name (Chinese preferred, then English).
        city = data.get('city')
        if city and 'names' in city:
            self.city = _localized_name(city['names'])

        # Region name (Chinese preferred, then English), taken from the first
        # subdivision (usually the province/state level).
        subdivisions = data.get('subdivisions')
        if subdivisions:
            first_subdivision = subdivisions[0]
            if 'names' in first_subdivision:
                self.region = _localized_name(first_subdivision['names'])