Module jumpscale.core.base.store
Store defines the interface for the backend storage, let it be filesystem or redis.
This module also defines the abstractions needed for any storage backend.
Every backend should be able to organize configuration for multiple instances given a location, and also read/write the config data in raw (string) format.
Expand source code
"""
Store defines the interface for the backend storage, let it be filesystem or redis.
This module also defines the abstractions needed for any storage backend.
Every backend should be able to organize configuration for multiple instances given a location, and also
read/write the config data in raw (string) format.
"""
import os
from abc import ABC, abstractmethod
from enum import Enum
from jumpscale.data.nacl import NACL
from jumpscale.data.serializers import base64
from jumpscale.core.config import Environment
# Key under which `EncryptedConfigStore.find` stores the instance name inside
# each config dict it inspects, so a search result can be traced back to the
# instance it was loaded from.
KEY_FIELD_NAME = "instance_name_"
class InvalidPrivateKey(Exception):
    """Raised when the configured private key is invalid."""
class StoreException(Exception):
    """Raised by store backends."""
class ConfigNotFound(Exception):
    """Raised when no config is found for an instance."""
class Location:
    """
    Dot-separated auto-location for any type.

    For example, if we have a class in jumpscale/clients/redis/<type>,
    the location name will be jumpscale.clients.redis.<type>.

    Two locations are equal (and hash the same) when their dotted `name`
    is the same, regardless of how the name parts were split.
    """

    def __init__(self, *name_list, type_=None):
        """
        Args:
            *name_list (str): name parts; falsy parts (empty string, None) are dropped.
            type_ (type, optional): stored as-is in `self.type`. Defaults to None.
        """
        self.type = type_
        self.name_list = [name for name in name_list if name]

    @property
    def name(self):
        """
        Get a dot-separated string from the current name list.

        Returns:
            str: dot-separated string
        """
        return ".".join(self.name_list)

    @property
    def path(self):
        """
        Get a filesystem path from `name`, where dots are replaced by `os.sep`.

        Returns:
            str: path
        """
        return os.path.join(*self.name.split("."))

    @classmethod
    def from_type(cls, type_):
        """
        Get a location from any type/class, built from its module and class name.

        Args:
            type_ (type): any type (class)

        Returns:
            Location: a location object
        """
        return cls(type_.__module__, type_.__name__)

    def __str__(self):
        args = "', '".join(self.name_list)
        cls_name = self.__class__.__name__
        return f"{cls_name}('{args}')"

    __repr__ = __str__

    def __eq__(self, other):
        # Comparing against a foreign object (None, str, ...) must not raise:
        # returning NotImplemented lets Python fall back to its default
        # comparison, so `loc == 5` is simply False.
        if not isinstance(other, Location):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        # kept consistent with __eq__: equal names hash equally
        return hash(self.name)
class EncryptionMode(Enum):
    """Selects the processing direction: encrypt when storing, decrypt when loading."""

    Encrypt = 0
    Decrypt = 1
class EncryptionMixin:
    """
    Mixin adding `encrypt` and `decrypt` helpers that can be used in any store.

    Both helpers read `self.nacl` and `self.public_key`, which the host
    class is expected to set.
    """

    def encrypt(self, data):
        """Encrypt data.

        Args:
            data (str or bytes): input; a non-bytes value is encoded first

        Returns:
            bytes: whatever `self.nacl.encrypt` returns for the payload
        """
        payload = data if isinstance(data, bytes) else data.encode()
        return self.nacl.encrypt(payload, self.public_key)

    def decrypt(self, data):
        """Decrypt data.

        Args:
            data (bytes): encrypted byte string

        Returns:
            str: decrypted data, decoded to text
        """
        plain = self.nacl.decrypt(data, self.public_key)
        return plain.decode()
class ConfigStore(ABC):
    """
    The interface every config store should implement:

    - `read(instance_name)`: reads the data of this instance name
    - `write(instance_name, data)`: writes the data of this instance
    - `list_all()`: lists all instance names
    - `delete(instance_name)`: deletes instance data
    - `find(cursor_=None, limit_=None, **query)`: optional search method with query as field mapping
    """

    @abstractmethod
    def read(self, instance_name):
        """Read the data of the given instance name."""

    @abstractmethod
    def write(self, instance_name, data):
        """Write the data of the given instance."""

    @abstractmethod
    def list_all(self):
        """List all instance names."""

    @abstractmethod
    def find(self, cursor_=None, limit_=None, **query):
        """Search instances, with `query` as a field-to-value mapping."""

    @abstractmethod
    def delete(self, instance_name):
        """Delete the data of the given instance."""
class EncryptedConfigStore(ConfigStore, EncryptionMixin):
    """
    The base class for any config store backend.

    Adds transparent encryption on top of the raw `read`/`write` of a
    `ConfigStore`: on `save`, every value whose key starts with `__` is
    encrypted and base64-encoded; on `get`, such values are decrypted and the
    `__` marker is removed from the key.
    """

    def __init__(self, location, serializer):
        """
        the base for encrypted config store

        Args:
            location (Location)
            serializer (Serializer)

        Raises:
            InvalidPrivateKey: in case the private key is not configured
        """
        self.location = location
        self.serializer = serializer
        self.config_env = Environment()

        # Validate before decoding or building the NACL box, so a missing key
        # surfaces as the documented InvalidPrivateKey instead of whatever
        # error the decoder or NACL would raise on an empty value.
        encoded_key = self.config_env.get_private_key()
        if not encoded_key:
            raise InvalidPrivateKey

        self.priv_key = base64.decode(encoded_key)
        if not self.priv_key:
            raise InvalidPrivateKey

        self.nacl = NACL(private_key=self.priv_key)
        self.public_key = self.nacl.public_key.encode()

    def _encrypt_value(self, value):
        """
        encrypt a single value

        Args:
            value (str): value

        Returns:
            str: encrypted value, base64-encoded as an ascii string
        """
        return base64.encode(self.encrypt(value)).decode("ascii")

    def _decrypt_value(self, value):
        """
        decrypt a single value

        Args:
            value (str): base64-encoded encrypted value

        Returns:
            str: decrypted value
        """
        return self.decrypt(base64.decode(value))

    def _process_config(self, config, mode):
        """
        process current config according to encryption mode

        Keys starting with `__` (with a non-None value) are encrypted or
        decrypted; nested dicts are processed recursively; everything else is
        copied unchanged.

        Args:
            config (dict): config dict (can be nested)
            mode (EncryptionMode)

        Returns:
            dict: a new dict, the input is not modified
        """
        new_config = {}
        for name, value in config.items():
            # non-string keys cannot carry the `__` marker, pass them through
            is_secret = isinstance(name, str) and name.startswith("__")
            if is_secret and value is not None:
                if mode == EncryptionMode.Decrypt:
                    # Drop only the two-character marker. `lstrip("__")` strips
                    # *every* leading underscore, which would turn `___x`
                    # (secret field `_x`) into `x`.
                    new_config[name[2:]] = self._decrypt_value(value)
                else:
                    # preserve __ to know it's an encrypted value
                    new_config[name] = self._encrypt_value(value)
            elif isinstance(value, dict):
                new_config[name] = self._process_config(value, mode)
            else:
                new_config[name] = value
        return new_config

    def get(self, instance_name):
        """
        get instance config

        Args:
            instance_name (str): instance name

        Returns:
            dict: instance config as dict, with encrypted values decrypted
        """
        config = self.serializer.deserialize(self.read(instance_name))
        return self._process_config(config, EncryptionMode.Decrypt)

    def find(self, cursor_=None, limit_=None, **query):
        """
        a generic find, which does a linear search over all items

        if you want a better way, use a store which provides search

        An instance matches as soon as any one of the queried fields equals
        the stored value (string values are compared case-insensitively).

        Args:
            cursor_ (any, optional): an optional cursor (an instance name) to start searching from. Defaults to None.
            limit_ (int, optional): results limit. Defaults to None.
            query: a mapping between field and value to search by

        Returns:
            tuple: the new cursor, total result count and a generator for results
        """
        all_names = self.list_all()
        if not all_names:
            # empty result
            return cursor_, 0, (name for name in all_names)

        all_count = len(all_names)
        if not limit_:
            limit_ = all_count

        # without a cursor we search from the first name, otherwise we skip
        # ahead until the cursor name is reached
        start_search = not bool(cursor_)
        found = []

        for index, instance_name in enumerate(all_names):
            if instance_name == cursor_:
                start_search = True

            if not start_search:
                continue

            data = self.get(instance_name)
            for name, value in query.items():
                if name not in data:
                    continue

                data[KEY_FIELD_NAME] = instance_name

                target_value = data[name]
                if isinstance(target_value, str):
                    # just a simple normalization
                    value = str(value).lower()
                    target_value = target_value.lower()

                if value == target_value and data not in found:
                    found.append(data)

            if len(found) >= limit_:
                break

        # the next cursor is the name right after the last inspected one,
        # or None once the whole list has been walked
        if index == all_count - 1:
            new_cursor = None
        else:
            new_cursor = all_names[index + 1]

        # return the new cursor, total found and a generator
        return new_cursor, len(found), (config for config in found)

    def save(self, instance_name, config):
        """
        save instance config

        Args:
            instance_name (str): name
            config (dict): config data, any key that starts with `__` will be encrypted

        Returns:
            bool: written or not
        """
        new_config = self._process_config(config, EncryptionMode.Encrypt)
        return self.write(instance_name, self.serializer.serialize(new_config))
Sub-modules
jumpscale.core.base.store.filesystem
jumpscale.core.base.store.redis
jumpscale.core.base.store.serializers
jumpscale.core.base.store.whooshfts
Classes
class ConfigNotFound (*args, **kwargs)
-
raised when a config is not found for an instance
Expand source code
class ConfigNotFound(Exception): """ raised when a config is not found for an instance """
Ancestors
- builtins.Exception
- builtins.BaseException
class ConfigStore
-
the interface every config store should implement:
read(instance_name)
: reads the data of this instance name
write(instance_name, data)
: writes the data of this instance
list_all()
: lists all instance names
delete(instance_name)
: deletes instance data
find(self, cursor_=None, limit_=None, **query)
: optional search method with query as field mapping
Expand source code
class ConfigStore(ABC): """ the interface every config store should implement: - `read(instance_name)`: reads the data of this instance name - `write(instance_name, data)`: writes the data of this instance - `list_all(instance_name)`: lists all instance names - `delete(instance_name)`: delete instance data - `find(self, cursor_=None, limit_=None, **query)`: optional search method with query as field mapping """ @abstractmethod def read(self, instance_name): pass @abstractmethod def write(self, instance_name, data): pass @abstractmethod def list_all(self): pass @abstractmethod def find(self, cursor_=None, limit_=None, **query): pass @abstractmethod def delete(self, instance_name): pass
Ancestors
- abc.ABC
Subclasses
Methods
def delete(self, instance_name)
-
Expand source code
@abstractmethod def delete(self, instance_name): pass
def find(self, cursor_=None, limit_=None, **query)
-
Expand source code
@abstractmethod def find(self, cursor_=None, limit_=None, **query): pass
def list_all(self)
-
Expand source code
@abstractmethod def list_all(self): pass
def read(self, instance_name)
-
Expand source code
@abstractmethod def read(self, instance_name): pass
def write(self, instance_name, data)
-
Expand source code
@abstractmethod def write(self, instance_name, data): pass
class EncryptedConfigStore (location, serializer)
-
the base class for any config store backend
the base for encrypted config store
Args
location (Location) serializer (Serializer)
Raises
InvalidPrivateKey
- in case the private key is not configured
Expand source code
class EncryptedConfigStore(ConfigStore, EncryptionMixin): """the base class for any config store backend""" def __init__(self, location, serializer): """ the base for encrypted config store Args: location (Location) serializer (Serializer) Raises: InvalidPrivateKey: in case the private key is not configured """ self.location = location self.serializer = serializer self.config_env = Environment() self.priv_key = base64.decode(self.config_env.get_private_key()) self.nacl = NACL(private_key=self.priv_key) self.public_key = self.nacl.public_key.encode() if not self.priv_key: raise InvalidPrivateKey def _encrypt_value(self, value): """ encrypt a single value Args: value (str): value Returns: str: decrypted value """ return base64.encode(self.encrypt(value)).decode("ascii") def _decrypt_value(self, value): """ decrypt a single value Args: value (str): value Returns: str: decrypted value """ return self.decrypt(base64.decode(value)) def _process_config(self, config, mode): """ process current config according to encryption mode Args: config (dict): config dict (can be nested) mode (EncryptionMode) """ new_config = {} for name, value in config.items(): if name.startswith("__") and value is not None: if mode == EncryptionMode.Decrypt: new_config[name.lstrip("__")] = self._decrypt_value(value) else: # preserve __ to know it's an encrypted value new_config[name] = self._encrypt_value(value) elif isinstance(value, dict): new_config[name] = self._process_config(value, mode) else: new_config[name] = value return new_config def get(self, instance_name): """ get instance config Args: instance_name (str): instance name Returns: dict: instance config as dict """ config = self.serializer.deserialize(self.read(instance_name)) return self._process_config(config, EncryptionMode.Decrypt) def find(self, cursor_=None, limit_=None, **query): """ a generic find, which do a linear search over all items if you want a better way, use a store which provides search Args: cursor_ (any, optional): 
an optional cursor, to start searching from. Defaults to None. limit_ (int, optional): results limit. Defaults to None. query: a mapping between field and value fo search by Returns: tuple: the new cursor, total result count and a generator for results """ all_names = self.list_all() if not all_names: # empty result return cursor_, 0, (name for name in all_names) all_count = len(all_names) if not limit_: limit_ = all_count start_search = not bool(cursor_) found = [] for index in range(all_count): instance_name = all_names[index] if instance_name == cursor_: start_search = True if not start_search: continue data = self.get(instance_name) for name, value in query.items(): if name in data: data[KEY_FIELD_NAME] = instance_name target_value = data[name] if isinstance(target_value, str): # just a simple normalization value = str(value).lower() target_value = target_value.lower() if value == target_value and data not in found: found.append(data) if len(found) >= limit_: break if index == all_count - 1: new_cursor = None else: new_cursor = all_names[index + 1] # return the new cursor, total found and a generator return new_cursor, len(found), (config for config in found) def save(self, instance_name, config): """ save instance config Args: instance_name (str): name config (dict): config data, any key that starts with `__` will be encrypted Returns: bool: written or not """ new_config = self._process_config(config, EncryptionMode.Encrypt) return self.write(instance_name, self.serializer.serialize(new_config))
Ancestors
- ConfigStore
- abc.ABC
- EncryptionMixin
Subclasses
Methods
def find(self, cursor_=None, limit_=None, **query)
-
a generic find, which do a linear search over all items
if you want a better way, use a store which provides search
Args
cursor_
:any
, optional- an optional cursor, to start searching from. Defaults to None.
limit_
:int
, optional- results limit. Defaults to None.
query
- a mapping between field and value to search by
Returns
tuple
- the new cursor, total result count and a generator for results
Expand source code
def find(self, cursor_=None, limit_=None, **query): """ a generic find, which do a linear search over all items if you want a better way, use a store which provides search Args: cursor_ (any, optional): an optional cursor, to start searching from. Defaults to None. limit_ (int, optional): results limit. Defaults to None. query: a mapping between field and value fo search by Returns: tuple: the new cursor, total result count and a generator for results """ all_names = self.list_all() if not all_names: # empty result return cursor_, 0, (name for name in all_names) all_count = len(all_names) if not limit_: limit_ = all_count start_search = not bool(cursor_) found = [] for index in range(all_count): instance_name = all_names[index] if instance_name == cursor_: start_search = True if not start_search: continue data = self.get(instance_name) for name, value in query.items(): if name in data: data[KEY_FIELD_NAME] = instance_name target_value = data[name] if isinstance(target_value, str): # just a simple normalization value = str(value).lower() target_value = target_value.lower() if value == target_value and data not in found: found.append(data) if len(found) >= limit_: break if index == all_count - 1: new_cursor = None else: new_cursor = all_names[index + 1] # return the new cursor, total found and a generator return new_cursor, len(found), (config for config in found)
def get(self, instance_name)
-
get instance config
Args
instance_name
:str
- instance name
Returns
dict
- instance config as dict
Expand source code
def get(self, instance_name): """ get instance config Args: instance_name (str): instance name Returns: dict: instance config as dict """ config = self.serializer.deserialize(self.read(instance_name)) return self._process_config(config, EncryptionMode.Decrypt)
def save(self, instance_name, config)
-
save instance config
Args
instance_name
:str
- name
config
:dict
- config data, any key that starts with
__
will be encrypted
Returns
bool
- written or not
Expand source code
def save(self, instance_name, config): """ save instance config Args: instance_name (str): name config (dict): config data, any key that starts with `__` will be encrypted Returns: bool: written or not """ new_config = self._process_config(config, EncryptionMode.Encrypt) return self.write(instance_name, self.serializer.serialize(new_config))
Inherited members
class EncryptionMixin
-
A mixin that provides encrypt and decrypt methods, which can be used in any store
Expand source code
class EncryptionMixin: """ A mixin that provides encrypt and decrypt methods, which can be used in any store """ def encrypt(self, data): """encrypt data Args: data (str): input string Returns: bytes: encrypted data as byte string """ if not isinstance(data, bytes): data = data.encode() return self.nacl.encrypt(data, self.public_key) def decrypt(self, data): """decrypt data Args: data (bytes): encrypted byte string Returns: str: decrypted data """ return self.nacl.decrypt(data, self.public_key).decode()
Subclasses
Methods
def decrypt(self, data)
-
decrypt data
Args
data
:bytes
- encrypted byte string
Returns
str
- decrypted data
Expand source code
def decrypt(self, data): """decrypt data Args: data (bytes): encrypted byte string Returns: str: decrypted data """ return self.nacl.decrypt(data, self.public_key).decode()
def encrypt(self, data)
-
encrypt data
Args
data
:str
- input string
Returns
bytes
- encrypted data as byte string
Expand source code
def encrypt(self, data): """encrypt data Args: data (str): input string Returns: bytes: encrypted data as byte string """ if not isinstance(data, bytes): data = data.encode() return self.nacl.encrypt(data, self.public_key)
class EncryptionMode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enum to select encryption mode based on loading or storing the data
Expand source code
class EncryptionMode(Enum): """ An enum to select encryption mode based on loading or storing the data """ Encrypt = 0 Decrypt = 1
Ancestors
- enum.Enum
Class variables
var Decrypt
var Encrypt
class InvalidPrivateKey (*args, **kwargs)
-
raised when the private key configured is invalid
Expand source code
class InvalidPrivateKey(Exception): """ raised when the private key configured is invalid """
Ancestors
- builtins.Exception
- builtins.BaseException
class Location (*name_list, type_=None)
-
dot-separated auto-location for any type
for example, if we have a class in jumpscale/clients/redis/<type>
location name will be jumpscale.clients.redis.<type>
Expand source code
class Location: """ dot-separated auto-location for any type for example, if we have a class in jumpscale/clients/redis/<type> location name will be jumpscale.clients.redis.<type> """ def __init__(self, *name_list, type_=None): self.type = type_ self.name_list = [name for name in name_list if name] @property def name(self): """ get dot seprated string from current name list Returns: str: dot separated string """ return ".".join(self.name_list) @property def path(self): """ get a filesystem path with from `name`, where dots are replaced by `os.sep` Returns: str: path """ return os.path.join(*self.name.split(".")) @classmethod def from_type(cls, type_): """ get a location from any type/class Args: type_ (type): any type (class) Returns: Location: a location object """ return cls(type_.__module__, type_.__name__) def __str__(self): args = "', '".join(self.name_list) cls_name = self.__class__.__name__ return f"{cls_name}('{args}')" __repr__ = __str__ def __eq__(self, other): return self.name == other.name def __hash__(self): return hash(self.name)
Static methods
def from_type(type_)
-
get a location from any type/class
Args
type_
:type
- any type (class)
Returns
Location
- a location object
Expand source code
@classmethod def from_type(cls, type_): """ get a location from any type/class Args: type_ (type): any type (class) Returns: Location: a location object """ return cls(type_.__module__, type_.__name__)
Instance variables
var name
-
get dot-separated string from current name list
Returns
str
- dot separated string
Expand source code
@property def name(self): """ get dot seprated string from current name list Returns: str: dot separated string """ return ".".join(self.name_list)
var path
-
get a filesystem path from `name`, where dots are replaced by `os.sep`
Returns
str
- path
Expand source code
@property def path(self): """ get a filesystem path with from `name`, where dots are replaced by `os.sep` Returns: str: path """ return os.path.join(*self.name.split("."))
class StoreException (*args, **kwargs)
-
raised by store backends
Expand source code
class StoreException(Exception): """ raised by store backends """
Ancestors
- builtins.Exception
- builtins.BaseException