Module jumpscale.packages.chatflows.actors.chatbot

Expand source code
from jumpscale.servers.gedis.baseactor import BaseActor, actor_method
from jumpscale.loader import j
import base64
import sys
import uuid
import importlib
import os


class ChatFlows(BaseActor):
    def __init__(self):
        super().__init__()
        self.chats = {}
        self.sessions = {}

    @actor_method
    def new(self, package: str, chat: str, client_ip: str, query_params: dict = None) -> dict:
        package_object = self.chats.get(package)
        if not package_object:
            raise j.exceptions.Value(f"Package {package} not found")

        chatflow = package_object.get(chat)
        if not chatflow:
            raise j.exceptions.Value(f"Chat {chat} not found")

        if query_params is None:
            query_params = {}
        obj = chatflow(**query_params)
        self.sessions[obj.session_id] = obj
        return {"sessionId": obj.session_id, "title": obj.title}

    @actor_method
    def fetch(self, session_id: str, restore: bool = False) -> dict:
        chatflow = self.sessions.get(session_id)
        if not chatflow:
            return {"payload": {"category": "end"}}

        result = chatflow.get_work(restore)

        if result.get("category") == "end":
            self.sessions.pop(session_id)

        return result

    @actor_method
    def validate(self, session_id: str) -> dict:
        return {"valid": session_id in self.sessions}

    @actor_method
    def report(self, session_id: str, result: str = None):
        chatflow = self.sessions.get(session_id)
        chatflow.set_work(result)

    @actor_method
    def back(self, session_id: str):
        chatflow = self.sessions.get(session_id)
        chatflow.go_back()

    @actor_method
    def chatflows_list(self) -> list:
        return list(self.chats.keys())

    def _scan_chats(self, path):
        package = j.sals.fs.basename(j.sals.fs.parent(path))
        for path in j.sals.fs.walk_files(path, recursive=False):
            if path.endswith(".py"):
                name = j.sals.fs.stem(path)
                yield path, package, name

    def _import_path(self, filepath):
        absolute_path = os.path.abspath(filepath)
        paths = sys.path[:]
        paths.sort(key=lambda p: len(p), reverse=True)
        for syspath in paths:
            absolute_sys_path = os.path.abspath(syspath)
            if absolute_path.startswith(absolute_sys_path):
                parts = absolute_path[len(absolute_sys_path) + 1 : -3].split(os.path.sep)
                if "__init__" in parts:
                    parts.remove("__init__")
                module_name = ".".join(parts)
                break
        else:
            module_name = absolute_path

        spec = importlib.util.spec_from_file_location(module_name, absolute_path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module

    @actor_method
    def load(self, path: str):
        for path, package, chat in self._scan_chats(path):
            module = self._import_path(path)
            if package not in self.chats:
                self.chats[package] = {}

            self.chats[package][chat] = module.chat

    @actor_method
    def unload(self, path: str):
        for _, package, chat in self._scan_chats(path):
            self.chats[package].pop(chat)


Actor = ChatFlows

Classes

class ChatFlows
Expand source code
class ChatFlows(BaseActor):
    def __init__(self):
        super().__init__()
        self.chats = {}
        self.sessions = {}

    @actor_method
    def new(self, package: str, chat: str, client_ip: str, query_params: dict = None) -> dict:
        package_object = self.chats.get(package)
        if not package_object:
            raise j.exceptions.Value(f"Package {package} not found")

        chatflow = package_object.get(chat)
        if not chatflow:
            raise j.exceptions.Value(f"Chat {chat} not found")

        if query_params is None:
            query_params = {}
        obj = chatflow(**query_params)
        self.sessions[obj.session_id] = obj
        return {"sessionId": obj.session_id, "title": obj.title}

    @actor_method
    def fetch(self, session_id: str, restore: bool = False) -> dict:
        chatflow = self.sessions.get(session_id)
        if not chatflow:
            return {"payload": {"category": "end"}}

        result = chatflow.get_work(restore)

        if result.get("category") == "end":
            self.sessions.pop(session_id)

        return result

    @actor_method
    def validate(self, session_id: str) -> dict:
        return {"valid": session_id in self.sessions}

    @actor_method
    def report(self, session_id: str, result: str = None):
        chatflow = self.sessions.get(session_id)
        chatflow.set_work(result)

    @actor_method
    def back(self, session_id: str):
        chatflow = self.sessions.get(session_id)
        chatflow.go_back()

    @actor_method
    def chatflows_list(self) -> list:
        return list(self.chats.keys())

    def _scan_chats(self, path):
        package = j.sals.fs.basename(j.sals.fs.parent(path))
        for path in j.sals.fs.walk_files(path, recursive=False):
            if path.endswith(".py"):
                name = j.sals.fs.stem(path)
                yield path, package, name

    def _import_path(self, filepath):
        absolute_path = os.path.abspath(filepath)
        paths = sys.path[:]
        paths.sort(key=lambda p: len(p), reverse=True)
        for syspath in paths:
            absolute_sys_path = os.path.abspath(syspath)
            if absolute_path.startswith(absolute_sys_path):
                parts = absolute_path[len(absolute_sys_path) + 1 : -3].split(os.path.sep)
                if "__init__" in parts:
                    parts.remove("__init__")
                module_name = ".".join(parts)
                break
        else:
            module_name = absolute_path

        spec = importlib.util.spec_from_file_location(module_name, absolute_path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module

    @actor_method
    def load(self, path: str):
        for path, package, chat in self._scan_chats(path):
            module = self._import_path(path)
            if package not in self.chats:
                self.chats[package] = {}

            self.chats[package][chat] = module.chat

    @actor_method
    def unload(self, path: str):
        for _, package, chat in self._scan_chats(path):
            self.chats[package].pop(chat)

Ancestors

Methods

def back(self, session_id: str)
Expand source code
@actor_method
def back(self, session_id: str):
    chatflow = self.sessions.get(session_id)
    chatflow.go_back()
def chatflows_list(self) ‑> list
Expand source code
@actor_method
def chatflows_list(self) -> list:
    return list(self.chats.keys())
def fetch(self, session_id: str, restore: bool = False) ‑> dict
Expand source code
@actor_method
def fetch(self, session_id: str, restore: bool = False) -> dict:
    chatflow = self.sessions.get(session_id)
    if not chatflow:
        return {"payload": {"category": "end"}}

    result = chatflow.get_work(restore)

    if result.get("category") == "end":
        self.sessions.pop(session_id)

    return result
def load(self, path: str)
Expand source code
@actor_method
def load(self, path: str):
    for path, package, chat in self._scan_chats(path):
        module = self._import_path(path)
        if package not in self.chats:
            self.chats[package] = {}

        self.chats[package][chat] = module.chat
def new(self, package: str, chat: str, client_ip: str, query_params: dict = None) ‑> dict
Expand source code
@actor_method
def new(self, package: str, chat: str, client_ip: str, query_params: dict = None) -> dict:
    package_object = self.chats.get(package)
    if not package_object:
        raise j.exceptions.Value(f"Package {package} not found")

    chatflow = package_object.get(chat)
    if not chatflow:
        raise j.exceptions.Value(f"Chat {chat} not found")

    if query_params is None:
        query_params = {}
    obj = chatflow(**query_params)
    self.sessions[obj.session_id] = obj
    return {"sessionId": obj.session_id, "title": obj.title}
def report(self, session_id: str, result: str = None)
Expand source code
@actor_method
def report(self, session_id: str, result: str = None):
    chatflow = self.sessions.get(session_id)
    chatflow.set_work(result)
def unload(self, path: str)
Expand source code
@actor_method
def unload(self, path: str):
    for _, package, chat in self._scan_chats(path):
        self.chats[package].pop(chat)
def validate(self, session_id: str) ‑> dict
Expand source code
@actor_method
def validate(self, session_id: str) -> dict:
    return {"valid": session_id in self.sessions}
class Actor
Expand source code
class ChatFlows(BaseActor):
    def __init__(self):
        super().__init__()
        self.chats = {}
        self.sessions = {}

    @actor_method
    def new(self, package: str, chat: str, client_ip: str, query_params: dict = None) -> dict:
        package_object = self.chats.get(package)
        if not package_object:
            raise j.exceptions.Value(f"Package {package} not found")

        chatflow = package_object.get(chat)
        if not chatflow:
            raise j.exceptions.Value(f"Chat {chat} not found")

        if query_params is None:
            query_params = {}
        obj = chatflow(**query_params)
        self.sessions[obj.session_id] = obj
        return {"sessionId": obj.session_id, "title": obj.title}

    @actor_method
    def fetch(self, session_id: str, restore: bool = False) -> dict:
        chatflow = self.sessions.get(session_id)
        if not chatflow:
            return {"payload": {"category": "end"}}

        result = chatflow.get_work(restore)

        if result.get("category") == "end":
            self.sessions.pop(session_id)

        return result

    @actor_method
    def validate(self, session_id: str) -> dict:
        return {"valid": session_id in self.sessions}

    @actor_method
    def report(self, session_id: str, result: str = None):
        chatflow = self.sessions.get(session_id)
        chatflow.set_work(result)

    @actor_method
    def back(self, session_id: str):
        chatflow = self.sessions.get(session_id)
        chatflow.go_back()

    @actor_method
    def chatflows_list(self) -> list:
        return list(self.chats.keys())

    def _scan_chats(self, path):
        package = j.sals.fs.basename(j.sals.fs.parent(path))
        for path in j.sals.fs.walk_files(path, recursive=False):
            if path.endswith(".py"):
                name = j.sals.fs.stem(path)
                yield path, package, name

    def _import_path(self, filepath):
        absolute_path = os.path.abspath(filepath)
        paths = sys.path[:]
        paths.sort(key=lambda p: len(p), reverse=True)
        for syspath in paths:
            absolute_sys_path = os.path.abspath(syspath)
            if absolute_path.startswith(absolute_sys_path):
                parts = absolute_path[len(absolute_sys_path) + 1 : -3].split(os.path.sep)
                if "__init__" in parts:
                    parts.remove("__init__")
                module_name = ".".join(parts)
                break
        else:
            module_name = absolute_path

        spec = importlib.util.spec_from_file_location(module_name, absolute_path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module

    @actor_method
    def load(self, path: str):
        for path, package, chat in self._scan_chats(path):
            module = self._import_path(path)
            if package not in self.chats:
                self.chats[package] = {}

            self.chats[package][chat] = module.chat

    @actor_method
    def unload(self, path: str):
        for _, package, chat in self._scan_chats(path):
            self.chats[package].pop(chat)

Ancestors

Methods

def back(self, session_id: str)
Expand source code
@actor_method
def back(self, session_id: str):
    chatflow = self.sessions.get(session_id)
    chatflow.go_back()
def chatflows_list(self) ‑> list
Expand source code
@actor_method
def chatflows_list(self) -> list:
    return list(self.chats.keys())
def fetch(self, session_id: str, restore: bool = False) ‑> dict
Expand source code
@actor_method
def fetch(self, session_id: str, restore: bool = False) -> dict:
    chatflow = self.sessions.get(session_id)
    if not chatflow:
        return {"payload": {"category": "end"}}

    result = chatflow.get_work(restore)

    if result.get("category") == "end":
        self.sessions.pop(session_id)

    return result
def load(self, path: str)
Expand source code
@actor_method
def load(self, path: str):
    for path, package, chat in self._scan_chats(path):
        module = self._import_path(path)
        if package not in self.chats:
            self.chats[package] = {}

        self.chats[package][chat] = module.chat
def new(self, package: str, chat: str, client_ip: str, query_params: dict = None) ‑> dict
Expand source code
@actor_method
def new(self, package: str, chat: str, client_ip: str, query_params: dict = None) -> dict:
    package_object = self.chats.get(package)
    if not package_object:
        raise j.exceptions.Value(f"Package {package} not found")

    chatflow = package_object.get(chat)
    if not chatflow:
        raise j.exceptions.Value(f"Chat {chat} not found")

    if query_params is None:
        query_params = {}
    obj = chatflow(**query_params)
    self.sessions[obj.session_id] = obj
    return {"sessionId": obj.session_id, "title": obj.title}
def report(self, session_id: str, result: str = None)
Expand source code
@actor_method
def report(self, session_id: str, result: str = None):
    chatflow = self.sessions.get(session_id)
    chatflow.set_work(result)
def unload(self, path: str)
Expand source code
@actor_method
def unload(self, path: str):
    for _, package, chat in self._scan_chats(path):
        self.chats[package].pop(chat)
def validate(self, session_id: str) ‑> dict
Expand source code
@actor_method
def validate(self, session_id: str) -> dict:
    return {"valid": session_id in self.sessions}