Module jumpscale.servers.gedis.systemactor

Expand source code
import os
import sys
import json
import inspect
from jumpscale.loader import j
from jumpscale.servers.gedis.baseactor import BaseActor, actor_method


class CoreActor(BaseActor):
    """Built-in actor that reports which actors the attached server has loaded."""

    def __init__(self):
        super().__init__()
        # No server is attached at construction time; set_server() supplies it.
        self._server = None
        # Record this module's file as the actor's source path.
        self.path = __file__

    def set_server(self, server):
        """Attach the server object this actor queries.

        Args:
            server: object stored as ``self._server``; ``list_actors`` reads
                its ``_loaded_actors`` attribute.
        """
        self._server = server

    @actor_method
    def list_actors(self) -> list:
        """Return the names of the actors loaded on the attached server.

        Returns:
            list: keys of the server's ``_loaded_actors``.
        """
        loaded = self._server._loaded_actors
        return list(loaded.keys())


class SystemActor(BaseActor):
    """Built-in actor for registering and unregistering actors on the attached server."""

    def __init__(self):
        super().__init__()
        # No server is attached at construction time; set_server() supplies it.
        self._server = None
        # Record this module's file as the actor's source path.
        self.path = __file__

    def set_server(self, server):
        """Attach the server object that (un)registration calls are forwarded to.

        Args:
            server: object stored as ``self._server``.
        """
        self._server = server

    @actor_method
    def register_actor(self, actor_name: str, actor_path: str, force_reload: bool = False) -> bool:
        """
        Load an actor module, validate its ``Actor`` class and register it

        Args:
            actor_name (str): actor name within gedis server.
            actor_path (str): actor path on gedis server machine.
            force_reload (bool, optional): reload the module if set. Defaults to False.

        Raises:
            j.exceptions.Validation: in case the actor is not valid

        Returns:
            bool: True if registered
        """
        loaded_module = j.tools.codeloader.load_python_module(actor_path, force_reload=force_reload)
        new_actor = loaded_module.Actor()
        new_actor.path = actor_path

        validation = new_actor.__validate_actor__()
        if validation["valid"]:
            # Only a validated actor is handed to the server.
            self._server._register_actor(actor_name, new_actor)
            return True

        raise j.exceptions.Validation(
            "Actor {} is not valid, check the following errors {}".format(actor_name, validation["errors"])
        )

    @actor_method
    def unregister_actor(self, actor_name: str) -> bool:
        """
        Unregister an actor

        Args:
            actor_name (str): actor name within gedis server.

        Returns:
            bool: True if the actor is unregistered
        """
        server = self._server
        server._unregister_actor(actor_name)
        return True

Classes

class CoreActor
Expand source code
class CoreActor(BaseActor):
    """Built-in actor that reports which actors the attached server has loaded."""

    def __init__(self):
        super().__init__()
        # No server is attached at construction time; set_server() supplies it.
        self._server = None
        # Record this module's file as the actor's source path.
        self.path = __file__

    def set_server(self, server):
        """Attach the server object this actor queries.

        Args:
            server: object stored as ``self._server``; ``list_actors`` reads
                its ``_loaded_actors`` attribute.
        """
        self._server = server

    @actor_method
    def list_actors(self) -> list:
        """Return the names of the actors loaded on the attached server.

        Returns:
            list: keys of the server's ``_loaded_actors``.
        """
        loaded = self._server._loaded_actors
        return list(loaded.keys())

Ancestors

Methods

def list_actors(self) ‑> list

List available actors

Returns

list – list of available actors

Expand source code
@actor_method
def list_actors(self) -> list:
    """Return the names of the actors loaded on the attached server.

    Returns:
        list: keys of the server's ``_loaded_actors``.
    """
    loaded = self._server._loaded_actors
    return list(loaded.keys())
def set_server(self, server)
Expand source code
def set_server(self, server):
    """Attach the server object this actor works against.

    Args:
        server: object stored as ``self._server``.
    """
    self._server = server
class SystemActor
Expand source code
class SystemActor(BaseActor):
    """Built-in actor for registering and unregistering actors on the attached server."""

    def __init__(self):
        super().__init__()
        # No server is attached at construction time; set_server() supplies it.
        self._server = None
        # Record this module's file as the actor's source path.
        self.path = __file__

    def set_server(self, server):
        """Attach the server object that (un)registration calls are forwarded to.

        Args:
            server: object stored as ``self._server``.
        """
        self._server = server

    @actor_method
    def register_actor(self, actor_name: str, actor_path: str, force_reload: bool = False) -> bool:
        """
        Load an actor module, validate its ``Actor`` class and register it

        Args:
            actor_name (str): actor name within gedis server.
            actor_path (str): actor path on gedis server machine.
            force_reload (bool, optional): reload the module if set. Defaults to False.

        Raises:
            j.exceptions.Validation: in case the actor is not valid

        Returns:
            bool: True if registered
        """
        loaded_module = j.tools.codeloader.load_python_module(actor_path, force_reload=force_reload)
        new_actor = loaded_module.Actor()
        new_actor.path = actor_path

        validation = new_actor.__validate_actor__()
        if validation["valid"]:
            # Only a validated actor is handed to the server.
            self._server._register_actor(actor_name, new_actor)
            return True

        raise j.exceptions.Validation(
            "Actor {} is not valid, check the following errors {}".format(actor_name, validation["errors"])
        )

    @actor_method
    def unregister_actor(self, actor_name: str) -> bool:
        """
        Unregister an actor

        Args:
            actor_name (str): actor name within gedis server.

        Returns:
            bool: True if the actor is unregistered
        """
        server = self._server
        server._unregister_actor(actor_name)
        return True

Ancestors

Methods

def register_actor(self, actor_name: str, actor_path: str, force_reload: bool = False) ‑> bool

Register new actor

Args

actor_name : str
actor name within gedis server.
actor_path : str
actor path on gedis server machine.
force_reload : bool, optional
reload the module if set. Defaults to False.

Raises

j.exceptions.Validation
in case the actor is not valid

Returns

bool
True if registered
Expand source code
@actor_method
def register_actor(self, actor_name: str, actor_path: str, force_reload: bool = False) -> bool:
    """
    Load an actor module, validate its ``Actor`` class and register it

    Args:
        actor_name (str): actor name within gedis server.
        actor_path (str): actor path on gedis server machine.
        force_reload (bool, optional): reload the module if set. Defaults to False.

    Raises:
        j.exceptions.Validation: in case the actor is not valid

    Returns:
        bool: True if registered
    """
    loaded_module = j.tools.codeloader.load_python_module(actor_path, force_reload=force_reload)
    new_actor = loaded_module.Actor()
    new_actor.path = actor_path

    validation = new_actor.__validate_actor__()
    if validation["valid"]:
        # Only a validated actor is handed to the server.
        self._server._register_actor(actor_name, new_actor)
        return True

    raise j.exceptions.Validation(
        "Actor {} is not valid, check the following errors {}".format(actor_name, validation["errors"])
    )
def set_server(self, server)
Expand source code
def set_server(self, server):
    """Attach the server object this actor works against.

    Args:
        server: object stored as ``self._server``.
    """
    self._server = server
def unregister_actor(self, actor_name: str) ‑> bool

Unregister actor

Arguments

actor_name {str} – actor name

Returns

bool – True if the actor is unregistered

Expand source code
@actor_method
def unregister_actor(self, actor_name: str) -> bool:
    """
    Unregister an actor

    Args:
        actor_name (str): actor name within gedis server.

    Returns:
        bool: True if the actor is unregistered
    """
    server = self._server
    server._unregister_actor(actor_name)
    return True