123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import redis
- from Ansjer.config import SERVER_HOST
- # 本地调试把注释打开
- # SERVER_HOST = '127.0.0.1'
- class RedisObject:
- def __init__(self, db=0):
- self.POOL = redis.ConnectionPool(host=SERVER_HOST, port=6379, db=db)
- self.CONN = redis.Redis(connection_pool=self.POOL)
- def set_data(self, key, val, expire=0):
- try:
- self.CONN.set(key, val)
- if expire > 0:
- self.CONN.expire(key, expire)
- except Exception as e:
- return False
- else:
- return True
- def set_expire(self, key, ttl):
- self.CONN.expire(key, ttl)
- def get_data(self, key):
- try:
- val = self.CONN.get(key)
- except Exception as e:
- print(repr(e))
- return False
- else:
- if val:
- return val.decode('utf-8')
- else:
- return False
- def del_data(self, key):
- try:
- val = self.CONN.delete(key)
- except Exception as e:
- print(repr(e))
- return False
- else:
- return True
- def get_size(self):
- return self.CONN.dbsize()
- # 向列表插入数据
- def rpush(self, name, val):
- self.CONN.rpush(name, val)
- def lpop(self, name):
- val = self.CONN.lpop(name)
- if val:
- return val.decode('utf-8')
- else:
- return False
- # 获取列表长度
- def llen(self, name):
- return self.CONN.llen(name=name)
- # 获取列表所有数据
- def lrange(self, name, start, end):
- return self.CONN.lrange(name, start, end)
- # 删除列表指定数据
- def lrem(self, name, num, value):
- """
- num:列表方向,删除个数(0:所有)
- value:删除的值
- """
- return self.CONN.lrem(name, num, value)
- def get_ttl(self, key):
- ttl = self.CONN.ttl(key)
- if ttl:
- return ttl
- else:
- return 0
- def get_keys(self, key):
- keys = self.CONN.keys(key)
- if keys:
- return keys
- else:
- return False
- def set_ex_data(self, key, val, expire=0):
- try:
- self.CONN.setex(name=key, time=expire, value=val)
- except Exception as e:
- return False
- else:
- return True
- def set_hash_data(self, key, kwargs):
- self.CONN.hmset(key, kwargs)
- def get_hash_data(self, key, file):
- value = self.CONN.hmget(key, file)[0]
- if value:
- return value.decode('utf-8')
- else:
- return False
- def get_all_hash_data(self, key):
- return self.CONN.hgetall(key)
|