Module jumpscale.clients.zdb.client

To get a new ZDB instance

zcl = j.clients.zdb.get("instance", admin=True, mode="user")

This gets an instance with admin capabilities in user mode.

Expand source code
"""
# To get a new ZDB instance

zcl = j.clients.zdb.get("instance", admin=True, mode="user")

# This gets an instance with admin capabilities in user mode

"""
from jumpscale.clients.base import Client
from jumpscale.core.base import fields
from enum import Enum


import redis
import struct


class Mode(Enum):
    """key modes a ZDB client can be configured with (stored in the client's ``mode`` field)"""

    # keys are packed/unpacked as little-endian unsigned 32-bit integers by the client
    SEQ = "seq"
    # keys are passed to the server untouched by the client
    USER = "user"


class ZDBClient(Client):
    """
    client for one 0-DB namespace, talking to the server through a patched redis client

    key handling depends on ``mode``: in ``Mode.SEQ`` keys are packed/unpacked as
    little-endian unsigned 32-bit integers, otherwise they are passed through untouched
    """

    addr = fields.String(default="localhost")
    port = fields.Integer(default=9900)
    secret_ = fields.String(default="1234567")
    nsname = fields.String(default="test")
    admin = fields.Boolean(default=False)
    mode = fields.Enum(Mode)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        assert len(self.secret_) > 5

        # admin clients always operate on the "default" namespace
        if self.admin:
            self.nsname = "default"
        self.type = "ZDB"
        self._redis = None
        self.nsname = self.nsname.lower().strip()

    def _update_trigger(self, obj, action, **kwargs):
        """
        drop the cached redis client when the instance is saved or changed,
        so the next access to ``redis`` rebuilds it from the current field values
        """
        if action in ["save", "change"]:
            self._redis = None

    @property
    def redis(self):
        """
        lazily built redis client bound to this namespace

        the client uses ``ZDBConnection`` so every new connection authenticates/selects
        the namespace, and it is patched to leave SET/DEL replies unparsed
        """
        if not self._redis:
            pool = redis.ConnectionPool(
                host=self.addr,
                port=self.port,
                password=self.secret_,
                connection_class=ZDBConnection,
                namespace=self.nsname,
                namespace_password=self.secret_,
                admin=self.admin,
            )
            self._redis = _patch_redis_client(redis.Redis(connection_pool=pool))
        return self._redis

    def _key_encode(self, key):
        """
        convert a caller-facing key to what is sent on the wire

        :param key: integer id in seq mode (None or an empty key means "no key"), anything in other modes
        :return: packed little-endian uint32 (or "" when no key) in seq mode, ``key`` unchanged otherwise
        """
        if self.mode.value == Mode.SEQ.value:
            # an empty key cannot be packed as an integer; it is sent as-is
            if key is None or key == "" or key == b"":
                return ""
            return struct.pack("<I", key)
        return key

    def _key_decode(self, key):
        """
        convert a key received from the server back to the caller-facing form

        :param key: raw key as returned by the server
        :return: integer id in seq mode, ``key`` unchanged otherwise
        """
        if self.mode.value == Mode.SEQ.value:
            key = struct.unpack("<I", key)[0]
        return key

    def set(self, data, key=None):
        """
        store ``data`` under ``key``

        :param data: payload to store
        :param key: key to write to; None or an empty key sends an empty key to the server
        :return: the decoded key returned by the server, or the raw falsy reply when there is none
        """
        # only a missing/empty key maps to "", so a legitimate key such as 0 is kept
        if key is None or key == "" or key == b"":
            encoded = ""
        else:
            encoded = self._key_encode(key)
        res = self.redis.execute_command("SET", encoded, data)
        if not res:
            return res
        return self._key_decode(res)

    def get(self, key):
        """
        :param key: key to read
        :return: raw reply of the GET command
        """
        key = self._key_encode(key)
        return self.redis.execute_command("GET", key)

    def exists(self, key):
        """
        :param key: key to check
        :return: True when the EXISTS command replies 1
        :rtype: bool
        """
        key = self._key_encode(key)
        return self.redis.execute_command("EXISTS", key) == 1

    def delete(self, key):
        """
        delete the entry stored under ``key``

        :param key: key to delete, must not be None or empty (0 is a valid key)
        :raises j.exceptions.Value: when no key is given
        """
        if key is None or key == "" or key == b"":
            # imported here because this module has no module-level `j`
            from jumpscale.loader import j

            raise j.exceptions.Value("key must be provided")
        key = self._key_encode(key)
        self.redis.execute_command("DEL", key)

    def flush(self):
        """
        will remove all data from the database DANGEROUS !!!!
        This is only allowed on private and password protected namespace
        You need to select the namespace before running the command.

        nothing is sent when the namespace is "default" or "system"
        :return:
        """
        if self.nsname not in ["default", "system"]:
            self.redis.execute_command("FLUSH")

    def stop(self):
        """no-op, nothing to stop for this client"""
        pass

    @property
    def nsinfo(self):
        """
        :return: parsed reply of NSINFO for this namespace
        :rtype: dict
        """
        cmd = self.redis.execute_command("NSINFO", self.nsname)
        return _parse_nsinfo(cmd.decode())

    def list(self, key_start=None, reverse=False):
        """
        list all the keys in the namespace

        :param key_start: if specified start to walk from that key instead of the first one, defaults to None
        :param key_start: str, optional
        :param reverse: decide how to walk the namespace
                        if False, walk from older to newer keys
                        if True walk from newer to older keys
                        defaults to False
        :param reverse: bool, optional
        :return: list of keys
        :rtype: [str]
        """
        return [key for key, _ in self.iterate(key_start=key_start, reverse=reverse, keyonly=True)]

    def iterate(self, key_start=None, reverse=False, keyonly=False):
        """
        walk over all the namespace and yield (key,data) for each entries in a namespace

        :param key_start: if specified start to walk from that key instead of the first one, defaults to None
        :param key_start: str, optional
        :param reverse: decide how to walk the namespace
                if False, walk from older to newer keys
                if True walk from newer to older keys
                defaults to False
        :param reverse: bool, optional
        :param keyonly: when True no GET is issued and the yielded data is None, defaults to False
        :param keyonly: bool, optional
        :raises redis.ResponseError: any server error other than "No more data"
        """
        cursor = None
        data = None

        if key_start is not None:
            # KEYCUR gives the cursor for key_start; the start key itself is yielded first
            cursor = self.redis.execute_command("KEYCUR", self._key_encode(key_start))
            if not keyonly:
                data = self.get(key_start)
            yield (key_start, data)

        command = "RSCAN" if reverse else "SCANX"

        while True:
            try:
                if cursor:
                    response = self.redis.execute_command(command, cursor)
                else:
                    response = self.redis.execute_command(command)
                # format of the response
                # see https://github.com/threefoldtech/0-db/tree/development#scan
            except redis.ResponseError as err:
                # this error message marks the end of the walk
                if err.args[0] == "No more data":
                    return
                raise

            cursor, results = response
            for keyb, _size, _epoch in results:
                decoded = self._key_decode(keyb)
                value = None
                if not keyonly:
                    value = self.redis.execute_command("GET", keyb)
                yield (decoded, value)

    @property
    def count(self):
        """
        :return: return the number of entries in the namespace
        :rtype: int
        """
        return self.nsinfo["entries"]

    def ping(self):
        """
        ping the server through this client's connection

        :return: reply of the redis client's ping
        """
        return self.redis.ping()

    @property
    def next_id(self):
        """
        :return: return the next id
        :rtype: int
        """
        # the hex value from NSINFO is packed little-endian and read back big-endian (byte swap)
        # NOTE(review): signed=True can yield negative ids for large values - confirm intent
        id_bytes = struct.pack("<I", int(self.nsinfo["next_internal_id"], 16))
        return int.from_bytes(id_bytes, byteorder="big", signed=True)


# Helper functions
def _patch_redis_client(redis):
    """
    remove the reply parsers for SET and DEL from a redis client

    0-db is not 100% redis compatible for these commands (SET returns a key),
    so their replies must reach the caller unparsed

    :param redis: client object exposing a ``response_callbacks`` mapping
    :return: the same client, patched in place
    """
    callbacks = redis.response_callbacks
    registered = [command for command in ("SET", "DEL") if command in callbacks]
    for command in registered:
        del callbacks[command]
    return redis


def _parse_nsinfo(raw):
    """
    parse the text reply of NSINFO into a dict

    blank lines, lines starting with "#" and lines without ":" are ignored.
    each remaining line is split on its FIRST ":" only, so values that contain
    a colon are kept intact. values are converted to int, then float, and fall
    back to the stripped string

    :param raw: decoded NSINFO reply
    :type raw: str
    :return: mapping of field name to int, float or str value
    :rtype: dict
    """
    info = {}
    for line in raw.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue

        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip()

        for cast in (int, float):
            try:
                info[key] = cast(val)
                break
            except ValueError:
                continue
        else:
            # neither numeric conversion worked, keep the text
            info[key] = val
    return info


class ZDBConnection(redis.Connection):
    """
    ZDBConnection implement the custom selection of namespace
    on 0-DB
    """

    def __init__(self, namespace=None, namespace_password=None, admin=False, *args, **kwargs):
        """
        :param namespace: namespace to select after connecting, "default" when not given
        :param namespace_password: password sent along with SELECT, optional
        :param admin: when True the connection sends AUTH and does not SELECT a namespace
        remaining arguments are forwarded to ``redis.Connection``
        """
        super().__init__(*args, **kwargs)
        self.namespace = namespace or "default"
        self.namespace_password = namespace_password
        self.admin = admin

    @staticmethod
    def _as_text(reply):
        """decode a bytes reply to str (utf-8, replacing errors); other values are returned as-is"""
        # local replacement for redis.connection.nativestr, which newer redis-py releases no longer provide
        if isinstance(reply, bytes):
            return reply.decode("utf-8", "replace")
        return reply

    def on_connect(self):
        """
        when a new connection is created, switch to the proper namespace

        :raises redis.AuthenticationError: admin AUTH did not reply OK
        :raises redis.ConnectionError: SELECT of the namespace did not reply OK
        """
        self._parser.on_connect(self)

        # admin connections authenticate with the connection password
        if self.admin and self.password:
            # avoid checking health here -- PING will fail if we try
            # to check the health prior to the AUTH
            self.send_command("AUTH", self.password, check_health=False)
            if self._as_text(self.read_response()) != "OK":
                raise redis.AuthenticationError("Invalid Password")

        # non-admin connections select their namespace (including "default")
        if self.namespace and not self.admin:
            args = [self.namespace]
            if self.namespace_password:
                args.append(self.namespace_password)

            self.send_command("SELECT", *args)
            if self._as_text(self.read_response()) != "OK":
                raise redis.ConnectionError(f"Failed to select namespace {self.namespace}")


class ZDBAdminClient(ZDBClient):
    """admin flavour of the ZDB client: create, list, check and delete namespaces"""

    def auth(self):
        """send AUTH with the client secret; only valid on an admin client"""
        assert self.admin
        if self.secret_:
            # authentication should only happen in zdbadmin client
            self.redis.execute_command("AUTH", self.secret_)

    def namespace_exists(self, name):
        """
        :param name: namespace name
        :return: True when NSINFO succeeds, False when the server reports "Namespace not found"
        :raises j.exceptions.Base: any other failure while checking
        """
        assert self.admin
        self.auth()
        try:
            self.redis.execute_command("NSINFO", name)
        except Exception as e:
            if "Namespace not found" in str(e):
                return False
            # imported here because this module has no module-level `j`
            from jumpscale.loader import j

            raise j.exceptions.Base("could not check namespace:%s, error:%s" % (name, e)) from e
        return True

    def namespaces_list(self):
        """
        :return: namespace names reported by NSLIST, decoded to str
        :rtype: list
        """
        assert self.admin
        self.auth()
        res = self.redis.execute_command("NSLIST")
        return [i.decode() for i in res]

    def namespace_new(self, name, secret=None, maxsize=0, die=False):
        """
        check namespace exists & will return zdb client to that namespace

        :param name: namespace name, created with NSNEW when missing
        :param secret: when given, set as namespace password and the namespace is made non public
        :param maxsize: when not 0, set as the namespace maxsize
        :param die: raise when the namespace already exists
        :return: client for the namespace
        :raises j.exceptions.Base: namespace exists and ``die`` is True
        """
        # imported here because this module has no module-level `j`
        from jumpscale.loader import j

        assert self.admin
        self.auth()
        if not self.namespace_exists(name):
            self.redis.execute_command("NSNEW", name)
        elif die:
            raise j.exceptions.Base("namespace already exists:%s" % name)

        if secret:
            self.redis.execute_command("NSSET", name, "password", secret)
            self.redis.execute_command("NSSET", name, "public", "no")

        if maxsize != 0:
            self.redis.execute_command("NSSET", name, "maxsize", maxsize)

        # NOTE(review): `client_get` and `_data.delete()` are not defined in this module;
        # confirm the factory still exposes them with these argument names
        ns = j.clients.zdb.client_get(
            name="temp_%s" % name, addr=self.addr, port=self.port, mode=self.mode, secret=secret, namespace=name
        )

        assert ns.ping()
        if secret:
            assert ns.nsinfo["public"] == "no"
        else:
            assert ns.nsinfo["public"] == "yes"

        ns._data.delete()
        return ns

    def namespace_get(self, name, secret=""):
        """
        :param name: namespace name
        :param secret: namespace password, optional
        :return: result of ``namespace_new(name, secret)``
        """
        assert self.admin
        self.auth()
        return self.namespace_new(name, secret)

    def namespace_delete(self, name):
        """delete the namespace with NSDEL when it exists, do nothing otherwise"""
        assert self.admin
        self.auth()
        if self.namespace_exists(name):
            self.redis.execute_command("NSDEL", name)

    def reset(self, ignore=None):
        """
        dangerous, will remove all namespaces & all data
        :param ignore: list of namespace names not to reset, "default" is always kept
        :return:
        """
        assert self.admin
        self.auth()
        ignore = [] if ignore is None else ignore
        for name in self.namespaces_list():
            if name != "default" and name not in ignore:
                self.namespace_delete(name)

Classes

class Mode (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class Mode(Enum):
    """key modes a ZDB client can be configured with (stored in the client's ``mode`` field)"""

    # keys are packed/unpacked as little-endian unsigned 32-bit integers by the client
    SEQ = "seq"
    # keys are passed to the server untouched by the client
    USER = "user"

Ancestors

  • enum.Enum

Class variables

var SEQ
var USER
class ZDBAdminClient (*args, **kwargs)

A simple attribute-based namespace.

SimpleNamespace(**kwargs)

base class implementation for any class with fields which supports getting/setting raw data for any instance fields.

any instance can have an optional name and a parent.

class Person(Base):
    name = fields.String()
    age = fields.Float()

p = Person(name="ahmed", age="19")
print(p.name, p.age)

Args

parent_ : Base, optional
parent instance. Defaults to None.
instance_name_ : str, optional
instance name. Defaults to None.
**values
any given field values to initiate the instance with
Expand source code
class ZDBAdminClient(ZDBClient):
    """admin flavour of the ZDB client: create, list, check and delete namespaces"""

    def auth(self):
        """send AUTH with the client secret; only valid on an admin client"""
        assert self.admin
        if self.secret_:
            # authentication should only happen in zdbadmin client
            self.redis.execute_command("AUTH", self.secret_)

    def namespace_exists(self, name):
        """
        :param name: namespace name
        :return: True when NSINFO succeeds, False when the server reports "Namespace not found"
        :raises j.exceptions.Base: any other failure while checking
        """
        assert self.admin
        self.auth()
        try:
            self.redis.execute_command("NSINFO", name)
        except Exception as e:
            if "Namespace not found" in str(e):
                return False
            # imported here because this module has no module-level `j`
            from jumpscale.loader import j

            raise j.exceptions.Base("could not check namespace:%s, error:%s" % (name, e)) from e
        return True

    def namespaces_list(self):
        """
        :return: namespace names reported by NSLIST, decoded to str
        :rtype: list
        """
        assert self.admin
        self.auth()
        res = self.redis.execute_command("NSLIST")
        return [i.decode() for i in res]

    def namespace_new(self, name, secret=None, maxsize=0, die=False):
        """
        check namespace exists & will return zdb client to that namespace

        :param name: namespace name, created with NSNEW when missing
        :param secret: when given, set as namespace password and the namespace is made non public
        :param maxsize: when not 0, set as the namespace maxsize
        :param die: raise when the namespace already exists
        :return: client for the namespace
        :raises j.exceptions.Base: namespace exists and ``die`` is True
        """
        # imported here because this module has no module-level `j`
        from jumpscale.loader import j

        assert self.admin
        self.auth()
        if not self.namespace_exists(name):
            self.redis.execute_command("NSNEW", name)
        elif die:
            raise j.exceptions.Base("namespace already exists:%s" % name)

        if secret:
            self.redis.execute_command("NSSET", name, "password", secret)
            self.redis.execute_command("NSSET", name, "public", "no")

        if maxsize != 0:
            self.redis.execute_command("NSSET", name, "maxsize", maxsize)

        # NOTE(review): `client_get` and `_data.delete()` are not defined in this module;
        # confirm the factory still exposes them with these argument names
        ns = j.clients.zdb.client_get(
            name="temp_%s" % name, addr=self.addr, port=self.port, mode=self.mode, secret=secret, namespace=name
        )

        assert ns.ping()
        if secret:
            assert ns.nsinfo["public"] == "no"
        else:
            assert ns.nsinfo["public"] == "yes"

        ns._data.delete()
        return ns

    def namespace_get(self, name, secret=""):
        """
        :param name: namespace name
        :param secret: namespace password, optional
        :return: result of ``namespace_new(name, secret)``
        """
        assert self.admin
        self.auth()
        return self.namespace_new(name, secret)

    def namespace_delete(self, name):
        """delete the namespace with NSDEL when it exists, do nothing otherwise"""
        assert self.admin
        self.auth()
        if self.namespace_exists(name):
            self.redis.execute_command("NSDEL", name)

    def reset(self, ignore=None):
        """
        dangerous, will remove all namespaces & all data
        :param ignore: list of namespace names not to reset, "default" is always kept
        :return:
        """
        assert self.admin
        self.auth()
        ignore = [] if ignore is None else ignore
        for name in self.namespaces_list():
            if name != "default" and name not in ignore:
                self.namespace_delete(name)

Ancestors

Methods

def auth(self)
Expand source code
def auth(self):
    """send AUTH with the client secret; only valid on an admin client, no-op without a secret"""
    assert self.admin
    secret = self.secret_
    if not secret:
        return
    # authentication should only happen in zdbadmin client
    self.redis.execute_command("AUTH", secret)
def namespace_delete(self, name)
Expand source code
def namespace_delete(self, name):
    """delete the namespace with NSDEL when it exists, do nothing otherwise"""
    assert self.admin
    self.auth()
    if not self.namespace_exists(name):
        return
    self.redis.execute_command("NSDEL", name)
def namespace_exists(self, name)
Expand source code
def namespace_exists(self, name):
    """
    :param name: namespace name
    :return: True when NSINFO succeeds, False when the server reports "Namespace not found"
    :raises j.exceptions.Base: any other failure while checking
    """
    assert self.admin
    self.auth()
    try:
        self.redis.execute_command("NSINFO", name)
    except Exception as e:
        if "Namespace not found" in str(e):
            return False
        # imported here because the module has no module-level `j`
        from jumpscale.loader import j

        raise j.exceptions.Base("could not check namespace:%s, error:%s" % (name, e)) from e
    return True
def namespace_get(self, name, secret='')
Expand source code
def namespace_get(self, name, secret=""):
    """
    :param name: namespace name
    :param secret: namespace password, optional
    :return: result of ``namespace_new(name, secret)``
    """
    assert self.admin
    self.auth()
    client = self.namespace_new(name, secret)
    return client
def namespace_new(self, name, secret=None, maxsize=0, die=False)

check namespace exists & will return zdb client to that namespace

:param name: :param secret: :param maxsize: :param die: :return:

Expand source code
def namespace_new(self, name, secret=None, maxsize=0, die=False):
    """
    check namespace exists & will return zdb client to that namespace

    :param name: namespace name, created with NSNEW when missing
    :param secret: when given, set as namespace password and the namespace is made non public
    :param maxsize: when not 0, set as the namespace maxsize
    :param die: raise when the namespace already exists
    :return: client for the namespace
    :raises j.exceptions.Base: namespace exists and ``die`` is True
    """
    # imported here because the module has no module-level `j`
    from jumpscale.loader import j

    assert self.admin
    self.auth()
    if not self.namespace_exists(name):
        self.redis.execute_command("NSNEW", name)
    elif die:
        raise j.exceptions.Base("namespace already exists:%s" % name)

    if secret:
        self.redis.execute_command("NSSET", name, "password", secret)
        self.redis.execute_command("NSSET", name, "public", "no")

    if maxsize != 0:
        self.redis.execute_command("NSSET", name, "maxsize", maxsize)

    # NOTE(review): `client_get` and `_data.delete()` are not defined in this module;
    # confirm the factory still exposes them with these argument names
    ns = j.clients.zdb.client_get(
        name="temp_%s" % name, addr=self.addr, port=self.port, mode=self.mode, secret=secret, namespace=name
    )

    assert ns.ping()
    if secret:
        assert ns.nsinfo["public"] == "no"
    else:
        assert ns.nsinfo["public"] == "yes"

    ns._data.delete()
    return ns
def namespaces_list(self)
Expand source code
def namespaces_list(self):
    """
    :return: namespace names reported by NSLIST, decoded to str
    :rtype: list
    """
    assert self.admin
    self.auth()
    names = []
    for raw_name in self.redis.execute_command("NSLIST"):
        names.append(raw_name.decode())
    return names
def reset(self, ignore=[])

dangerous, will remove all namespaces & all data :param: list of namespace names not to reset :return:

Expand source code
def reset(self, ignore=None):
    """
    dangerous, will remove all namespaces & all data
    :param ignore: list of namespace names not to reset, "default" is always kept
    :return:
    """
    assert self.admin
    self.auth()
    # None instead of a shared mutable default list
    ignore = [] if ignore is None else ignore
    for name in self.namespaces_list():
        if name != "default" and name not in ignore:
            self.namespace_delete(name)

Inherited members

class ZDBClient (*args, **kwargs)

A simple attribute-based namespace.

SimpleNamespace(**kwargs)

base class implementation for any class with fields which supports getting/setting raw data for any instance fields.

any instance can have an optional name and a parent.

class Person(Base):
    name = fields.String()
    age = fields.Float()

p = Person(name="ahmed", age="19")
print(p.name, p.age)

Args

parent_ : Base, optional
parent instance. Defaults to None.
instance_name_ : str, optional
instance name. Defaults to None.
**values
any given field values to initiate the instance with
Expand source code
class ZDBClient(Client):
    """
    client for one 0-DB namespace, talking to the server through a patched redis client

    key handling depends on ``mode``: in ``Mode.SEQ`` keys are packed/unpacked as
    little-endian unsigned 32-bit integers, otherwise they are passed through untouched
    """

    addr = fields.String(default="localhost")
    port = fields.Integer(default=9900)
    secret_ = fields.String(default="1234567")
    nsname = fields.String(default="test")
    admin = fields.Boolean(default=False)
    mode = fields.Enum(Mode)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        assert len(self.secret_) > 5

        # admin clients always operate on the "default" namespace
        if self.admin:
            self.nsname = "default"
        self.type = "ZDB"
        self._redis = None
        self.nsname = self.nsname.lower().strip()

    def _update_trigger(self, obj, action, **kwargs):
        """
        drop the cached redis client when the instance is saved or changed,
        so the next access to ``redis`` rebuilds it from the current field values
        """
        if action in ["save", "change"]:
            self._redis = None

    @property
    def redis(self):
        """
        lazily built redis client bound to this namespace

        the client uses ``ZDBConnection`` so every new connection authenticates/selects
        the namespace, and it is patched to leave SET/DEL replies unparsed
        """
        if not self._redis:
            pool = redis.ConnectionPool(
                host=self.addr,
                port=self.port,
                password=self.secret_,
                connection_class=ZDBConnection,
                namespace=self.nsname,
                namespace_password=self.secret_,
                admin=self.admin,
            )
            self._redis = _patch_redis_client(redis.Redis(connection_pool=pool))
        return self._redis

    def _key_encode(self, key):
        """
        convert a caller-facing key to what is sent on the wire

        :param key: integer id in seq mode (None or an empty key means "no key"), anything in other modes
        :return: packed little-endian uint32 (or "" when no key) in seq mode, ``key`` unchanged otherwise
        """
        if self.mode.value == Mode.SEQ.value:
            # an empty key cannot be packed as an integer; it is sent as-is
            if key is None or key == "" or key == b"":
                return ""
            return struct.pack("<I", key)
        return key

    def _key_decode(self, key):
        """
        convert a key received from the server back to the caller-facing form

        :param key: raw key as returned by the server
        :return: integer id in seq mode, ``key`` unchanged otherwise
        """
        if self.mode.value == Mode.SEQ.value:
            key = struct.unpack("<I", key)[0]
        return key

    def set(self, data, key=None):
        """
        store ``data`` under ``key``

        :param data: payload to store
        :param key: key to write to; None or an empty key sends an empty key to the server
        :return: the decoded key returned by the server, or the raw falsy reply when there is none
        """
        # only a missing/empty key maps to "", so a legitimate key such as 0 is kept
        if key is None or key == "" or key == b"":
            encoded = ""
        else:
            encoded = self._key_encode(key)
        res = self.redis.execute_command("SET", encoded, data)
        if not res:
            return res
        return self._key_decode(res)

    def get(self, key):
        """
        :param key: key to read
        :return: raw reply of the GET command
        """
        key = self._key_encode(key)
        return self.redis.execute_command("GET", key)

    def exists(self, key):
        """
        :param key: key to check
        :return: True when the EXISTS command replies 1
        :rtype: bool
        """
        key = self._key_encode(key)
        return self.redis.execute_command("EXISTS", key) == 1

    def delete(self, key):
        """
        delete the entry stored under ``key``

        :param key: key to delete, must not be None or empty (0 is a valid key)
        :raises j.exceptions.Value: when no key is given
        """
        if key is None or key == "" or key == b"":
            # imported here because this module has no module-level `j`
            from jumpscale.loader import j

            raise j.exceptions.Value("key must be provided")
        key = self._key_encode(key)
        self.redis.execute_command("DEL", key)

    def flush(self):
        """
        will remove all data from the database DANGEROUS !!!!
        This is only allowed on private and password protected namespace
        You need to select the namespace before running the command.

        nothing is sent when the namespace is "default" or "system"
        :return:
        """
        if self.nsname not in ["default", "system"]:
            self.redis.execute_command("FLUSH")

    def stop(self):
        """no-op, nothing to stop for this client"""
        pass

    @property
    def nsinfo(self):
        """
        :return: parsed reply of NSINFO for this namespace
        :rtype: dict
        """
        cmd = self.redis.execute_command("NSINFO", self.nsname)
        return _parse_nsinfo(cmd.decode())

    def list(self, key_start=None, reverse=False):
        """
        list all the keys in the namespace

        :param key_start: if specified start to walk from that key instead of the first one, defaults to None
        :param key_start: str, optional
        :param reverse: decide how to walk the namespace
                        if False, walk from older to newer keys
                        if True walk from newer to older keys
                        defaults to False
        :param reverse: bool, optional
        :return: list of keys
        :rtype: [str]
        """
        return [key for key, _ in self.iterate(key_start=key_start, reverse=reverse, keyonly=True)]

    def iterate(self, key_start=None, reverse=False, keyonly=False):
        """
        walk over all the namespace and yield (key,data) for each entries in a namespace

        :param key_start: if specified start to walk from that key instead of the first one, defaults to None
        :param key_start: str, optional
        :param reverse: decide how to walk the namespace
                if False, walk from older to newer keys
                if True walk from newer to older keys
                defaults to False
        :param reverse: bool, optional
        :param keyonly: when True no GET is issued and the yielded data is None, defaults to False
        :param keyonly: bool, optional
        :raises redis.ResponseError: any server error other than "No more data"
        """
        cursor = None
        data = None

        if key_start is not None:
            # KEYCUR gives the cursor for key_start; the start key itself is yielded first
            cursor = self.redis.execute_command("KEYCUR", self._key_encode(key_start))
            if not keyonly:
                data = self.get(key_start)
            yield (key_start, data)

        command = "RSCAN" if reverse else "SCANX"

        while True:
            try:
                if cursor:
                    response = self.redis.execute_command(command, cursor)
                else:
                    response = self.redis.execute_command(command)
                # format of the response
                # see https://github.com/threefoldtech/0-db/tree/development#scan
            except redis.ResponseError as err:
                # this error message marks the end of the walk
                if err.args[0] == "No more data":
                    return
                raise

            cursor, results = response
            for keyb, _size, _epoch in results:
                decoded = self._key_decode(keyb)
                value = None
                if not keyonly:
                    value = self.redis.execute_command("GET", keyb)
                yield (decoded, value)

    @property
    def count(self):
        """
        :return: return the number of entries in the namespace
        :rtype: int
        """
        return self.nsinfo["entries"]

    def ping(self):
        """
        ping the server through this client's connection

        :return: reply of the redis client's ping
        """
        return self.redis.ping()

    @property
    def next_id(self):
        """
        :return: return the next id
        :rtype: int
        """
        # the hex value from NSINFO is packed little-endian and read back big-endian (byte swap)
        # NOTE(review): signed=True can yield negative ids for large values - confirm intent
        id_bytes = struct.pack("<I", int(self.nsinfo["next_internal_id"], 16))
        return int.from_bytes(id_bytes, byteorder="big", signed=True)

Ancestors

Subclasses

Instance variables

var addr

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method for this property

    calls `_get_value` with the field's name and definition; per the original
    note, that returns the value if it is already defined and the default
    value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)
var admin

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method for this property

    calls `_get_value` with the field's name and definition; per the original
    note, that returns the value if it is already defined and the default
    value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)
var count

:return: return the number of entries in the namespace :rtype: int

Expand source code
@property
def count(self):
    """
    number of entries in the namespace, taken from the "entries" field of NSINFO

    :return: return the number of entries in the namespace
    :rtype: int
    """
    info = self.nsinfo
    return info["entries"]
var mode

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method for this property

    calls `_get_value` with the field's name and definition; per the original
    note, that returns the value if it is already defined and the default
    value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)
var next_id

:return: return the next id :rtype: int

Expand source code
@property
def next_id(self):
    """
    next id of the namespace, derived from the "next_internal_id" field of NSINFO

    :return: return the next id
    :rtype: int
    """
    raw_hex = self.nsinfo["next_internal_id"]
    value = int(raw_hex, 16)
    # packed little-endian then read back big-endian: the four bytes are swapped
    # NOTE(review): signed=True can yield negative ids for large values - confirm intent
    swapped = struct.pack("<I", value)
    return int.from_bytes(swapped, byteorder="big", signed=True)
var nsinfo
Expand source code
@property
def nsinfo(self):
    """
    :return: parsed reply of NSINFO for this namespace
    :rtype: dict
    """
    raw = self.redis.execute_command("NSINFO", self.nsname)
    text = raw.decode()
    return _parse_nsinfo(text)
var nsname

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method for this property

    calls `_get_value` with the field's name and definition; per the original
    note, that returns the value if it is already defined and the default
    value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)
var port

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method for this property

    calls `_get_value` with the field's name and definition; per the original
    note, that returns the value if it is already defined and the default
    value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)
var redis
Expand source code
@property
def redis(self):
    """
    lazily built redis client bound to this namespace

    the client is created on first access, cached in ``_redis`` and patched
    to leave SET/DEL replies unparsed
    """
    if self._redis:
        return self._redis

    pool = redis.ConnectionPool(
        host=self.addr,
        port=self.port,
        password=self.secret_,
        connection_class=ZDBConnection,
        namespace=self.nsname,
        namespace_password=self.secret_,
        admin=self.admin,
    )
    client = redis.Redis(connection_pool=pool)
    self._redis = _patch_redis_client(client)
    return self._redis
var secret_

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method for this property

    calls `_get_value` with the field's name and definition; per the original
    note, that returns the value if it is already defined and the default
    value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)

Methods

def delete(self, key)
Expand source code
def delete(self, key):
    """
    delete the entry stored under ``key``

    :param key: key to delete, must not be None or empty (0 is a valid key)
    :raises j.exceptions.Value: when no key is given
    """
    # only a missing/empty key is rejected, so falsy-but-valid keys such as 0 can be deleted
    if key is None or key == "" or key == b"":
        # imported here because the module has no module-level `j`
        from jumpscale.loader import j

        raise j.exceptions.Value("key must be provided")
    key = self._key_encode(key)
    self.redis.execute_command("DEL", key)
def exists(self, key)
Expand source code
def exists(self, key):
    """
    :param key: key to check
    :return: True when the EXISTS command replies 1
    :rtype: bool
    """
    encoded = self._key_encode(key)
    reply = self.redis.execute_command("EXISTS", encoded)
    return reply == 1
def flush(self)

will remove all data from the database DANGEROUS !!!! This is only allowed on private and password protected namespace You need to select the namespace before running the command. :return:

Expand source code
def flush(self):
    """
    will remove all data from the database DANGEROUS !!!!
    This is only allowed on private and password protected namespace
    You need to select the namespace before running the command.

    nothing is sent when the namespace is "default" or "system"
    :return:
    """
    if self.nsname in ("default", "system"):
        return
    self.redis.execute_command("FLUSH")
def get(self, key)
Expand source code
def get(self, key):
    """
    :param key: key to read
    :return: raw reply of the GET command
    """
    encoded = self._key_encode(key)
    reply = self.redis.execute_command("GET", encoded)
    return reply
def iterate(self, key_start=None, reverse=False, keyonly=False)

walk over all the namespace and yield (key,data) for each entries in a namespace

:param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :param keyonly: [description], defaults to False :param keyonly: bool, optional :raises e: [description]

Expand source code
def iterate(self, key_start=None, reverse=False, keyonly=False):
    """
    walk over all the namespace and yield (key,data) for each entries in a namespace

    :param key_start: if specified start to walk from that key instead of the first one, defaults to None
    :param key_start: str, optional
    :param reverse: decide how to walk the namespace
            if False, walk from older to newer keys
            if True walk from newer to older keys
            defaults to False
    :param reverse: bool, optional
    :param keyonly: when True no GET is issued and the yielded data is None, defaults to False
    :param keyonly: bool, optional
    :raises redis.ResponseError: any server error other than "No more data"
    """
    cursor = None
    data = None

    if key_start is not None:
        # KEYCUR gives the cursor for key_start; the start key itself is yielded first
        cursor = self.redis.execute_command("KEYCUR", self._key_encode(key_start))
        if not keyonly:
            data = self.get(key_start)
        yield (key_start, data)

    command = "RSCAN" if reverse else "SCANX"

    while True:
        # format of the response
        # see https://github.com/threefoldtech/0-db/tree/development#scan
        scan_args = (command, cursor) if cursor else (command,)
        try:
            response = self.redis.execute_command(*scan_args)
        except redis.ResponseError as err:
            # this error message marks the end of the walk
            if err.args[0] == "No more data":
                return
            raise

        cursor, results = response
        for keyb, _size, _epoch in results:
            decoded = self._key_decode(keyb)
            value = None
            if not keyonly:
                value = self.redis.execute_command("GET", keyb)
            yield (decoded, value)
def list(self, key_start=None, reverse=False)

list all the keys in the namespace

:param key_start: if specified start to walk from that key instead of the first one, defaults to None :param key_start: str, optional :param reverse: decide how to walk the namespace if False, walk from older to newer keys if True walk from newer to older keys defaults to False :param reverse: bool, optional :return: list of keys :rtype: [str]

Expand source code
def list(self, key_start=None, reverse=False):
    """
    Collect every key of the namespace into a list.

    :param key_start: if specified, start to walk from that key instead of the first one, defaults to None
    :type key_start: str, optional
    :param reverse: decide how to walk the namespace
                    if False, walk from older to newer keys
                    if True walk from newer to older keys
                    defaults to False
    :type reverse: bool, optional
    :return: list of keys, in the order produced by ``iterate``
    :rtype: [str]
    """
    # keyonly=True: only the keys are needed, so the data is never fetched
    entries = self.iterate(key_start=key_start, reverse=reverse, keyonly=True)
    return [key for key, _ in entries]
def ping(self)

Ping the 0-DB server through the client's redis connection. :return: the result of the underlying redis ping call

Expand source code
def ping(self):
    """
    Ping the server through the client's redis connection.

    :return: whatever the underlying redis client's ``ping()`` returns
    """
    reply = self.redis.ping()
    return reply
def set(self, data, key=None)
Expand source code
def set(self, data, key=None):
    """
    Send a SET command storing ``data`` under ``key``.

    :param data: payload passed as-is to the SET command
    :param key: key to store the data under; a falsy key (None, "") is
                replaced by an empty string before being encoded
    :return: the decoded reply of the SET command when it is truthy,
             otherwise the falsy reply unchanged
    """
    encoded_key = self._key_encode(key if key else "")
    reply = self.redis.execute_command("SET", encoded_key, data)
    return self._key_decode(reply) if reply else reply
def stop(self)
Expand source code
def stop(self):
    """
    Do nothing; this method is a no-op and always returns None.
    """
    return None

Inherited members

class ZDBConnection (namespace=None, namespace_password=None, admin=False, *args, **kwargs)

ZDBConnection implements the custom selection of a namespace on 0-DB

Expand source code
class ZDBConnection(redis.Connection):
    """
    Redis connection that authenticates or selects a 0-DB namespace
    right after the connection is established.
    """

    def __init__(self, namespace=None, namespace_password=None, admin=False, *args, **kwargs):
        """
        :param namespace: namespace to select on connect; a falsy value falls back to "default"
        :param namespace_password: password sent along with the SELECT command, if any
        :param admin: when True the connection sends AUTH instead of selecting a namespace
        Remaining positional and keyword arguments are forwarded to ``redis.Connection``.
        """
        super().__init__(*args, **kwargs)
        self.admin = admin
        self.namespace_password = namespace_password
        self.namespace = namespace if namespace else "default"

    def on_connect(self):
        """
        Prepare a freshly opened connection: AUTH for admin connections,
        SELECT of the configured namespace for all the others.

        :raises redis.connection.AuthenticationError: if the AUTH reply is not "OK"
        :raises redis.connection.ConnectionError: if the SELECT reply is not "OK"
        """
        self._parser.on_connect(self)

        if self.admin:
            # admin connections never select a namespace; they only
            # authenticate, and only when a connection password is set
            if self.password:
                # avoid checking health here -- PING will fail if we try
                # to check the health prior to the AUTH
                self.send_command("AUTH", self.password, check_health=False)
                auth_reply = redis.connection.nativestr(self.read_response())
                if auth_reply != "OK":
                    raise redis.connection.AuthenticationError("Invalid Password")
            return

        if not self.namespace:
            return

        # non-admin connection: switch to the namespace, sending its password when one is set
        if self.namespace_password:
            select_args = (self.namespace, self.namespace_password)
        else:
            select_args = (self.namespace,)

        self.send_command("SELECT", *select_args)
        select_reply = redis.connection.nativestr(self.read_response())
        if select_reply != "OK":
            raise redis.connection.ConnectionError(f"Failed to select namespace {self.namespace}")

Ancestors

  • redis.connection.Connection

Methods

def on_connect(self)

when a new connection is created, switch to the proper namespace

Expand source code
def on_connect(self):
    """
    Prepare a freshly opened connection: AUTH for admin connections,
    SELECT of the configured namespace for all the others.

    :raises redis.connection.AuthenticationError: if the AUTH reply is not "OK"
    :raises redis.connection.ConnectionError: if the SELECT reply is not "OK"
    """
    self._parser.on_connect(self)

    if self.admin:
        # admin connections never select a namespace; they only
        # authenticate, and only when a connection password is set
        if self.password:
            # avoid checking health here -- PING will fail if we try
            # to check the health prior to the AUTH
            self.send_command("AUTH", self.password, check_health=False)
            auth_reply = redis.connection.nativestr(self.read_response())
            if auth_reply != "OK":
                raise redis.connection.AuthenticationError("Invalid Password")
        return

    if not self.namespace:
        return

    # non-admin connection: switch to the namespace, sending its password when one is set
    if self.namespace_password:
        select_args = (self.namespace, self.namespace_password)
    else:
        select_args = (self.namespace,)

    self.send_command("SELECT", *select_args)
    select_reply = redis.connection.nativestr(self.read_response())
    if select_reply != "OK":
        raise redis.connection.ConnectionError(f"Failed to select namespace {self.namespace}")