import redis from Ansjer.config import SERVER_HOST # 本地调试把注释打开 # SERVER_HOST = '127.0.0.1' class RedisObject: def __init__(self, db=0, host=SERVER_HOST): self.POOL = redis.ConnectionPool(host=host, port=6379, db=db) self.CONN = redis.Redis(connection_pool=self.POOL) def set_data(self, key, val, expire=0): try: self.CONN.set(key, val) if expire > 0: self.CONN.expire(key, expire) except Exception as e: return False else: return True def set_expire(self, key, ttl): self.CONN.expire(key, ttl) def get_data(self, key): try: val = self.CONN.get(key) except Exception as e: print(repr(e)) return False else: if val: return val.decode('utf-8') else: return False def del_data(self, key): try: val = self.CONN.delete(key) except Exception as e: print(repr(e)) return False else: return True def get_size(self): return self.CONN.dbsize() # 向列表插入数据 def rpush(self, name, val): self.CONN.rpush(name, val) # 向列表插入列表数据 def rpush_list(self, name, list_val): self.CONN.rpush(name, *list_val) def lpop(self, name): val = self.CONN.lpop(name) if val: return val.decode('utf-8') else: return False # 获取列表长度 def llen(self, name): return self.CONN.llen(name=name) # 获取列表所有数据 def lrange(self, name, start, end): return self.CONN.lrange(name, start, end) # 删除列表指定数据 def lrem(self, name, num, value): """ num:列表方向,删除个数(0:所有) value:删除的值 """ return self.CONN.lrem(name, num, value) def get_ttl(self, key): ttl = self.CONN.ttl(key) if ttl: return ttl else: return 0 def get_keys(self, key): keys = self.CONN.keys(key) if keys: return keys else: return False def set_ex_data(self, key, val, expire=0): try: self.CONN.setex(name=key, time=expire, value=val) except Exception as e: return False else: return True def set_hash_data(self, key, kwargs): self.CONN.hmset(key, kwargs) def get_hash_data(self, key, file): value = self.CONN.hmget(key, file)[0] if value: return value.decode('utf-8') else: return False def get_all_hash_data(self, key): return self.CONN.hgetall(key) def set_persist(self, key): return self.CONN.persist(key) def try_lock(self, lock_key, request_id, expire, time_unit_second): """ 获取分布式锁 原子性 :param lock_key: 锁的key :param request_id: 请求id :param expire: 锁过期时间 :param time_unit_second: 时间单位(秒) :return: True or False """ try: # 使用set命令尝试获取锁,并设置nx=True result = self.CONN.set(lock_key, request_id, ex=time_unit_second * expire, nx=True) return result is not None except Exception as e: print("redis lock error.", e) return False def release_lock(self, lock_key, request_id): """ 释放锁 :param lock_key: 锁的key :param request_id: 请求id :return: True or False """ try: # 使用Lua脚本来释放锁,避免在执行脚本时发生异常导致锁无法释放 lua_script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ result = self.CONN.eval(lua_script, 1, lock_key, request_id) return result == 1 except Exception as e: print("redis unlock error.", e) return False