Module jumpscale.servers.gedis.server
Expand source code
import inspect
import json
import sys
import os
from redis import Redis
from enum import Enum
from functools import partial
from io import BytesIO
from signal import SIGKILL, SIGTERM
import json
import gevent
from gevent.pool import Pool
from gevent import time
from gevent.server import StreamServer
from jumpscale.core.base import Base, fields
from jumpscale.loader import j
from redis.connection import DefaultParser, Encoder
from redis.exceptions import ConnectionError, TimeoutError
from .baseactor import BaseActor
from .systemactor import CoreActor, SystemActor
def serialize(obj):
    if not isinstance(obj, (str, int, float, list, tuple, dict, bool)):
        module = inspect.getmodule(obj).__file__[:-3]
        return dict(__serialized__=True, module=module, type=obj.__class__.__name__, data=obj.to_dict())
    return obj
def deserialize(obj):
    if isinstance(obj, dict) and obj.get("__serialized__"):
        module = sys.modules[obj["module"]]
        object_instance = getattr(module, obj["type"])()
        object_instance.from_dict(obj["data"])
        return object_instance
    return obj
class GedisErrorTypes(Enum):
    NOT_FOUND = 0
    BAD_REQUEST = 1
    ACTOR_ERROR = 3
    INTERNAL_SERVER_ERROR = 4
    PERMISSION_ERROR = 5
EXCEPTIONS_MAP = {
    j.exceptions.Value: GedisErrorTypes.BAD_REQUEST.value,
    j.exceptions.NotFound: GedisErrorTypes.NOT_FOUND.value,
    j.exceptions.Permission: GedisErrorTypes.PERMISSION_ERROR.value,
}
class RedisConnectionAdapter:
    def __init__(self, sock):
        self.socket = sock
        self._sock = sock
        self.socket_timeout = 600
        self.socket_connect_timeout = 600
        self.socket_keepalive = True
        self.retry_on_timeout = True
        self.socket_keepalive_options = {}
        self.encoder = Encoder("utf", "strict", False)
class ResponseEncoder:
    def __init__(self, socket):
        self.socket = socket
        self.buffer = BytesIO()
    def encode(self, value):
        """Respond with data."""
        if value is None:
            self._write_buffer("$-1\r\n")
        elif isinstance(value, int):
            self._write_buffer(":{}\r\n".format(value))
        elif isinstance(value, bool):
            self._write_buffer(":{}\r\n".format(1 if value else 0))
        elif isinstance(value, str):
            if "\n" in value:
                self._bulk(value)
            else:
                self._write_buffer("+{}\r\n".format(value))
        elif isinstance(value, bytes):
            self._bulkbytes(value)
        elif isinstance(value, list):
            if value and value[0] == "*REDIS*":
                value = value[1:]
            self._array(value)
        elif hasattr(value, "__repr__"):
            self._bulk(value.__repr__())
        else:
            value = j.data.serializers.json.dumps(value, encoding="utf-8")
            self.encode(value)
        self._send()
    def status(self, msg="OK"):
        """Send a status."""
        self._write_buffer("+{}\r\n".format(msg))
        self._send()
    def error(self, msg):
        """Send an error."""
        # print("###:%s" % msg)
        self._write_buffer("-ERR {}\r\n".format(msg))
        self._send()
    def _bulk(self, value):
        """Send part of a multiline reply."""
        data = ["$", str(len(value)), "\r\n", value, "\r\n"]
        self._write_buffer("".join(data))
    def _array(self, value):
        """send an array."""
        self._write_buffer("*{}\r\n".format(len(value)))
        for item in value:
            self.encode(item)
    def _bulkbytes(self, value):
        data = [b"$", str(len(value)).encode(), b"\r\n", value, b"\r\n"]
        self._write_buffer(b"".join(data))
    def _write_buffer(self, data):
        if isinstance(data, str):
            data = data.encode()
        self.buffer.write(data)
    def _send(self):
        self.socket.sendall(self.buffer.getvalue())
        self.buffer = BytesIO()  # seems faster then truncating
SERIALIZABLE_TYPES = (str, int, float, list, tuple, dict, bool)
RESERVED_ACTOR_NAMES = ("core", "system")
class GedisServer(Base):
    host = fields.String(default="127.0.0.1")
    port = fields.Integer(default=16000)
    enable_system_actor = fields.Boolean(default=True)
    run_async = fields.Boolean(default=True)
    _actors = fields.Typed(dict, default={})
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._core_actor = CoreActor()
        self._system_actor = SystemActor()
        self._loaded_actors = {"core": self._core_actor}
    @property
    def actors(self):
        """Lists saved actors
        Returns:
            list -- List of saved actors
        """
        return self._actors
    def actor_add(self, actor_name: str, actor_path: str):
        """Adds an actor to the server
        Arguments:
            actor_name {str} -- Actor name
            actor_path {str} -- Actor absolute path
        Raises:
            j.exceptions.Value: raises if actor name is matched one of the reserved actor names
            j.exceptions.Value: raises if actor name is not a valid identifier
        """
        if actor_name in RESERVED_ACTOR_NAMES:
            raise j.exceptions.Value("Invalid actor name")
        if not actor_name.isidentifier():
            raise j.exceptions.Value(f"Actor name should be a valid identifier")
        self._actors[actor_name] = actor_path
    def actor_delete(self, actor_name: str):
        """Removes an actor from the server
        Arguments:
            actor_name {str} -- Actor name
        """
        self._actors.pop(actor_name, None)
    def start(self):
        """Starts the server
        """
        # register system actor if enabled
        if self.enable_system_actor:
            self._register_actor("system", self._system_actor)
        self._core_actor.set_server(self)
        self._system_actor.set_server(self)
        # register saved actors
        for actor_name, actor_path in self._actors.items():
            self._system_actor.register_actor(actor_name, actor_path)
        # start the server
        self._server = StreamServer((self.host, self.port), self._on_connection, spawn=Pool())
        self._server.reuse_addr = True
        self._server.start()
        j.logger.info(f"Gedis server is started at {self.host}:{self.port}...")
    def stop(self):
        """Stops the server
        """
        j.logger.info("Shutting down...")
        self._server.stop()
    def _register_actor(self, actor_name: str, actor_module: BaseActor):
        self._loaded_actors[actor_name] = actor_module
    def _unregister_actor(self, actor_name: str):
        self._loaded_actors.pop(actor_name, None)
    def _execute(self, method, args, kwargs):
        response = {}
        try:
            response["result"] = method(*args, **kwargs)
        except Exception as e:
            j.logger.exception(f"error while executing {method}", exception=e)
            response["error"] = str(e)
            response["error_type"] = EXCEPTIONS_MAP.get(e.__class__, GedisErrorTypes.ACTOR_ERROR.value)
        return response
    def _on_connection(self, socket, address):
        j.logger.debug(f"New connection from {address}")
        parser = DefaultParser(65536)
        connection = RedisConnectionAdapter(socket)
        try:
            encoder = ResponseEncoder(socket)
            parser.on_connect(connection)
            while True:
                response = dict(success=True, result=None, error=None, error_type=None, is_async=False, task_id=None)
                try:
                    request = parser.read_response()
                    if len(request) < 2:
                        response["error"] = "invalid request"
                        response["error_type"] = GedisErrorTypes.BAD_REQUEST.value
                    else:
                        actor_name = request.pop(0).decode()
                        method_name = request.pop(0).decode()
                        actor_object = self._loaded_actors.get(actor_name)
                        if not actor_object:
                            response["error"] = "actor not found"
                            response["error_type"] = GedisErrorTypes.NOT_FOUND.value
                        elif not hasattr(actor_object, method_name):
                            response["error"] = "method not found"
                            response["error_type"] = GedisErrorTypes.NOT_FOUND.value
                        else:
                            j.logger.debug(
                                f"Executing method {method_name} from actor {actor_name} to client {address}"
                            )
                            if request:
                                args, kwargs = json.loads(request.pop(0), object_hook=deserialize)
                            else:
                                args, kwargs = (), {}
                            method = getattr(actor_object, method_name)
                            result = self._execute(method, args, kwargs)
                            response.update(result)
                except (TimeoutError, ConnectionError):
                    j.logger.debug(f"Client {address} closed the connection/or timeout", address)
                    parser.on_disconnect()
                    return
                except Exception as exception:
                    j.logger.exception("internal error", exception=exception)
                    response["error"] = "internal server error"
                    response["error_type"] = GedisErrorTypes.INTERNAL_SERVER_ERROR.value
                response["success"] = response["error"] is None
                encoder.encode(json.dumps(response, default=serialize))
        except BrokenPipeError:
            pass
Functions
def deserialize(obj)- 
Expand source code
def deserialize(obj): if isinstance(obj, dict) and obj.get("__serialized__"): module = sys.modules[obj["module"]] object_instance = getattr(module, obj["type"])() object_instance.from_dict(obj["data"]) return object_instance return obj def serialize(obj)- 
Expand source code
def serialize(obj): if not isinstance(obj, (str, int, float, list, tuple, dict, bool)): module = inspect.getmodule(obj).__file__[:-3] return dict(__serialized__=True, module=module, type=obj.__class__.__name__, data=obj.to_dict()) return obj 
Classes
class GedisErrorTypes (value, names=None, *, module=None, qualname=None, type=None, start=1)- 
An enumeration.
Expand source code
class GedisErrorTypes(Enum): NOT_FOUND = 0 BAD_REQUEST = 1 ACTOR_ERROR = 3 INTERNAL_SERVER_ERROR = 4 PERMISSION_ERROR = 5Ancestors
- enum.Enum
 
Class variables
var ACTOR_ERRORvar BAD_REQUESTvar INTERNAL_SERVER_ERRORvar NOT_FOUNDvar PERMISSION_ERROR
 class GedisServer (*args, **kwargs)- 
A simple attribute-based namespace.
SimpleNamespace(**kwargs)
base class implementation for any class with fields which supports getting/setting raw data for any instance fields.
any instance can have an optional name and a parent.
class Person(Base): name = fields.String() age = fields.Float() p = Person(name="ahmed", age="19") print(p.name, p.age)Args
parent_:Base, optional- parent instance. Defaults to None.
 instance_name_:str, optional- instance name. Defaults to None.
 **values- any given field values to initiate the instance with
 
Expand source code
class GedisServer(Base): host = fields.String(default="127.0.0.1") port = fields.Integer(default=16000) enable_system_actor = fields.Boolean(default=True) run_async = fields.Boolean(default=True) _actors = fields.Typed(dict, default={}) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._core_actor = CoreActor() self._system_actor = SystemActor() self._loaded_actors = {"core": self._core_actor} @property def actors(self): """Lists saved actors Returns: list -- List of saved actors """ return self._actors def actor_add(self, actor_name: str, actor_path: str): """Adds an actor to the server Arguments: actor_name {str} -- Actor name actor_path {str} -- Actor absolute path Raises: j.exceptions.Value: raises if actor name is matched one of the reserved actor names j.exceptions.Value: raises if actor name is not a valid identifier """ if actor_name in RESERVED_ACTOR_NAMES: raise j.exceptions.Value("Invalid actor name") if not actor_name.isidentifier(): raise j.exceptions.Value(f"Actor name should be a valid identifier") self._actors[actor_name] = actor_path def actor_delete(self, actor_name: str): """Removes an actor from the server Arguments: actor_name {str} -- Actor name """ self._actors.pop(actor_name, None) def start(self): """Starts the server """ # register system actor if enabled if self.enable_system_actor: self._register_actor("system", self._system_actor) self._core_actor.set_server(self) self._system_actor.set_server(self) # register saved actors for actor_name, actor_path in self._actors.items(): self._system_actor.register_actor(actor_name, actor_path) # start the server self._server = StreamServer((self.host, self.port), self._on_connection, spawn=Pool()) self._server.reuse_addr = True self._server.start() j.logger.info(f"Gedis server is started at {self.host}:{self.port}...") def stop(self): """Stops the server """ j.logger.info("Shutting down...") self._server.stop() def _register_actor(self, actor_name: str, actor_module: BaseActor): self._loaded_actors[actor_name] = actor_module def _unregister_actor(self, actor_name: str): self._loaded_actors.pop(actor_name, None) def _execute(self, method, args, kwargs): response = {} try: response["result"] = method(*args, **kwargs) except Exception as e: j.logger.exception(f"error while executing {method}", exception=e) response["error"] = str(e) response["error_type"] = EXCEPTIONS_MAP.get(e.__class__, GedisErrorTypes.ACTOR_ERROR.value) return response def _on_connection(self, socket, address): j.logger.debug(f"New connection from {address}") parser = DefaultParser(65536) connection = RedisConnectionAdapter(socket) try: encoder = ResponseEncoder(socket) parser.on_connect(connection) while True: response = dict(success=True, result=None, error=None, error_type=None, is_async=False, task_id=None) try: request = parser.read_response() if len(request) < 2: response["error"] = "invalid request" response["error_type"] = GedisErrorTypes.BAD_REQUEST.value else: actor_name = request.pop(0).decode() method_name = request.pop(0).decode() actor_object = self._loaded_actors.get(actor_name) if not actor_object: response["error"] = "actor not found" response["error_type"] = GedisErrorTypes.NOT_FOUND.value elif not hasattr(actor_object, method_name): response["error"] = "method not found" response["error_type"] = GedisErrorTypes.NOT_FOUND.value else: j.logger.debug( f"Executing method {method_name} from actor {actor_name} to client {address}" ) if request: args, kwargs = json.loads(request.pop(0), object_hook=deserialize) else: args, kwargs = (), {} method = getattr(actor_object, method_name) result = self._execute(method, args, kwargs) response.update(result) except (TimeoutError, ConnectionError): j.logger.debug(f"Client {address} closed the connection/or timeout", address) parser.on_disconnect() return except Exception as exception: j.logger.exception("internal error", exception=exception) response["error"] = "internal server error" response["error_type"] = GedisErrorTypes.INTERNAL_SERVER_ERROR.value response["success"] = response["error"] is None encoder.encode(json.dumps(response, default=serialize)) except BrokenPipeError: passAncestors
- Base
 - types.SimpleNamespace
 
Instance variables
var actors- 
Lists saved actors
Returns
list – List of saved actors
Expand source code
@property def actors(self): """Lists saved actors Returns: list -- List of saved actors """ return self._actors var enable_system_actor- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var host- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var port- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var run_async- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) 
Methods
def actor_add(self, actor_name: str, actor_path: str)- 
Adds an actor to the server
Arguments
actor_name {str} – Actor name actor_path {str} – Actor absolute path
Raises
j.exceptions.Value- raises if actor name is matched one of the reserved actor names
 j.exceptions.Value- raises if actor name is not a valid identifier
 
Expand source code
def actor_add(self, actor_name: str, actor_path: str): """Adds an actor to the server Arguments: actor_name {str} -- Actor name actor_path {str} -- Actor absolute path Raises: j.exceptions.Value: raises if actor name is matched one of the reserved actor names j.exceptions.Value: raises if actor name is not a valid identifier """ if actor_name in RESERVED_ACTOR_NAMES: raise j.exceptions.Value("Invalid actor name") if not actor_name.isidentifier(): raise j.exceptions.Value(f"Actor name should be a valid identifier") self._actors[actor_name] = actor_path def actor_delete(self, actor_name: str)- 
Removes an actor from the server
Arguments
actor_name {str} – Actor name
Expand source code
def actor_delete(self, actor_name: str): """Removes an actor from the server Arguments: actor_name {str} -- Actor name """ self._actors.pop(actor_name, None) def start(self)- 
Starts the server
Expand source code
def start(self): """Starts the server """ # register system actor if enabled if self.enable_system_actor: self._register_actor("system", self._system_actor) self._core_actor.set_server(self) self._system_actor.set_server(self) # register saved actors for actor_name, actor_path in self._actors.items(): self._system_actor.register_actor(actor_name, actor_path) # start the server self._server = StreamServer((self.host, self.port), self._on_connection, spawn=Pool()) self._server.reuse_addr = True self._server.start() j.logger.info(f"Gedis server is started at {self.host}:{self.port}...") def stop(self)- 
Stops the server
Expand source code
def stop(self): """Stops the server """ j.logger.info("Shutting down...") self._server.stop() 
Inherited members
 class RedisConnectionAdapter (sock)- 
Expand source code
class RedisConnectionAdapter: def __init__(self, sock): self.socket = sock self._sock = sock self.socket_timeout = 600 self.socket_connect_timeout = 600 self.socket_keepalive = True self.retry_on_timeout = True self.socket_keepalive_options = {} self.encoder = Encoder("utf", "strict", False) class ResponseEncoder (socket)- 
Expand source code
class ResponseEncoder: def __init__(self, socket): self.socket = socket self.buffer = BytesIO() def encode(self, value): """Respond with data.""" if value is None: self._write_buffer("$-1\r\n") elif isinstance(value, int): self._write_buffer(":{}\r\n".format(value)) elif isinstance(value, bool): self._write_buffer(":{}\r\n".format(1 if value else 0)) elif isinstance(value, str): if "\n" in value: self._bulk(value) else: self._write_buffer("+{}\r\n".format(value)) elif isinstance(value, bytes): self._bulkbytes(value) elif isinstance(value, list): if value and value[0] == "*REDIS*": value = value[1:] self._array(value) elif hasattr(value, "__repr__"): self._bulk(value.__repr__()) else: value = j.data.serializers.json.dumps(value, encoding="utf-8") self.encode(value) self._send() def status(self, msg="OK"): """Send a status.""" self._write_buffer("+{}\r\n".format(msg)) self._send() def error(self, msg): """Send an error.""" # print("###:%s" % msg) self._write_buffer("-ERR {}\r\n".format(msg)) self._send() def _bulk(self, value): """Send part of a multiline reply.""" data = ["$", str(len(value)), "\r\n", value, "\r\n"] self._write_buffer("".join(data)) def _array(self, value): """send an array.""" self._write_buffer("*{}\r\n".format(len(value))) for item in value: self.encode(item) def _bulkbytes(self, value): data = [b"$", str(len(value)).encode(), b"\r\n", value, b"\r\n"] self._write_buffer(b"".join(data)) def _write_buffer(self, data): if isinstance(data, str): data = data.encode() self.buffer.write(data) def _send(self): self.socket.sendall(self.buffer.getvalue()) self.buffer = BytesIO() # seems faster then truncatingMethods
def encode(self, value)- 
Respond with data.
Expand source code
def encode(self, value): """Respond with data.""" if value is None: self._write_buffer("$-1\r\n") elif isinstance(value, int): self._write_buffer(":{}\r\n".format(value)) elif isinstance(value, bool): self._write_buffer(":{}\r\n".format(1 if value else 0)) elif isinstance(value, str): if "\n" in value: self._bulk(value) else: self._write_buffer("+{}\r\n".format(value)) elif isinstance(value, bytes): self._bulkbytes(value) elif isinstance(value, list): if value and value[0] == "*REDIS*": value = value[1:] self._array(value) elif hasattr(value, "__repr__"): self._bulk(value.__repr__()) else: value = j.data.serializers.json.dumps(value, encoding="utf-8") self.encode(value) self._send() def error(self, msg)- 
Send an error.
Expand source code
def error(self, msg): """Send an error.""" # print("###:%s" % msg) self._write_buffer("-ERR {}\r\n".format(msg)) self._send() def status(self, msg='OK')- 
Send a status.
Expand source code
def status(self, msg="OK"): """Send a status.""" self._write_buffer("+{}\r\n".format(msg)) self._send()