Module jumpscale.servers.gedis.server

Expand source code
import inspect
import json
import sys
import os
from redis import Redis
from enum import Enum
from functools import partial
from io import BytesIO
from signal import SIGKILL, SIGTERM
import json
import gevent
from gevent.pool import Pool
from gevent import time
from gevent.server import StreamServer
from jumpscale.core.base import Base, fields
from jumpscale.loader import j
from redis.connection import DefaultParser, Encoder
from redis.exceptions import ConnectionError, TimeoutError
from .baseactor import BaseActor
from .systemactor import CoreActor, SystemActor


def serialize(obj):
    """JSON ``default`` hook: wrap non-primitive objects in a tagged dict.

    Values that are str, int, float, list, tuple, dict or bool are returned
    untouched. Any other object must provide ``to_dict()``; it is described by
    the file path of its defining module with the last three characters
    removed (presumably the ``.py`` suffix), its class name and its
    ``to_dict()`` payload.
    """
    passthrough_types = (str, int, float, list, tuple, dict, bool)
    if isinstance(obj, passthrough_types):
        return obj

    module_file = inspect.getmodule(obj).__file__
    return {
        "__serialized__": True,
        "module": module_file[:-3],
        "type": obj.__class__.__name__,
        "data": obj.to_dict(),
    }


def deserialize(obj):
    """JSON ``object_hook``: rebuild objects from tagged dicts.

    A dict with a truthy ``"__serialized__"`` entry is turned back into an
    instance: the class named by ``"type"`` is looked up on the
    ``sys.modules`` entry named by ``"module"``, instantiated without
    arguments and populated through ``from_dict(obj["data"])``. Anything else
    is returned as-is.
    """
    if not isinstance(obj, dict) or not obj.get("__serialized__"):
        return obj

    # NOTE(review): the module and type names are taken from the payload
    # itself, so any no-arg-constructible attribute of a loaded module can be
    # instantiated by whoever produces the payload.
    owner_module = sys.modules[obj["module"]]
    target_cls = getattr(owner_module, obj["type"])
    restored = target_cls()
    restored.from_dict(obj["data"])
    return restored


class GedisErrorTypes(Enum):
    """Numeric codes written to the ``error_type`` field of a response.

    Note that the value 2 is not assigned to any member.
    """

    # unknown actor/method, or a j.exceptions.NotFound raised by an actor
    NOT_FOUND = 0
    # malformed request, or a j.exceptions.Value raised by an actor
    BAD_REQUEST = 1
    # fallback for any actor exception that has no entry in EXCEPTIONS_MAP
    ACTOR_ERROR = 3
    # unexpected failure inside the connection handler itself
    INTERNAL_SERVER_ERROR = 4
    # a j.exceptions.Permission raised by an actor
    PERMISSION_ERROR = 5


# Maps exception classes raised by actor methods to GedisErrorTypes values.
# GedisServer._execute looks entries up by the exception's exact class
# (``e.__class__``), so subclasses of these exceptions are not matched and
# fall back to ACTOR_ERROR.
EXCEPTIONS_MAP = {
    j.exceptions.Value: GedisErrorTypes.BAD_REQUEST.value,
    j.exceptions.NotFound: GedisErrorTypes.NOT_FOUND.value,
    j.exceptions.Permission: GedisErrorTypes.PERMISSION_ERROR.value,
}


class RedisConnectionAdapter:
    """Connection-like wrapper around an accepted socket.

    ``GedisServer._on_connection`` passes an instance to the redis parser's
    ``on_connect``; the attributes below are plain values set once here.
    """

    def __init__(self, sock):
        # the same socket object is exposed under both names
        self._sock = self.socket = sock

        # connect and read timeouts share one value
        timeout = 600
        self.socket_connect_timeout = timeout
        self.socket_timeout = timeout

        self.retry_on_timeout = True
        self.socket_keepalive = True
        self.socket_keepalive_options = dict()

        self.encoder = Encoder("utf", "strict", False)


class ResponseEncoder:
    """Encodes Python values as RESP (Redis protocol) replies on a socket.

    Replies are accumulated in an in-memory buffer and flushed to the socket
    with ``sendall`` at the end of every public call.
    """

    def __init__(self, socket):
        self.socket = socket
        self.buffer = BytesIO()

    def encode(self, value):
        """Respond with data.

        Mapping of Python values to RESP types:

        * ``None``  -> null bulk string
        * ``bool``  -> integer ``1``/``0``
        * ``int``   -> integer
        * ``str``   -> simple string, or bulk string when it contains ``\\n``
        * ``bytes`` -> bulk string
        * ``list``  -> array (a leading ``"*REDIS*"`` marker item is dropped)
        * anything else -> bulk string holding ``repr(value)``
        """
        if value is None:
            self._write_buffer("$-1\r\n")
        elif isinstance(value, bool):
            # bool is a subclass of int, so it has to be tested before int;
            # otherwise True/False would be emitted as ":True"/":False".
            self._write_buffer(":{}\r\n".format(1 if value else 0))
        elif isinstance(value, int):
            self._write_buffer(":{}\r\n".format(value))
        elif isinstance(value, str):
            if "\n" in value:
                self._bulk(value)
            else:
                self._write_buffer("+{}\r\n".format(value))
        elif isinstance(value, bytes):
            self._bulkbytes(value)
        elif isinstance(value, list):
            if value and value[0] == "*REDIS*":
                value = value[1:]
            self._array(value)
        else:
            # every object has __repr__, so this is the catch-all branch
            self._bulk(repr(value))

        self._send()

    def status(self, msg="OK"):
        """Send a status."""
        self._write_buffer("+{}\r\n".format(msg))
        self._send()

    def error(self, msg):
        """Send an error."""
        self._write_buffer("-ERR {}\r\n".format(msg))
        self._send()

    def _bulk(self, value):
        """Buffer a text value as a bulk string.

        The length prefix of a bulk string counts bytes, so the text is
        encoded first; counting characters would under-report the length of
        any non-ASCII text.
        """
        self._bulkbytes(value.encode())

    def _array(self, value):
        """Buffer an array header, then encode (and send) every item."""
        self._write_buffer("*{}\r\n".format(len(value)))
        for item in value:
            self.encode(item)

    def _bulkbytes(self, value):
        """Buffer a bytes value as a bulk string."""
        data = [b"$", str(len(value)).encode(), b"\r\n", value, b"\r\n"]
        self._write_buffer(b"".join(data))

    def _write_buffer(self, data):
        """Append data to the pending buffer, encoding text to bytes first."""
        if isinstance(data, str):
            data = data.encode()

        self.buffer.write(data)

    def _send(self):
        """Flush the pending buffer to the socket and start a fresh one."""
        self.socket.sendall(self.buffer.getvalue())
        self.buffer = BytesIO()  # seems faster than truncating


# Same set of types that serialize() checks inline; this constant is not
# referenced by the other code shown in this module.
SERIALIZABLE_TYPES = (str, int, float, list, tuple, dict, bool)
# Actor names that actor_add() refuses to register.
RESERVED_ACTOR_NAMES = ("core", "system")


class GedisServer(Base):
    """RESP-speaking server that dispatches requests to registered actors.

    A request is an array ``[actor_name, method_name, payload?]`` where the
    optional payload is a JSON-encoded ``[args, kwargs]`` pair. Each request
    is answered with a JSON document holding ``success``, ``result``,
    ``error``, ``error_type``, ``is_async`` and ``task_id``.
    """

    host = fields.String(default="127.0.0.1")
    port = fields.Integer(default=16000)
    enable_system_actor = fields.Boolean(default=True)
    run_async = fields.Boolean(default=True)
    # saved actors: actor name -> actor path (filled by actor_add)
    _actors = fields.Typed(dict, default={})

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._core_actor = CoreActor()
        self._system_actor = SystemActor()
        # actors that can currently be called; "core" is always available
        self._loaded_actors = {"core": self._core_actor}

    @property
    def actors(self):
        """Lists saved actors

        Returns:
            dict -- Mapping of saved actor names to their paths
        """
        return self._actors

    def actor_add(self, actor_name: str, actor_path: str):
        """Adds an actor to the server

        Arguments:
            actor_name {str} -- Actor name
            actor_path {str} -- Actor absolute path

        Raises:
            j.exceptions.Value: raises if actor name is matched one of the reserved actor names
            j.exceptions.Value: raises if actor name is not a valid identifier
        """
        if actor_name in RESERVED_ACTOR_NAMES:
            raise j.exceptions.Value("Invalid actor name")

        if not actor_name.isidentifier():
            raise j.exceptions.Value("Actor name should be a valid identifier")

        self._actors[actor_name] = actor_path

    def actor_delete(self, actor_name: str):
        """Removes an actor from the server

        Unknown names are ignored.

        Arguments:
            actor_name {str} -- Actor name
        """
        self._actors.pop(actor_name, None)

    def start(self):
        """Starts the server

        Registers the system actor (when enabled), hands this server to the
        core and system actors, registers every saved actor through the
        system actor and finally starts listening on ``(host, port)``.
        """
        # register system actor if enabled
        if self.enable_system_actor:
            self._register_actor("system", self._system_actor)

        self._core_actor.set_server(self)
        self._system_actor.set_server(self)

        # register saved actors
        for actor_name, actor_path in self._actors.items():
            self._system_actor.register_actor(actor_name, actor_path)

        # start the server
        self._server = StreamServer((self.host, self.port), self._on_connection, spawn=Pool())
        self._server.reuse_addr = True
        self._server.start()

        j.logger.info(f"Gedis server is started at {self.host}:{self.port}...")

    def stop(self):
        """Stops the server

        Does nothing besides logging when the server was never started.
        """
        j.logger.info("Shutting down...")
        # `_server` only exists after start(); avoid an AttributeError here
        server = getattr(self, "_server", None)
        if server is not None:
            server.stop()

    def _register_actor(self, actor_name: str, actor_module: BaseActor):
        """Makes an actor object callable under the given name."""
        self._loaded_actors[actor_name] = actor_module

    def _unregister_actor(self, actor_name: str):
        """Removes a callable actor; unknown names are ignored."""
        self._loaded_actors.pop(actor_name, None)

    def _execute(self, method, args, kwargs):
        """Calls an actor method and captures its outcome.

        Returns:
            dict -- ``{"result": ...}`` on success, otherwise ``{"error": ..., "error_type": ...}``
        """
        response = {}
        try:
            response["result"] = method(*args, **kwargs)

        except Exception as e:
            j.logger.exception(f"error while executing {method}", exception=e)

            response["error"] = str(e)
            # exact-class lookup; anything unmapped is reported as ACTOR_ERROR
            response["error_type"] = EXCEPTIONS_MAP.get(e.__class__, GedisErrorTypes.ACTOR_ERROR.value)

        return response

    def _on_connection(self, socket, address):
        """Serves one client connection until it closes or times out.

        Arguments:
            socket -- accepted client socket
            address -- client address, used for logging only
        """
        j.logger.debug(f"New connection from {address}")
        parser = DefaultParser(65536)
        connection = RedisConnectionAdapter(socket)
        try:
            encoder = ResponseEncoder(socket)
            parser.on_connect(connection)

            while True:
                response = dict(success=True, result=None, error=None, error_type=None, is_async=False, task_id=None)
                try:
                    request = parser.read_response()

                    if len(request) < 2:
                        response["error"] = "invalid request"
                        response["error_type"] = GedisErrorTypes.BAD_REQUEST.value

                    else:
                        actor_name = request.pop(0).decode()
                        method_name = request.pop(0).decode()
                        actor_object = self._loaded_actors.get(actor_name)

                        if not actor_object:
                            response["error"] = "actor not found"
                            response["error_type"] = GedisErrorTypes.NOT_FOUND.value

                        elif method_name.startswith("_") or not callable(
                            getattr(actor_object, method_name, None)
                        ):
                            # The method name comes from the network: never
                            # expose private/dunder attributes, and treat
                            # missing or non-callable attributes as unknown.
                            response["error"] = "method not found"
                            response["error_type"] = GedisErrorTypes.NOT_FOUND.value

                        else:
                            j.logger.debug(
                                f"Executing method {method_name} from actor {actor_name} to client {address}"
                            )

                            if request:
                                # optional third item: JSON-encoded [args, kwargs]
                                args, kwargs = json.loads(request.pop(0), object_hook=deserialize)
                            else:
                                args, kwargs = (), {}

                            method = getattr(actor_object, method_name)
                            result = self._execute(method, args, kwargs)
                            response.update(result)

                except (TimeoutError, ConnectionError):
                    # these are the redis.exceptions classes imported by this module
                    j.logger.debug(f"Client {address} closed the connection/or timeout")
                    parser.on_disconnect()
                    return

                except Exception as exception:
                    j.logger.exception("internal error", exception=exception)
                    response["error"] = "internal server error"
                    response["error_type"] = GedisErrorTypes.INTERNAL_SERVER_ERROR.value

                response["success"] = response["error"] is None
                encoder.encode(json.dumps(response, default=serialize))

        except BrokenPipeError:
            # the client went away while a reply was being written
            pass

Functions

def deserialize(obj)
Expand source code
def deserialize(obj):
    """JSON ``object_hook``: rebuild objects from tagged dicts.

    A dict with a truthy ``"__serialized__"`` entry is turned back into an
    instance: the class named by ``"type"`` is looked up on the
    ``sys.modules`` entry named by ``"module"``, instantiated without
    arguments and populated through ``from_dict(obj["data"])``. Anything else
    is returned as-is.
    """
    if not isinstance(obj, dict) or not obj.get("__serialized__"):
        return obj

    # NOTE(review): the module and type names are taken from the payload
    # itself, so any no-arg-constructible attribute of a loaded module can be
    # instantiated by whoever produces the payload.
    owner_module = sys.modules[obj["module"]]
    target_cls = getattr(owner_module, obj["type"])
    restored = target_cls()
    restored.from_dict(obj["data"])
    return restored
def serialize(obj)
Expand source code
def serialize(obj):
    """JSON ``default`` hook: wrap non-primitive objects in a tagged dict.

    Values that are str, int, float, list, tuple, dict or bool are returned
    untouched. Any other object must provide ``to_dict()``; it is described by
    the file path of its defining module with the last three characters
    removed (presumably the ``.py`` suffix), its class name and its
    ``to_dict()`` payload.
    """
    passthrough_types = (str, int, float, list, tuple, dict, bool)
    if isinstance(obj, passthrough_types):
        return obj

    module_file = inspect.getmodule(obj).__file__
    return {
        "__serialized__": True,
        "module": module_file[:-3],
        "type": obj.__class__.__name__,
        "data": obj.to_dict(),
    }

Classes

class GedisErrorTypes (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class GedisErrorTypes(Enum):
    """Numeric codes written to the ``error_type`` field of a response.

    Note that the value 2 is not assigned to any member.
    """

    # unknown actor/method, or a j.exceptions.NotFound raised by an actor
    NOT_FOUND = 0
    # malformed request, or a j.exceptions.Value raised by an actor
    BAD_REQUEST = 1
    # fallback for any actor exception that has no entry in EXCEPTIONS_MAP
    ACTOR_ERROR = 3
    # unexpected failure inside the connection handler itself
    INTERNAL_SERVER_ERROR = 4
    # a j.exceptions.Permission raised by an actor
    PERMISSION_ERROR = 5

Ancestors

  • enum.Enum

Class variables

var ACTOR_ERROR
var BAD_REQUEST
var INTERNAL_SERVER_ERROR
var NOT_FOUND
var PERMISSION_ERROR
class GedisServer (*args, **kwargs)

A simple attribute-based namespace.

SimpleNamespace(**kwargs)

base class implementation for any class with fields which supports getting/setting raw data for any instance fields.

any instance can have an optional name and a parent.

class Person(Base):
    name = fields.String()
    age = fields.Float()

p = Person(name="ahmed", age="19")
print(p.name, p.age)

Args

parent_ : Base, optional
parent instance. Defaults to None.
instance_name_ : str, optional
instance name. Defaults to None.
**values
any given field values to initiate the instance with
Expand source code
class GedisServer(Base):
    """RESP-speaking server that dispatches requests to registered actors.

    A request is an array ``[actor_name, method_name, payload?]`` where the
    optional payload is a JSON-encoded ``[args, kwargs]`` pair. Each request
    is answered with a JSON document holding ``success``, ``result``,
    ``error``, ``error_type``, ``is_async`` and ``task_id``.
    """

    host = fields.String(default="127.0.0.1")
    port = fields.Integer(default=16000)
    enable_system_actor = fields.Boolean(default=True)
    run_async = fields.Boolean(default=True)
    # saved actors: actor name -> actor path (filled by actor_add)
    _actors = fields.Typed(dict, default={})

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._core_actor = CoreActor()
        self._system_actor = SystemActor()
        # actors that can currently be called; "core" is always available
        self._loaded_actors = {"core": self._core_actor}

    @property
    def actors(self):
        """Lists saved actors

        Returns:
            dict -- Mapping of saved actor names to their paths
        """
        return self._actors

    def actor_add(self, actor_name: str, actor_path: str):
        """Adds an actor to the server

        Arguments:
            actor_name {str} -- Actor name
            actor_path {str} -- Actor absolute path

        Raises:
            j.exceptions.Value: raises if actor name is matched one of the reserved actor names
            j.exceptions.Value: raises if actor name is not a valid identifier
        """
        if actor_name in RESERVED_ACTOR_NAMES:
            raise j.exceptions.Value("Invalid actor name")

        if not actor_name.isidentifier():
            raise j.exceptions.Value("Actor name should be a valid identifier")

        self._actors[actor_name] = actor_path

    def actor_delete(self, actor_name: str):
        """Removes an actor from the server

        Unknown names are ignored.

        Arguments:
            actor_name {str} -- Actor name
        """
        self._actors.pop(actor_name, None)

    def start(self):
        """Starts the server

        Registers the system actor (when enabled), hands this server to the
        core and system actors, registers every saved actor through the
        system actor and finally starts listening on ``(host, port)``.
        """
        # register system actor if enabled
        if self.enable_system_actor:
            self._register_actor("system", self._system_actor)

        self._core_actor.set_server(self)
        self._system_actor.set_server(self)

        # register saved actors
        for actor_name, actor_path in self._actors.items():
            self._system_actor.register_actor(actor_name, actor_path)

        # start the server
        self._server = StreamServer((self.host, self.port), self._on_connection, spawn=Pool())
        self._server.reuse_addr = True
        self._server.start()

        j.logger.info(f"Gedis server is started at {self.host}:{self.port}...")

    def stop(self):
        """Stops the server

        Does nothing besides logging when the server was never started.
        """
        j.logger.info("Shutting down...")
        # `_server` only exists after start(); avoid an AttributeError here
        server = getattr(self, "_server", None)
        if server is not None:
            server.stop()

    def _register_actor(self, actor_name: str, actor_module: BaseActor):
        """Makes an actor object callable under the given name."""
        self._loaded_actors[actor_name] = actor_module

    def _unregister_actor(self, actor_name: str):
        """Removes a callable actor; unknown names are ignored."""
        self._loaded_actors.pop(actor_name, None)

    def _execute(self, method, args, kwargs):
        """Calls an actor method and captures its outcome.

        Returns:
            dict -- ``{"result": ...}`` on success, otherwise ``{"error": ..., "error_type": ...}``
        """
        response = {}
        try:
            response["result"] = method(*args, **kwargs)

        except Exception as e:
            j.logger.exception(f"error while executing {method}", exception=e)

            response["error"] = str(e)
            # exact-class lookup; anything unmapped is reported as ACTOR_ERROR
            response["error_type"] = EXCEPTIONS_MAP.get(e.__class__, GedisErrorTypes.ACTOR_ERROR.value)

        return response

    def _on_connection(self, socket, address):
        """Serves one client connection until it closes or times out.

        Arguments:
            socket -- accepted client socket
            address -- client address, used for logging only
        """
        j.logger.debug(f"New connection from {address}")
        parser = DefaultParser(65536)
        connection = RedisConnectionAdapter(socket)
        try:
            encoder = ResponseEncoder(socket)
            parser.on_connect(connection)

            while True:
                response = dict(success=True, result=None, error=None, error_type=None, is_async=False, task_id=None)
                try:
                    request = parser.read_response()

                    if len(request) < 2:
                        response["error"] = "invalid request"
                        response["error_type"] = GedisErrorTypes.BAD_REQUEST.value

                    else:
                        actor_name = request.pop(0).decode()
                        method_name = request.pop(0).decode()
                        actor_object = self._loaded_actors.get(actor_name)

                        if not actor_object:
                            response["error"] = "actor not found"
                            response["error_type"] = GedisErrorTypes.NOT_FOUND.value

                        elif method_name.startswith("_") or not callable(
                            getattr(actor_object, method_name, None)
                        ):
                            # The method name comes from the network: never
                            # expose private/dunder attributes, and treat
                            # missing or non-callable attributes as unknown.
                            response["error"] = "method not found"
                            response["error_type"] = GedisErrorTypes.NOT_FOUND.value

                        else:
                            j.logger.debug(
                                f"Executing method {method_name} from actor {actor_name} to client {address}"
                            )

                            if request:
                                # optional third item: JSON-encoded [args, kwargs]
                                args, kwargs = json.loads(request.pop(0), object_hook=deserialize)
                            else:
                                args, kwargs = (), {}

                            method = getattr(actor_object, method_name)
                            result = self._execute(method, args, kwargs)
                            response.update(result)

                except (TimeoutError, ConnectionError):
                    # these are the redis.exceptions classes imported by this module
                    j.logger.debug(f"Client {address} closed the connection/or timeout")
                    parser.on_disconnect()
                    return

                except Exception as exception:
                    j.logger.exception("internal error", exception=exception)
                    response["error"] = "internal server error"
                    response["error_type"] = GedisErrorTypes.INTERNAL_SERVER_ERROR.value

                response["success"] = response["error"] is None
                encoder.encode(json.dumps(response, default=serialize))

        except BrokenPipeError:
            # the client went away while a reply was being written
            pass

Ancestors

  • Base
  • types.SimpleNamespace

Instance variables

var actors

Lists saved actors

Returns

dict – Mapping of saved actor names to their paths

Expand source code
@property
def actors(self):
    """Lists saved actors

    Returns:
        dict -- Mapping of saved actor names to their paths
    """
    # `_actors` is declared as a dict field and filled by actor_add
    return self._actors
var enable_system_actor

Getter method for this property.

Calls _get_value, which returns the value if it is already defined and falls back to the default value if not.

Returns

any
the field value
Expand source code
def getter(self):
    """
    Getter method for this property.

    Calls `_get_value`, which (per the original note) returns the value if
    it is already defined and falls back to the default value if not.

    Returns:
        any: the field value
    """
    # `name` and `field` are free variables taken from an enclosing scope
    # that is not shown in this snippet.
    return self._get_value(name, field)
var host

Getter method for this property.

Calls _get_value, which returns the value if it is already defined and falls back to the default value if not.

Returns

any
the field value
Expand source code
def getter(self):
    """
    Getter method for this property.

    Calls `_get_value`, which (per the original note) returns the value if
    it is already defined and falls back to the default value if not.

    Returns:
        any: the field value
    """
    # `name` and `field` are free variables taken from an enclosing scope
    # that is not shown in this snippet.
    return self._get_value(name, field)
var port

Getter method for this property.

Calls _get_value, which returns the value if it is already defined and falls back to the default value if not.

Returns

any
the field value
Expand source code
def getter(self):
    """
    Getter method for this property.

    Calls `_get_value`, which (per the original note) returns the value if
    it is already defined and falls back to the default value if not.

    Returns:
        any: the field value
    """
    # `name` and `field` are free variables taken from an enclosing scope
    # that is not shown in this snippet.
    return self._get_value(name, field)
var run_async

Getter method for this property.

Calls _get_value, which returns the value if it is already defined and falls back to the default value if not.

Returns

any
the field value
Expand source code
def getter(self):
    """
    Getter method for this property.

    Calls `_get_value`, which (per the original note) returns the value if
    it is already defined and falls back to the default value if not.

    Returns:
        any: the field value
    """
    # `name` and `field` are free variables taken from an enclosing scope
    # that is not shown in this snippet.
    return self._get_value(name, field)

Methods

def actor_add(self, actor_name: str, actor_path: str)

Adds an actor to the server

Arguments

actor_name {str} – Actor name; actor_path {str} – Actor absolute path

Raises

j.exceptions.Value
raises if actor name is matched one of the reserved actor names
j.exceptions.Value
raises if actor name is not a valid identifier
Expand source code
def actor_add(self, actor_name: str, actor_path: str):
    """Save an actor path under the given name.

    Arguments:
        actor_name {str} -- name to store the actor under
        actor_path {str} -- absolute path of the actor

    Raises:
        j.exceptions.Value: when the name is one of the reserved actor names
        j.exceptions.Value: when the name is not a valid identifier
    """
    name_is_reserved = actor_name in RESERVED_ACTOR_NAMES
    if name_is_reserved:
        raise j.exceptions.Value("Invalid actor name")

    if actor_name.isidentifier():
        self._actors[actor_name] = actor_path
        return

    raise j.exceptions.Value("Actor name should be a valid identifier")
def actor_delete(self, actor_name: str)

Removes an actor from the server

Arguments

actor_name {str} – Actor name

Expand source code
def actor_delete(self, actor_name: str):
    """Drop an actor from the saved actors; unknown names are ignored.

    Arguments:
        actor_name {str} -- name of the actor to drop
    """
    saved_actors = self._actors
    if actor_name in saved_actors:
        del saved_actors[actor_name]
def start(self)

Starts the server

Expand source code
def start(self):
    """Starts the server

    Registers the system actor (when enabled), hands this server to the
    core and system actors, registers every saved actor through the system
    actor and finally starts a gevent StreamServer on ``(host, port)`` with
    ``_on_connection`` as the connection handler.
    """
    # register system actor if enabled
    if self.enable_system_actor:
        self._register_actor("system", self._system_actor)

    # both built-in actors receive a reference to this server
    self._core_actor.set_server(self)
    self._system_actor.set_server(self)

    # register saved actors
    for actor_name, actor_path in self._actors.items():
        self._system_actor.register_actor(actor_name, actor_path)

    # start the server
    self._server = StreamServer((self.host, self.port), self._on_connection, spawn=Pool())
    self._server.reuse_addr = True
    self._server.start()

    j.logger.info(f"Gedis server is started at {self.host}:{self.port}...")
def stop(self)

Stops the server

Expand source code
def stop(self):
    """Stops the server

    Does nothing besides logging when the server was never started.
    """
    j.logger.info("Shutting down...")
    # `_server` is only assigned by start(); avoid an AttributeError when
    # stop() is called on a server that was never started
    server = getattr(self, "_server", None)
    if server is not None:
        server.stop()

Inherited members

class RedisConnectionAdapter (sock)
Expand source code
class RedisConnectionAdapter:
    """Connection-like wrapper around an accepted socket.

    ``GedisServer._on_connection`` passes an instance to the redis parser's
    ``on_connect``; the attributes below are plain values set once here.
    """

    def __init__(self, sock):
        # the same socket object is exposed under both names
        self._sock = self.socket = sock

        # connect and read timeouts share one value
        timeout = 600
        self.socket_connect_timeout = timeout
        self.socket_timeout = timeout

        self.retry_on_timeout = True
        self.socket_keepalive = True
        self.socket_keepalive_options = dict()

        self.encoder = Encoder("utf", "strict", False)
class ResponseEncoder (socket)
Expand source code
class ResponseEncoder:
    """Encodes Python values as RESP (Redis protocol) replies on a socket.

    Replies are accumulated in an in-memory buffer and flushed to the socket
    with ``sendall`` at the end of every public call.
    """

    def __init__(self, socket):
        self.socket = socket
        self.buffer = BytesIO()

    def encode(self, value):
        """Respond with data.

        Mapping of Python values to RESP types:

        * ``None``  -> null bulk string
        * ``bool``  -> integer ``1``/``0``
        * ``int``   -> integer
        * ``str``   -> simple string, or bulk string when it contains ``\\n``
        * ``bytes`` -> bulk string
        * ``list``  -> array (a leading ``"*REDIS*"`` marker item is dropped)
        * anything else -> bulk string holding ``repr(value)``
        """
        if value is None:
            self._write_buffer("$-1\r\n")
        elif isinstance(value, bool):
            # bool is a subclass of int, so it has to be tested before int;
            # otherwise True/False would be emitted as ":True"/":False".
            self._write_buffer(":{}\r\n".format(1 if value else 0))
        elif isinstance(value, int):
            self._write_buffer(":{}\r\n".format(value))
        elif isinstance(value, str):
            if "\n" in value:
                self._bulk(value)
            else:
                self._write_buffer("+{}\r\n".format(value))
        elif isinstance(value, bytes):
            self._bulkbytes(value)
        elif isinstance(value, list):
            if value and value[0] == "*REDIS*":
                value = value[1:]
            self._array(value)
        else:
            # every object has __repr__, so this is the catch-all branch
            self._bulk(repr(value))

        self._send()

    def status(self, msg="OK"):
        """Send a status."""
        self._write_buffer("+{}\r\n".format(msg))
        self._send()

    def error(self, msg):
        """Send an error."""
        self._write_buffer("-ERR {}\r\n".format(msg))
        self._send()

    def _bulk(self, value):
        """Buffer a text value as a bulk string.

        The length prefix of a bulk string counts bytes, so the text is
        encoded first; counting characters would under-report the length of
        any non-ASCII text.
        """
        self._bulkbytes(value.encode())

    def _array(self, value):
        """Buffer an array header, then encode (and send) every item."""
        self._write_buffer("*{}\r\n".format(len(value)))
        for item in value:
            self.encode(item)

    def _bulkbytes(self, value):
        """Buffer a bytes value as a bulk string."""
        data = [b"$", str(len(value)).encode(), b"\r\n", value, b"\r\n"]
        self._write_buffer(b"".join(data))

    def _write_buffer(self, data):
        """Append data to the pending buffer, encoding text to bytes first."""
        if isinstance(data, str):
            data = data.encode()

        self.buffer.write(data)

    def _send(self):
        """Flush the pending buffer to the socket and start a fresh one."""
        self.socket.sendall(self.buffer.getvalue())
        self.buffer = BytesIO()  # seems faster than truncating

Methods

def encode(self, value)

Respond with data.

Expand source code
def encode(self, value):
    """Respond with data.

    ``None`` becomes a null bulk string, ``bool`` an integer ``1``/``0``,
    ``int`` an integer, ``str`` a simple string (bulk when it contains a
    newline), ``bytes`` a bulk string, ``list`` an array (a leading
    ``"*REDIS*"`` marker item is dropped) and anything else a bulk string
    holding ``repr(value)``. The buffered reply is sent at the end.
    """
    if value is None:
        self._write_buffer("$-1\r\n")
    elif isinstance(value, bool):
        # bool is a subclass of int, so it has to be tested before int;
        # otherwise True/False would be emitted as ":True"/":False".
        self._write_buffer(":{}\r\n".format(1 if value else 0))
    elif isinstance(value, int):
        self._write_buffer(":{}\r\n".format(value))
    elif isinstance(value, str):
        if "\n" in value:
            self._bulk(value)
        else:
            self._write_buffer("+{}\r\n".format(value))
    elif isinstance(value, bytes):
        self._bulkbytes(value)
    elif isinstance(value, list):
        if value and value[0] == "*REDIS*":
            value = value[1:]
        self._array(value)
    else:
        # every object has __repr__, so this is the catch-all branch
        self._bulk(repr(value))

    self._send()
def error(self, msg)

Send an error.

Expand source code
def error(self, msg):
    """Buffer an ``-ERR`` reply carrying ``msg`` and send it right away."""
    reply = f"-ERR {msg}\r\n"
    self._write_buffer(reply)
    self._send()
def status(self, msg='OK')

Send a status.

Expand source code
def status(self, msg="OK"):
    """Buffer a ``+<msg>`` status reply and send it right away."""
    reply = f"+{msg}\r\n"
    self._write_buffer(reply)
    self._send()