| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 | import redisfrom Ansjer.config import SERVER_HOST# 本地调试把注释打开# SERVER_HOST = '127.0.0.1'class RedisObject:    def __init__(self, db=0, host=SERVER_HOST):        self.POOL = redis.ConnectionPool(host=host, port=6379, db=db)        self.CONN = redis.Redis(connection_pool=self.POOL)    def set_data(self, key, val, expire=0):        try:            self.CONN.set(key, val)            if expire > 0:                self.CONN.expire(key, expire)        except Exception as e:            return False        else:            return True    def set_expire(self, key, ttl):        self.CONN.expire(key, ttl)    def get_data(self, key):        try:            val = self.CONN.get(key)        except Exception as e:            print(repr(e))            return False        else:            if val:                return val.decode('utf-8')            else:                return False    def del_data(self, key):        try:            val = self.CONN.delete(key)        except Exception as e:            print(repr(e))            return False        else:            return True    def get_size(self):        return self.CONN.dbsize()    # 向列表插入数据    def rpush(self, name, val):        self.CONN.rpush(name, val)    # 向列表插入列表数据    def rpush_list(self, name, list_val):        self.CONN.rpush(name, *list_val)    def lpop(self, name):        val = self.CONN.lpop(name)        if val:            return val.decode('utf-8')        else:            return False    # 获取列表长度    def llen(self, name):        return self.CONN.llen(name=name)    # 获取列表所有数据    def lrange(self, name, start, end):        return self.CONN.lrange(name, start, end)    # 删除列表指定数据    def lrem(self, name, num, value):        """        num:列表方向,删除个数(0:所有)        value:删除的值        """        return self.CONN.lrem(name, num, value)    def get_ttl(self, key):        ttl = self.CONN.ttl(key)        if ttl:            return ttl        else:            return 0    def get_keys(self, key):        keys = self.CONN.keys(key)        if keys:            return keys        else:            return False    def set_ex_data(self, key, val, expire=0):        try:            self.CONN.setex(name=key, time=expire, value=val)        except Exception as e:            return False        else:            return True    def set_hash_data(self, key, kwargs):        self.CONN.hmset(key, kwargs)    def get_hash_data(self, key, file):        value = self.CONN.hmget(key, file)[0]        if value:            return value.decode('utf-8')        else:            return False    def get_all_hash_data(self, key):        return self.CONN.hgetall(key)    def set_persist(self, key):        return self.CONN.persist(key)    def try_lock(self, lock_key, request_id, expire, time_unit_second):        """        获取分布式锁 原子性        :param lock_key: 锁的key        :param request_id: 请求id        :param expire: 锁过期时间        :param time_unit_second: 时间单位(秒)        :return: True or False        """        try:            # 使用set命令尝试获取锁,并设置nx=True            result = self.CONN.set(lock_key, request_id, ex=time_unit_second * expire, nx=True)            return result is not None        except Exception as e:            print("redis lock error.", e)            return False    def release_lock(self, lock_key, request_id):        """        释放锁        :param lock_key: 锁的key        :param request_id: 请求id        :return: True or False        """        try:            # 使用Lua脚本来释放锁,避免在执行脚本时发生异常导致锁无法释放            lua_script = """            if redis.call('get', KEYS[1]) == ARGV[1] then                return redis.call('del', KEYS[1])            else                return 0            end            """            result = self.CONN.eval(lua_script, 1, lock_key, request_id)            return result == 1        except Exception as e:            print("redis unlock error.", e)            return False    def incr(self, key, amount=1, ttl=0):        """        增加计数器的值        :param key: 键名,用于存储计数器的 Redis 键        :param amount: 增加的数量,默认为 1        :param ttl: 键的过期时间(秒)        :return: 更新后的计数值,若发生异常则返回 False        """        try:            # 增加计数器            result = self.CONN.incrby(key, amount)            # 设置过期时间            if ttl > 0:                self.CONN.expire(key, ttl)            return result        except Exception as e:            print(repr(e))            return False    def hash_field_increment(self, key, field, val=1, ttl=0):        """        哈希表中字段增量        :param key: redis的key        :param field: 哈希表字段        :param val: 增量        :param ttl: 键的过期时间(秒)        :return: 成功返回True 异常返回 False        """        try:            # 增加哈希表中指定字段的值            self.CONN.hincrby(key, field, val)            # 如果 ttl 大于 0,则设置过期时间(单位:秒)            if ttl > 0:                self.CONN.expire(key, ttl)            return True        except Exception as e:            # 发生异常时打印错误信息,便于调试            print(f"增值或设置过期时间时发生错误: {repr(e)}")            return False    def cleanup_hash_fields(self, key, field):        """        清理哈希表字段        :param key: redis的key        :param field: 字段名        :return: None        """        try:            self.CONN.hdel(key, field)            return True        except Exception as e:            print(f"清理过期字段时发生错误: {repr(e)}")            return False
 |