Module jumpscale.clients.zdb.client
To get a new ZDB instance
zcl = j.clients.zdb.get("instance", admin=True, mode="user")
This will gets an instance with admin capabilities in user mode
Expand source code
"""
# To get a new ZDB instance
zcl = j.clients.zdb.get("instance", admin=True, mode="user")
# This will gets an instance with admin capabilities in user mode
"""
from jumpscale.clients.base import Client
from jumpscale.core.base import fields
from enum import Enum
import redis
import struct
class Mode(Enum):
    SEQ = "seq"
    USER = "user"
class ZDBClient(Client):
    addr = fields.String(default="localhost")
    port = fields.Integer(default=9900)
    secret_ = fields.String(default="1234567")
    nsname = fields.String(default="test")
    admin = fields.Boolean(default=False)
    mode = fields.Enum(Mode)
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # if not self.secret_:
        #     self.secret_ = j.core.myenv.adminsecret
        assert len(self.secret_) > 5
        if self.admin:
            self.nsname = "default"
        self.type = "ZDB"
        self._redis = None
        self.nsname = self.nsname.lower().strip()
        # if j.data.bcdb._master:
        #     self._model.trigger_add(self._update_trigger)
    def _update_trigger(self, obj, action, **kwargs):
        if action in ["save", "change"]:
            self._redis = None
    @property
    def redis(self):
        if not self._redis:
            pool = redis.ConnectionPool(
                host=self.addr,
                port=self.port,
                password=self.secret_,
                connection_class=ZDBConnection,
                namespace=self.nsname,
                namespace_password=self.secret_,
                admin=self.admin,
            )
            self._redis = _patch_redis_client(redis.Redis(connection_pool=pool))
        return self._redis
    def _key_encode(self, key):
        if self.mode.value == Mode.SEQ.value:
            if key is None:
                key = ""
            else:
                key = struct.pack("<I", key)
        return key
    def _key_decode(self, key):
        if self.mode.value == Mode.SEQ.value:
            key = struct.unpack("<I", key)[0]
        return key
    def set(self, data, key=None):
        key = key or ""
        key = self._key_encode(key)
        res = self.redis.execute_command("SET", key, data)
        if not res:
            return res
        return self._key_decode(res)
    def get(self, key):
        key = self._key_encode(key)
        return self.redis.execute_command("GET", key)
    def exists(self, key):
        key = self._key_encode(key)
        return self.redis.execute_command("EXISTS", key) == 1
    def delete(self, key):
        if not key:
            raise j.exceptions.Value("key must be provided")
        key = self._key_encode(key)
        self.redis.execute_command("DEL", key)
    def flush(self):
        """
        will remove all data from the database DANGEROUS !!!!
        This is only allowed on private and password protected namespace
        You need to select the namespace before running the command.
        :return:
        """
        if not self.nsname in ["default", "system"]:
            self.redis.execute_command("FLUSH")
    def stop(self):
        pass
    @property
    def nsinfo(self):
        cmd = self.redis.execute_command("NSINFO", self.nsname)
        return _parse_nsinfo(cmd.decode())
    def list(self, key_start=None, reverse=False):
        """
        list all the keys in the namespace
        :param key_start: if specified start to walk from that key instead of the first one, defaults to None
        :param key_start: str, optional
        :param reverse: decide how to walk the namespace
                        if False, walk from older to newer keys
                        if True walk from newer to older keys
                        defaults to False
        :param reverse: bool, optional
        :return: list of keys
        :rtype: [str]
        """
        result = []
        for key, data in self.iterate(key_start=key_start, reverse=reverse, keyonly=True):
            result.append(key)
        return result
    def iterate(self, key_start=None, reverse=False, keyonly=False):
        """
        walk over all the namespace and yield (key,data) for each entries in a namespace
        :param key_start: if specified start to walk from that key instead of the first one, defaults to None
        :param key_start: str, optional
        :param reverse: decide how to walk the namespace
                if False, walk from older to newer keys
                if True walk from newer to older keys
                defaults to False
        :param reverse: bool, optional
        :param keyonly: [description], defaults to False
        :param keyonly: bool, optional
        :raises e: [description]
        """
        next = None
        data = None
        if key_start is not None:
            next = self.redis.execute_command("KEYCUR", self._key_encode(key_start))
            if not keyonly:
                data = self.get(key_start)
            yield (key_start, data)
        CMD = "SCANX" if not reverse else "RSCAN"
        while True:
            try:
                if not next:
                    response = self.redis.execute_command(CMD)
                else:
                    response = self.redis.execute_command(CMD, next)
                # format of the response
                # see https://github.com/threefoldtech/0-db/tree/development#scan
            except redis.ResponseError as e:
                if e.args[0] == "No more data":
                    return
                raise e
            (next, results) = response
            for item in results:
                keyb, size, epoch = item
                key_new = self._key_decode(keyb)
                data = None
                if not keyonly:
                    data = self.redis.execute_command("GET", keyb)
                yield (key_new, data)
    @property
    def count(self):
        """
        :return: return the number of entries in the namespace
        :rtype: int
        """
        return self.nsinfo["entries"]
    def ping(self):
        """
        go to default namespace & ping
        :return:
        """
        return self.redis.ping()
    @property
    def next_id(self):
        """
        :return: return the next id
        :rtype: int
        """
        id_bytes = struct.pack("<I", int(self.nsinfo["next_internal_id"], 16))
        return int.from_bytes(id_bytes, byteorder="big", signed=True)
# Helper functions
def _patch_redis_client(redis):
    # don't auto parse response for set, it's not 100% redis compatible
    # 0-db does return a key after in set
    for cmd in ["SET", "DEL"]:
        if cmd in redis.response_callbacks:
            del redis.response_callbacks[cmd]
    return redis
def _parse_nsinfo(raw):
    def empty(line):
        line = line.strip()
        if len(line) <= 0 or line[0] == "#" or ":" not in line:
            return False
        return True
    info = {}
    for line in filter(empty, raw.splitlines()):
        key, val = line.split(":")
        try:
            val = int(val)
            info[key] = val
            continue
        except ValueError:
            pass
        try:
            val = float(val)
            info[key] = val
            continue
        except ValueError:
            pass
        info[key] = str(val).strip()
    return info
class ZDBConnection(redis.Connection):
    """
    ZDBConnection implement the custom selection of namespace
    on 0-DB
    """
    def __init__(self, namespace=None, namespace_password=None, admin=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.namespace = namespace or "default"
        self.namespace_password = namespace_password
        self.admin = admin
    def on_connect(self):
        """
        when a new connection is created, switch to the proper namespace
        """
        self._parser.on_connect(self)
        # if a password is specified, authenticate
        if self.admin and self.password:
            # avoid checking health here -- PING will fail if we try
            # to check the health prior to the AUTH
            self.send_command("AUTH", self.password, check_health=False)
            if redis.connection.nativestr(self.read_response()) != "OK":
                raise redis.connection.AuthenticationError("Invalid Password")
        # if a namespace is specified and it's not default, switch to it
        if self.namespace and not self.admin:
            args = [self.namespace]
            if self.namespace_password:
                args.append(self.namespace_password)
            self.send_command("SELECT", *args)
            if redis.connection.nativestr(self.read_response()) != "OK":
                raise redis.connection.ConnectionError(f"Failed to select namespace {self.namespace}")
class ZDBAdminClient(ZDBClient):
    def auth(self):
        assert self.admin
        if self.secret_:
            # authentication should only happen in zdbadmin client
            self.redis.execute_command("AUTH", self.secret_)
    def namespace_exists(self, name):
        assert self.admin
        self.auth()
        try:
            self.redis.execute_command("NSINFO", name)
            return True
        except Exception as e:
            if not "Namespace not found" in str(e):
                raise j.exceptions.Base("could not check namespace:%s, error:%s" % (name, e))
            return False
    def namespaces_list(self):
        assert self.admin
        self.auth()
        res = self.redis.execute_command("NSLIST")
        return [i.decode() for i in res]
    def namespace_new(self, name, secret=None, maxsize=0, die=False):
        """
        check namespace exists & will return zdb client to that namespace
        :param name:
        :param secret:
        :param maxsize:
        :param die:
        :return:
        """
        assert self.admin
        self.auth()
        if not self.namespace_exists(name):
            self.redis.execute_command("NSNEW", name)
        else:
            if die:
                raise j.exceptions.Base("namespace already exists:%s" % name)
        if secret:
            self.redis.execute_command("NSSET", name, "password", secret)
            self.redis.execute_command("NSSET", name, "public", "no")
        if maxsize != 0:
            self.redis.execute_command("NSSET", name, "maxsize", maxsize)
        ns = j.clients.zdb.client_get(
            name="temp_%s" % name, addr=self.addr, port=self.port, mode=self.mode, secret=secret, namespace=name
        )
        assert ns.ping()
        if secret:
            assert ns.nsinfo["public"] == "no"
        else:
            assert ns.nsinfo["public"] == "yes"
        ns._data.delete()
        return ns
    def namespace_get(self, name, secret=""):
        assert self.admin
        self.auth()
        return self.namespace_new(name, secret)
    def namespace_delete(self, name):
        assert self.admin
        self.auth()
        if self.namespace_exists(name):
            self.redis.execute_command("NSDEL", name)
    def reset(self, ignore=[]):
        """
        dangerous, will remove all namespaces & all data
        :param: list of namespace names not to reset
        :return:
        """
        assert self.admin
        self.auth()
        for name in self.namespaces_list():
            if name not in ["default"] and name not in ignore:
                self.namespace_delete(name)
Classes
class Mode (value, names=None, *, module=None, qualname=None, type=None, start=1)- 
An enumeration.
Expand source code
class Mode(Enum): SEQ = "seq" USER = "user"Ancestors
- enum.Enum
 
Class variables
var SEQvar USER
 class ZDBAdminClient (*args, **kwargs)- 
A simple attribute-based namespace.
SimpleNamespace(**kwargs)
base class implementation for any class with fields which supports getting/setting raw data for any instance fields.
any instance can have an optional name and a parent.
class Person(Base): name = fields.String() age = fields.Float() p = Person(name="ahmed", age="19") print(p.name, p.age)Args
parent_:Base, optional- parent instance. Defaults to None.
 instance_name_:str, optional- instance name. Defaults to None.
 **values- any given field values to initiate the instance with
 
Expand source code
class ZDBAdminClient(ZDBClient): def auth(self): assert self.admin if self.secret_: # authentication should only happen in zdbadmin client self.redis.execute_command("AUTH", self.secret_) def namespace_exists(self, name): assert self.admin self.auth() try: self.redis.execute_command("NSINFO", name) return True except Exception as e: if not "Namespace not found" in str(e): raise j.exceptions.Base("could not check namespace:%s, error:%s" % (name, e)) return False def namespaces_list(self): assert self.admin self.auth() res = self.redis.execute_command("NSLIST") return [i.decode() for i in res] def namespace_new(self, name, secret=None, maxsize=0, die=False): """ check namespace exists & will return zdb client to that namespace :param name: :param secret: :param maxsize: :param die: :return: """ assert self.admin self.auth() if not self.namespace_exists(name): self.redis.execute_command("NSNEW", name) else: if die: raise j.exceptions.Base("namespace already exists:%s" % name) if secret: self.redis.execute_command("NSSET", name, "password", secret) self.redis.execute_command("NSSET", name, "public", "no") if maxsize != 0: self.redis.execute_command("NSSET", name, "maxsize", maxsize) ns = j.clients.zdb.client_get( name="temp_%s" % name, addr=self.addr, port=self.port, mode=self.mode, secret=secret, namespace=name ) assert ns.ping() if secret: assert ns.nsinfo["public"] == "no" else: assert ns.nsinfo["public"] == "yes" ns._data.delete() return ns def namespace_get(self, name, secret=""): assert self.admin self.auth() return self.namespace_new(name, secret) def namespace_delete(self, name): assert self.admin self.auth() if self.namespace_exists(name): self.redis.execute_command("NSDEL", name) def reset(self, ignore=[]): """ dangerous, will remove all namespaces & all data :param: list of namespace names not to reset :return: """ assert self.admin self.auth() for name in self.namespaces_list(): if name not in ["default"] and name not in ignore: self.namespace_delete(name)Ancestors
Methods
def auth(self)- 
Expand source code
def auth(self): assert self.admin if self.secret_: # authentication should only happen in zdbadmin client self.redis.execute_command("AUTH", self.secret_) def namespace_delete(self, name)- 
Expand source code
def namespace_delete(self, name): assert self.admin self.auth() if self.namespace_exists(name): self.redis.execute_command("NSDEL", name) def namespace_exists(self, name)- 
Expand source code
def namespace_exists(self, name): assert self.admin self.auth() try: self.redis.execute_command("NSINFO", name) return True except Exception as e: if not "Namespace not found" in str(e): raise j.exceptions.Base("could not check namespace:%s, error:%s" % (name, e)) return False def namespace_get(self, name, secret='')- 
Expand source code
def namespace_get(self, name, secret=""): assert self.admin self.auth() return self.namespace_new(name, secret) def namespace_new(self, name, secret=None, maxsize=0, die=False)- 
check namespace exists & will return zdb client to that namespace
:param name: :param secret: :param maxsize: :param die: :return:
Expand source code
def namespace_new(self, name, secret=None, maxsize=0, die=False): """ check namespace exists & will return zdb client to that namespace :param name: :param secret: :param maxsize: :param die: :return: """ assert self.admin self.auth() if not self.namespace_exists(name): self.redis.execute_command("NSNEW", name) else: if die: raise j.exceptions.Base("namespace already exists:%s" % name) if secret: self.redis.execute_command("NSSET", name, "password", secret) self.redis.execute_command("NSSET", name, "public", "no") if maxsize != 0: self.redis.execute_command("NSSET", name, "maxsize", maxsize) ns = j.clients.zdb.client_get( name="temp_%s" % name, addr=self.addr, port=self.port, mode=self.mode, secret=secret, namespace=name ) assert ns.ping() if secret: assert ns.nsinfo["public"] == "no" else: assert ns.nsinfo["public"] == "yes" ns._data.delete() return ns def namespaces_list(self)- 
Expand source code
def namespaces_list(self): assert self.admin self.auth() res = self.redis.execute_command("NSLIST") return [i.decode() for i in res] def reset(self, ignore=[])- 
dangerous, will remove all namespaces & all data :param: list of namespace names not to reset :return:
Expand source code
def reset(self, ignore=[]): """ dangerous, will remove all namespaces & all data :param: list of namespace names not to reset :return: """ assert self.admin self.auth() for name in self.namespaces_list(): if name not in ["default"] and name not in ignore: self.namespace_delete(name) 
Inherited members
 class ZDBClient (*args, **kwargs)- 
A simple attribute-based namespace.
SimpleNamespace(**kwargs)
base class implementation for any class with fields which supports getting/setting raw data for any instance fields.
any instance can have an optional name and a parent.
class Person(Base): name = fields.String() age = fields.Float() p = Person(name="ahmed", age="19") print(p.name, p.age)Args
parent_:Base, optional- parent instance. Defaults to None.
 instance_name_:str, optional- instance name. Defaults to None.
 **values- any given field values to initiate the instance with
 
Expand source code
class ZDBClient(Client): addr = fields.String(default="localhost") port = fields.Integer(default=9900) secret_ = fields.String(default="1234567") nsname = fields.String(default="test") admin = fields.Boolean(default=False) mode = fields.Enum(Mode) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # if not self.secret_: # self.secret_ = j.core.myenv.adminsecret assert len(self.secret_) > 5 if self.admin: self.nsname = "default" self.type = "ZDB" self._redis = None self.nsname = self.nsname.lower().strip() # if j.data.bcdb._master: # self._model.trigger_add(self._update_trigger) def _update_trigger(self, obj, action, **kwargs): if action in ["save", "change"]: self._redis = None @property def redis(self): if not self._redis: pool = redis.ConnectionPool( host=self.addr, port=self.port, password=self.secret_, connection_class=ZDBConnection, namespace=self.nsname, namespace_password=self.secret_, admin=self.admin, ) self._redis = _patch_redis_client(redis.Redis(connection_pool=pool)) return self._redis def _key_encode(self, key): if self.mode.value == Mode.SEQ.value: if key is None: key = "" else: key = struct.pack("<I", key) return key def _key_decode(self, key): if self.mode.value == Mode.SEQ.value: key = struct.unpack("<I", key)[0] return key def set(self, data, key=None): key = key or "" key = self._key_encode(key) res = self.redis.execute_command("SET", key, data) if not res: return res return self._key_decode(res) def get(self, key): key = self._key_encode(key) return self.redis.execute_command("GET", key) def exists(self, key): key = self._key_encode(key) return self.redis.execute_command("EXISTS", key) == 1 def delete(self, key): if not key: raise j.exceptions.Value("key must be provided") key = self._key_encode(key) self.redis.execute_command("DEL", key) def flush(self): """ will remove all data from the database DANGEROUS !!!! This is only allowed on private and password protected namespace You need to select the namespace before running the command. :return: """ if not self.nsname in ["default", "system"]: self.redis.execute_command("FLUSH") def stop(self): pass @property def nsinfo(self): cmd = self.redis.execute_command("NSINFO", self.nsname) return _parse_nsinfo(cmd.decode()) def list(self, key_start=None, reverse=False): """ list all the keys in the namespace :param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :return: list of keys :rtype: [str] """ result = [] for key, data in self.iterate(key_start=key_start, reverse=reverse, keyonly=True): result.append(key) return result def iterate(self, key_start=None, reverse=False, keyonly=False): """ walk over all the namespace and yield (key,data) for each entries in a namespace :param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :param keyonly: [description], defaults to False :param keyonly: bool, optional :raises e: [description] """ next = None data = None if key_start is not None: next = self.redis.execute_command("KEYCUR", self._key_encode(key_start)) if not keyonly: data = self.get(key_start) yield (key_start, data) CMD = "SCANX" if not reverse else "RSCAN" while True: try: if not next: response = self.redis.execute_command(CMD) else: response = self.redis.execute_command(CMD, next) # format of the response # see https://github.com/threefoldtech/0-db/tree/development#scan except redis.ResponseError as e: if e.args[0] == "No more data": return raise e (next, results) = response for item in results: keyb, size, epoch = item key_new = self._key_decode(keyb) data = None if not keyonly: data = self.redis.execute_command("GET", keyb) yield (key_new, data) @property def count(self): """ :return: return the number of entries in the namespace :rtype: int """ return self.nsinfo["entries"] def ping(self): """ go to default namespace & ping :return: """ return self.redis.ping() @property def next_id(self): """ :return: return the next id :rtype: int """ id_bytes = struct.pack("<I", int(self.nsinfo["next_internal_id"], 16)) return int.from_bytes(id_bytes, byteorder="big", signed=True)Ancestors
Subclasses
Instance variables
var addr- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var admin- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var count- 
:return: return the number of entries in the namespace :rtype: int
Expand source code
@property def count(self): """ :return: return the number of entries in the namespace :rtype: int """ return self.nsinfo["entries"] var mode- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var next_id- 
:return: return the next id :rtype: int
Expand source code
@property def next_id(self): """ :return: return the next id :rtype: int """ id_bytes = struct.pack("<I", int(self.nsinfo["next_internal_id"], 16)) return int.from_bytes(id_bytes, byteorder="big", signed=True) var nsinfo- 
Expand source code
@property def nsinfo(self): cmd = self.redis.execute_command("NSINFO", self.nsname) return _parse_nsinfo(cmd.decode()) var nsname- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var port- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var redis- 
Expand source code
@property def redis(self): if not self._redis: pool = redis.ConnectionPool( host=self.addr, port=self.port, password=self.secret_, connection_class=ZDBConnection, namespace=self.nsname, namespace_password=self.secret_, admin=self.admin, ) self._redis = _patch_redis_client(redis.Redis(connection_pool=pool)) return self._redis var secret_- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) 
Methods
def delete(self, key)- 
Expand source code
def delete(self, key): if not key: raise j.exceptions.Value("key must be provided") key = self._key_encode(key) self.redis.execute_command("DEL", key) def exists(self, key)- 
Expand source code
def exists(self, key): key = self._key_encode(key) return self.redis.execute_command("EXISTS", key) == 1 def flush(self)- 
will remove all data from the database DANGEROUS !!!! This is only allowed on private and password protected namespace You need to select the namespace before running the command. :return:
Expand source code
def flush(self): """ will remove all data from the database DANGEROUS !!!! This is only allowed on private and password protected namespace You need to select the namespace before running the command. :return: """ if not self.nsname in ["default", "system"]: self.redis.execute_command("FLUSH") def get(self, key)- 
Expand source code
def get(self, key): key = self._key_encode(key) return self.redis.execute_command("GET", key) def iterate(self, key_start=None, reverse=False, keyonly=False)- 
walk over all the namespace and yield (key,data) for each entries in a namespace
:param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :param keyonly: [description], defaults to False :param keyonly: bool, optional :raises e: [description]
Expand source code
def iterate(self, key_start=None, reverse=False, keyonly=False): """ walk over all the namespace and yield (key,data) for each entries in a namespace :param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :param keyonly: [description], defaults to False :param keyonly: bool, optional :raises e: [description] """ next = None data = None if key_start is not None: next = self.redis.execute_command("KEYCUR", self._key_encode(key_start)) if not keyonly: data = self.get(key_start) yield (key_start, data) CMD = "SCANX" if not reverse else "RSCAN" while True: try: if not next: response = self.redis.execute_command(CMD) else: response = self.redis.execute_command(CMD, next) # format of the response # see https://github.com/threefoldtech/0-db/tree/development#scan except redis.ResponseError as e: if e.args[0] == "No more data": return raise e (next, results) = response for item in results: keyb, size, epoch = item key_new = self._key_decode(keyb) data = None if not keyonly: data = self.redis.execute_command("GET", keyb) yield (key_new, data) def list(self, key_start=None, reverse=False)- 
list all the keys in the namespace
:param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :return: list of keys :rtype: [str]
Expand source code
def list(self, key_start=None, reverse=False): """ list all the keys in the namespace :param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :return: list of keys :rtype: [str] """ result = [] for key, data in self.iterate(key_start=key_start, reverse=reverse, keyonly=True): result.append(key) return result def ping(self)- 
go to default namespace & ping :return:
Expand source code
def ping(self): """ go to default namespace & ping :return: """ return self.redis.ping() def set(self, data, key=None)- 
Expand source code
def set(self, data, key=None): key = key or "" key = self._key_encode(key) res = self.redis.execute_command("SET", key, data) if not res: return res return self._key_decode(res) def stop(self)- 
Expand source code
def stop(self): pass 
Inherited members
 class ZDBConnection (namespace=None, namespace_password=None, admin=False, *args, **kwargs)- 
ZDBConnection implement the custom selection of namespace on 0-DB
Expand source code
class ZDBConnection(redis.Connection): """ ZDBConnection implement the custom selection of namespace on 0-DB """ def __init__(self, namespace=None, namespace_password=None, admin=False, *args, **kwargs): super().__init__(*args, **kwargs) self.namespace = namespace or "default" self.namespace_password = namespace_password self.admin = admin def on_connect(self): """ when a new connection is created, switch to the proper namespace """ self._parser.on_connect(self) # if a password is specified, authenticate if self.admin and self.password: # avoid checking health here -- PING will fail if we try # to check the health prior to the AUTH self.send_command("AUTH", self.password, check_health=False) if redis.connection.nativestr(self.read_response()) != "OK": raise redis.connection.AuthenticationError("Invalid Password") # if a namespace is specified and it's not default, switch to it if self.namespace and not self.admin: args = [self.namespace] if self.namespace_password: args.append(self.namespace_password) self.send_command("SELECT", *args) if redis.connection.nativestr(self.read_response()) != "OK": raise redis.connection.ConnectionError(f"Failed to select namespace {self.namespace}")Ancestors
- redis.connection.Connection
 
Methods
def on_connect(self)- 
when a new connection is created, switch to the proper namespace
Expand source code
def on_connect(self): """ when a new connection is created, switch to the proper namespace """ self._parser.on_connect(self) # if a password is specified, authenticate if self.admin and self.password: # avoid checking health here -- PING will fail if we try # to check the health prior to the AUTH self.send_command("AUTH", self.password, check_health=False) if redis.connection.nativestr(self.read_response()) != "OK": raise redis.connection.AuthenticationError("Invalid Password") # if a namespace is specified and it's not default, switch to it if self.namespace and not self.admin: args = [self.namespace] if self.namespace_password: args.append(self.namespace_password) self.send_command("SELECT", *args) if redis.connection.nativestr(self.read_response()) != "OK": raise redis.connection.ConnectionError(f"Failed to select namespace {self.namespace}")