Module jumpscale.packages.admin.actors.admin

Expand source code
from jumpscale.loader import j
from jumpscale.servers.gedis.baseactor import BaseActor, actor_method
from requests import HTTPError
import json


class Admin(BaseActor):
    @actor_method
    def list_admins(self) -> str:
        admins = list(set(j.core.identity.me.admins))
        return j.data.serializers.json.dumps({"data": admins})

    @actor_method
    def add_admin(self, name: str):
        if name in j.core.identity.me.admins:
            raise j.exceptions.Value(f"Admin {name} already exists")
        j.core.identity.me.admins.append(name)
        j.core.identity.me.save()

    @actor_method
    def delete_admin(self, name: str):
        if name not in j.core.identity.me.admins:
            raise j.exceptions.Value(f"Admin {name} does not exist")
        j.core.identity.me.admins.remove(name)
        j.core.identity.me.save()

    @actor_method
    def list_identities(self) -> str:
        identities = j.core.identity.list_all()
        identity_data = {}
        for identity_name in identities:
            try:
                identity = j.core.identity.get(identity_name)
                if identity.tid < 0:
                    continue
                identity_dict = identity.to_dict()
                identity_dict["instance_name"] = identity.instance_name
                identity_dict.pop("__words")
            except Exception as e:
                j.logger.exception("error", exception=e)
                # TODO: include traceback
                j.tools.alerthandler.alert_raise(
                    app_name="admin",
                    category="internal_errors",
                    message=f"failed to get identity {identity_name} info due to error {str(e)}",
                    alert_type="exception",
                )
                continue
            identity_data[identity_name] = identity_dict
        return j.data.serializers.json.dumps({"data": identity_data})

    @actor_method
    def get_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            identity = j.core.identity.get(identity_instance_name)

            return j.data.serializers.json.dumps(
                {
                    "data": {
                        "instance_name": identity.instance_name,
                        "name": identity.tname,
                        "email": identity.email,
                        "tid": identity.tid,
                        "words": identity.words,
                        "admins": identity.admins,
                    }
                }
            )
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    @actor_method
    def get_current_identity_name(self) -> str:
        return j.data.serializers.json.dumps({"data": j.core.identity.me.instance_name})

    @actor_method
    def set_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            j.core.identity.set_default(identity_instance_name)

            return j.data.serializers.json.dumps({"data": {"instance_name": identity_instance_name}})
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    def generate_mnemonic(self) -> str:
        words = j.data.encryption.generate_mnemonic()
        return j.data.serializers.json.dumps({"data": words})

    @actor_method
    def check_tname_exists(self, tname) -> str:
        try:
            j.core.identity.get_user(tname)
            return j.data.serializers.json.dumps({"data": True})
        except j.exceptions.NotFound:
            return j.data.serializers.json.dumps({"data": False})

    @actor_method
    def check_identity_instance_name(self, name) -> str:
        if name in j.core.identity.list_all():
            return j.data.serializers.json.dumps({"data": True})
        return j.data.serializers.json.dumps({"data": False})

    @actor_method
    def add_identity(
        self,
        display_name: str,
        tname: str,
        email: str,
        words: str,
        admins: list,
    ) -> str:
        checked_admins = []
        for admin in admins:
            if type(admin) is str and not admin.endswith(".3bot"):
                admin = admin + ".3bot"
            checked_admins.append(admin)

        if not display_name.isidentifier() or not display_name.islower():
            raise j.exceptions.Value(
                "The display name must be a lowercase valid python identitifier (English letters, underscores, and numbers not starting with a number)."
            )
        identity_instance_name = display_name
        if identity_instance_name in j.core.identity.list_all():
            raise j.exceptions.Value("Identity with the same name already exists")

        try:
            new_identity = j.core.identity.new(
                name=identity_instance_name,
                tname=tname,
                email=email,
                words=words,
                admins=checked_admins,
            )
            new_identity.register()
            new_identity.save()
        except HTTPError as e:
            j.core.identity.delete(identity_instance_name)
            try:
                raise j.exceptions.Value(j.data.serializers.json.loads(e.response.content)["error"])
            except (KeyError, json.decoder.JSONDecodeError):
                # it failed for some reason other than Bad Request error
                raise j.exceptions.Value(str(e))
        except Exception as e:
            # register sometimes throws exceptions other than HTTP like Input
            j.core.identity.delete(identity_instance_name)
            raise j.exceptions.Value(str(e))
        return j.data.serializers.json.dumps({"data": "New identity successfully created and registered"})

    @actor_method
    def get_config(self) -> str:
        config_obj = j.core.config.get_config()
        return j.data.serializers.json.dumps({"data": config_obj})

    @actor_method
    def get_sdk_version(self) -> str:
        import importlib_metadata as metadata

        packages = ["js-ng", "js-sdk"]
        data = {}
        for package in packages:
            data[package] = metadata.version(package)
        return j.data.serializers.json.dumps({"data": data})

    @actor_method
    def delete_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            identity = j.core.identity.get(identity_instance_name)
            if identity.instance_name == j.core.identity.me.instance_name:
                return j.data.serializers.json.dumps({"data": "Cannot delete current default identity"})

            j.core.identity.delete(identity_instance_name)
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} deleted successfully"})
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    @actor_method
    def list_sshkeys(self) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def add_sshkey(self, key_id, sshkey) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        if key_id not in sshkeys:
            sshkeys[key_id] = sshkey
            j.core.config.set("SSH_KEYS", sshkeys)
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def delete_sshkey(self, key_id) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        if key_id in sshkeys:
            sshkeys.pop(key_id)
            j.core.config.set("SSH_KEYS", sshkeys)
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def get_developer_options(self) -> str:
        test_cert = j.core.config.set_default("TEST_CERT", False)
        over_provision = j.core.config.set_default("OVER_PROVISIONING", False)
        escalation_emails = j.core.config.set_default("ESCALATION_EMAILS_ENABLED", False)
        sort_nodes_by_sru = j.core.config.set_default("SORT_NODES_BY_SRU", False)
        return j.data.serializers.json.dumps(
            {
                "data": {
                    "test_cert": test_cert,
                    "over_provision": over_provision,
                    "escalation_emails": escalation_emails,
                    "sort_nodes_by_sru": sort_nodes_by_sru,
                }
            }
        )

    @actor_method
    def set_developer_options(
        self,
        test_cert: bool,
        over_provision: bool,
        sort_nodes_by_sru: bool,
        escalation_emails: bool,
    ) -> str:
        j.core.config.set("TEST_CERT", test_cert)
        j.core.config.set("OVER_PROVISIONING", over_provision)
        j.core.config.set("ESCALATION_EMAILS_ENABLED", escalation_emails)
        j.core.config.set("SORT_NODES_BY_SRU", sort_nodes_by_sru)
        return j.data.serializers.json.dumps(
            {
                "data": {
                    "test_cert": test_cert,
                    "over_provision": over_provision,
                    "escalation_emails": escalation_emails,
                    "sort_nodes_by_sru": sort_nodes_by_sru,
                }
            }
        )

    @actor_method
    def clear_blocked_nodes(self) -> str:
        j.sals.reservation_chatflow.reservation_chatflow.clear_blocked_nodes()
        return j.data.serializers.json.dumps({"data": "blocked nodes got cleared successfully."})

    @actor_method
    def clear_blocked_managed_domains(self) -> str:
        j.sals.marketplace.deployer.clear_blocked_managed_domains()
        return j.data.serializers.json.dumps({"data": "blocked managed domains got cleared successfully."})

    @actor_method
    def get_email_server_config(self) -> str:
        email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
        email_server_config.setdefault("host", "")
        email_server_config.setdefault("port", "")
        email_server_config.setdefault("username", "")
        email_server_config.setdefault("password", "")
        return j.data.serializers.json.dumps({"data": email_server_config})

    @actor_method
    def set_email_server_config(self, host="", port="", username="", password="") -> str:
        email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
        email_server_config = {"host": host, "port": port, "username": username, "password": password}
        j.core.config.set("EMAIL_SERVER_CONFIG", email_server_config)
        return j.data.serializers.json.dumps({"data": email_server_config})

    @actor_method
    def list_escalation_emails(self) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        return j.data.serializers.json.dumps({"data": escalation_emails})

    @actor_method
    def add_escalation_email(self, email) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        if email not in escalation_emails:
            escalation_emails.append(email)
            j.core.config.set("ESCALATION_EMAILS", escalation_emails)
        return j.data.serializers.json.dumps({"data": escalation_emails})

    @actor_method
    def delete_escalation_email(self, email) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        if email in escalation_emails:
            escalation_emails.remove(email)
            j.core.config.set("ESCALATION_EMAILS", escalation_emails)
        return j.data.serializers.json.dumps({"data": escalation_emails})

    def get_notifications(self) -> str:
        notifications = []
        if j.tools.notificationsqueue.count() >= 10:
            notifications = j.tools.notificationsqueue.fetch()
        else:
            notifications = j.tools.notificationsqueue.fetch(10)
        ret = [notification.json for notification in notifications]
        return j.data.serializers.json.dumps({"data": ret})

    @actor_method
    def get_notifications_count(self) -> str:
        return j.data.serializers.json.dumps({"data": j.tools.notificationsqueue.count()})


Actor = Admin

Classes

class Admin
Expand source code
class Admin(BaseActor):
    @actor_method
    def list_admins(self) -> str:
        admins = list(set(j.core.identity.me.admins))
        return j.data.serializers.json.dumps({"data": admins})

    @actor_method
    def add_admin(self, name: str):
        if name in j.core.identity.me.admins:
            raise j.exceptions.Value(f"Admin {name} already exists")
        j.core.identity.me.admins.append(name)
        j.core.identity.me.save()

    @actor_method
    def delete_admin(self, name: str):
        if name not in j.core.identity.me.admins:
            raise j.exceptions.Value(f"Admin {name} does not exist")
        j.core.identity.me.admins.remove(name)
        j.core.identity.me.save()

    @actor_method
    def list_identities(self) -> str:
        identities = j.core.identity.list_all()
        identity_data = {}
        for identity_name in identities:
            try:
                identity = j.core.identity.get(identity_name)
                if identity.tid < 0:
                    continue
                identity_dict = identity.to_dict()
                identity_dict["instance_name"] = identity.instance_name
                identity_dict.pop("__words")
            except Exception as e:
                j.logger.exception("error", exception=e)
                # TODO: include traceback
                j.tools.alerthandler.alert_raise(
                    app_name="admin",
                    category="internal_errors",
                    message=f"failed to get identity {identity_name} info due to error {str(e)}",
                    alert_type="exception",
                )
                continue
            identity_data[identity_name] = identity_dict
        return j.data.serializers.json.dumps({"data": identity_data})

    @actor_method
    def get_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            identity = j.core.identity.get(identity_instance_name)

            return j.data.serializers.json.dumps(
                {
                    "data": {
                        "instance_name": identity.instance_name,
                        "name": identity.tname,
                        "email": identity.email,
                        "tid": identity.tid,
                        "words": identity.words,
                        "admins": identity.admins,
                    }
                }
            )
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    @actor_method
    def get_current_identity_name(self) -> str:
        return j.data.serializers.json.dumps({"data": j.core.identity.me.instance_name})

    @actor_method
    def set_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            j.core.identity.set_default(identity_instance_name)

            return j.data.serializers.json.dumps({"data": {"instance_name": identity_instance_name}})
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    def generate_mnemonic(self) -> str:
        words = j.data.encryption.generate_mnemonic()
        return j.data.serializers.json.dumps({"data": words})

    @actor_method
    def check_tname_exists(self, tname) -> str:
        try:
            j.core.identity.get_user(tname)
            return j.data.serializers.json.dumps({"data": True})
        except j.exceptions.NotFound:
            return j.data.serializers.json.dumps({"data": False})

    @actor_method
    def check_identity_instance_name(self, name) -> str:
        if name in j.core.identity.list_all():
            return j.data.serializers.json.dumps({"data": True})
        return j.data.serializers.json.dumps({"data": False})

    @actor_method
    def add_identity(
        self,
        display_name: str,
        tname: str,
        email: str,
        words: str,
        admins: list,
    ) -> str:
        checked_admins = []
        for admin in admins:
            if type(admin) is str and not admin.endswith(".3bot"):
                admin = admin + ".3bot"
            checked_admins.append(admin)

        if not display_name.isidentifier() or not display_name.islower():
            raise j.exceptions.Value(
                "The display name must be a lowercase valid python identitifier (English letters, underscores, and numbers not starting with a number)."
            )
        identity_instance_name = display_name
        if identity_instance_name in j.core.identity.list_all():
            raise j.exceptions.Value("Identity with the same name already exists")

        try:
            new_identity = j.core.identity.new(
                name=identity_instance_name,
                tname=tname,
                email=email,
                words=words,
                admins=checked_admins,
            )
            new_identity.register()
            new_identity.save()
        except HTTPError as e:
            j.core.identity.delete(identity_instance_name)
            try:
                raise j.exceptions.Value(j.data.serializers.json.loads(e.response.content)["error"])
            except (KeyError, json.decoder.JSONDecodeError):
                # it failed for some reason other than Bad Request error
                raise j.exceptions.Value(str(e))
        except Exception as e:
            # register sometimes throws exceptions other than HTTP like Input
            j.core.identity.delete(identity_instance_name)
            raise j.exceptions.Value(str(e))
        return j.data.serializers.json.dumps({"data": "New identity successfully created and registered"})

    @actor_method
    def get_config(self) -> str:
        config_obj = j.core.config.get_config()
        return j.data.serializers.json.dumps({"data": config_obj})

    @actor_method
    def get_sdk_version(self) -> str:
        import importlib_metadata as metadata

        packages = ["js-ng", "js-sdk"]
        data = {}
        for package in packages:
            data[package] = metadata.version(package)
        return j.data.serializers.json.dumps({"data": data})

    @actor_method
    def delete_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            identity = j.core.identity.get(identity_instance_name)
            if identity.instance_name == j.core.identity.me.instance_name:
                return j.data.serializers.json.dumps({"data": "Cannot delete current default identity"})

            j.core.identity.delete(identity_instance_name)
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} deleted successfully"})
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    @actor_method
    def list_sshkeys(self) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def add_sshkey(self, key_id, sshkey) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        if key_id not in sshkeys:
            sshkeys[key_id] = sshkey
            j.core.config.set("SSH_KEYS", sshkeys)
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def delete_sshkey(self, key_id) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        if key_id in sshkeys:
            sshkeys.pop(key_id)
            j.core.config.set("SSH_KEYS", sshkeys)
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def get_developer_options(self) -> str:
        test_cert = j.core.config.set_default("TEST_CERT", False)
        over_provision = j.core.config.set_default("OVER_PROVISIONING", False)
        escalation_emails = j.core.config.set_default("ESCALATION_EMAILS_ENABLED", False)
        sort_nodes_by_sru = j.core.config.set_default("SORT_NODES_BY_SRU", False)
        return j.data.serializers.json.dumps(
            {
                "data": {
                    "test_cert": test_cert,
                    "over_provision": over_provision,
                    "escalation_emails": escalation_emails,
                    "sort_nodes_by_sru": sort_nodes_by_sru,
                }
            }
        )

    @actor_method
    def set_developer_options(
        self,
        test_cert: bool,
        over_provision: bool,
        sort_nodes_by_sru: bool,
        escalation_emails: bool,
    ) -> str:
        j.core.config.set("TEST_CERT", test_cert)
        j.core.config.set("OVER_PROVISIONING", over_provision)
        j.core.config.set("ESCALATION_EMAILS_ENABLED", escalation_emails)
        j.core.config.set("SORT_NODES_BY_SRU", sort_nodes_by_sru)
        return j.data.serializers.json.dumps(
            {
                "data": {
                    "test_cert": test_cert,
                    "over_provision": over_provision,
                    "escalation_emails": escalation_emails,
                    "sort_nodes_by_sru": sort_nodes_by_sru,
                }
            }
        )

    @actor_method
    def clear_blocked_nodes(self) -> str:
        j.sals.reservation_chatflow.reservation_chatflow.clear_blocked_nodes()
        return j.data.serializers.json.dumps({"data": "blocked nodes got cleared successfully."})

    @actor_method
    def clear_blocked_managed_domains(self) -> str:
        j.sals.marketplace.deployer.clear_blocked_managed_domains()
        return j.data.serializers.json.dumps({"data": "blocked managed domains got cleared successfully."})

    @actor_method
    def get_email_server_config(self) -> str:
        email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
        email_server_config.setdefault("host", "")
        email_server_config.setdefault("port", "")
        email_server_config.setdefault("username", "")
        email_server_config.setdefault("password", "")
        return j.data.serializers.json.dumps({"data": email_server_config})

    @actor_method
    def set_email_server_config(self, host="", port="", username="", password="") -> str:
        email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
        email_server_config = {"host": host, "port": port, "username": username, "password": password}
        j.core.config.set("EMAIL_SERVER_CONFIG", email_server_config)
        return j.data.serializers.json.dumps({"data": email_server_config})

    @actor_method
    def list_escalation_emails(self) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        return j.data.serializers.json.dumps({"data": escalation_emails})

    @actor_method
    def add_escalation_email(self, email) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        if email not in escalation_emails:
            escalation_emails.append(email)
            j.core.config.set("ESCALATION_EMAILS", escalation_emails)
        return j.data.serializers.json.dumps({"data": escalation_emails})

    @actor_method
    def delete_escalation_email(self, email) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        if email in escalation_emails:
            escalation_emails.remove(email)
            j.core.config.set("ESCALATION_EMAILS", escalation_emails)
        return j.data.serializers.json.dumps({"data": escalation_emails})

    def get_notifications(self) -> str:
        notifications = []
        if j.tools.notificationsqueue.count() >= 10:
            notifications = j.tools.notificationsqueue.fetch()
        else:
            notifications = j.tools.notificationsqueue.fetch(10)
        ret = [notification.json for notification in notifications]
        return j.data.serializers.json.dumps({"data": ret})

    @actor_method
    def get_notifications_count(self) -> str:
        return j.data.serializers.json.dumps({"data": j.tools.notificationsqueue.count()})

Ancestors

Methods

def add_admin(self, name: str)
Expand source code
@actor_method
def add_admin(self, name: str):
    if name in j.core.identity.me.admins:
        raise j.exceptions.Value(f"Admin {name} already exists")
    j.core.identity.me.admins.append(name)
    j.core.identity.me.save()
def add_escalation_email(self, email) ‑> str
Expand source code
@actor_method
def add_escalation_email(self, email) -> str:
    escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
    if email not in escalation_emails:
        escalation_emails.append(email)
        j.core.config.set("ESCALATION_EMAILS", escalation_emails)
    return j.data.serializers.json.dumps({"data": escalation_emails})
def add_identity(self, display_name: str, tname: str, email: str, words: str, admins: list) ‑> str
Expand source code
@actor_method
def add_identity(
    self,
    display_name: str,
    tname: str,
    email: str,
    words: str,
    admins: list,
) -> str:
    checked_admins = []
    for admin in admins:
        if type(admin) is str and not admin.endswith(".3bot"):
            admin = admin + ".3bot"
        checked_admins.append(admin)

    if not display_name.isidentifier() or not display_name.islower():
        raise j.exceptions.Value(
            "The display name must be a lowercase valid python identitifier (English letters, underscores, and numbers not starting with a number)."
        )
    identity_instance_name = display_name
    if identity_instance_name in j.core.identity.list_all():
        raise j.exceptions.Value("Identity with the same name already exists")

    try:
        new_identity = j.core.identity.new(
            name=identity_instance_name,
            tname=tname,
            email=email,
            words=words,
            admins=checked_admins,
        )
        new_identity.register()
        new_identity.save()
    except HTTPError as e:
        j.core.identity.delete(identity_instance_name)
        try:
            raise j.exceptions.Value(j.data.serializers.json.loads(e.response.content)["error"])
        except (KeyError, json.decoder.JSONDecodeError):
            # it failed for some reason other than Bad Request error
            raise j.exceptions.Value(str(e))
    except Exception as e:
        # register sometimes throws exceptions other than HTTP like Input
        j.core.identity.delete(identity_instance_name)
        raise j.exceptions.Value(str(e))
    return j.data.serializers.json.dumps({"data": "New identity successfully created and registered"})
def add_sshkey(self, key_id, sshkey) ‑> str
Expand source code
@actor_method
def add_sshkey(self, key_id, sshkey) -> str:
    sshkeys = j.core.config.get("SSH_KEYS", {})
    if key_id not in sshkeys:
        sshkeys[key_id] = sshkey
        j.core.config.set("SSH_KEYS", sshkeys)
    return j.data.serializers.json.dumps({"data": sshkeys})
def check_identity_instance_name(self, name) ‑> str
Expand source code
@actor_method
def check_identity_instance_name(self, name) -> str:
    if name in j.core.identity.list_all():
        return j.data.serializers.json.dumps({"data": True})
    return j.data.serializers.json.dumps({"data": False})
def check_tname_exists(self, tname) ‑> str
Expand source code
@actor_method
def check_tname_exists(self, tname) -> str:
    try:
        j.core.identity.get_user(tname)
        return j.data.serializers.json.dumps({"data": True})
    except j.exceptions.NotFound:
        return j.data.serializers.json.dumps({"data": False})
def clear_blocked_managed_domains(self) ‑> str
Expand source code
@actor_method
def clear_blocked_managed_domains(self) -> str:
    j.sals.marketplace.deployer.clear_blocked_managed_domains()
    return j.data.serializers.json.dumps({"data": "blocked managed domains got cleared successfully."})
def clear_blocked_nodes(self) ‑> str
Expand source code
@actor_method
def clear_blocked_nodes(self) -> str:
    j.sals.reservation_chatflow.reservation_chatflow.clear_blocked_nodes()
    return j.data.serializers.json.dumps({"data": "blocked nodes got cleared successfully."})
def delete_admin(self, name: str)
Expand source code
@actor_method
def delete_admin(self, name: str):
    if name not in j.core.identity.me.admins:
        raise j.exceptions.Value(f"Admin {name} does not exist")
    j.core.identity.me.admins.remove(name)
    j.core.identity.me.save()
def delete_escalation_email(self, email) ‑> str
Expand source code
@actor_method
def delete_escalation_email(self, email) -> str:
    escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
    if email in escalation_emails:
        escalation_emails.remove(email)
        j.core.config.set("ESCALATION_EMAILS", escalation_emails)
    return j.data.serializers.json.dumps({"data": escalation_emails})
def delete_identity(self, identity_instance_name: str) ‑> str
Expand source code
@actor_method
def delete_identity(self, identity_instance_name: str) -> str:
    identity_names = j.core.identity.list_all()
    if identity_instance_name in identity_names:
        identity = j.core.identity.get(identity_instance_name)
        if identity.instance_name == j.core.identity.me.instance_name:
            return j.data.serializers.json.dumps({"data": "Cannot delete current default identity"})

        j.core.identity.delete(identity_instance_name)
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} deleted successfully"})
    else:
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})
def delete_sshkey(self, key_id) ‑> str
Expand source code
@actor_method
def delete_sshkey(self, key_id) -> str:
    sshkeys = j.core.config.get("SSH_KEYS", {})
    if key_id in sshkeys:
        sshkeys.pop(key_id)
        j.core.config.set("SSH_KEYS", sshkeys)
    return j.data.serializers.json.dumps({"data": sshkeys})
def generate_mnemonic(self) ‑> str
Expand source code
def generate_mnemonic(self) -> str:
    words = j.data.encryption.generate_mnemonic()
    return j.data.serializers.json.dumps({"data": words})
def get_config(self) ‑> str
Expand source code
@actor_method
def get_config(self) -> str:
    config_obj = j.core.config.get_config()
    return j.data.serializers.json.dumps({"data": config_obj})
def get_current_identity_name(self) ‑> str
Expand source code
@actor_method
def get_current_identity_name(self) -> str:
    return j.data.serializers.json.dumps({"data": j.core.identity.me.instance_name})
def get_developer_options(self) ‑> str
Expand source code
@actor_method
def get_developer_options(self) -> str:
    test_cert = j.core.config.set_default("TEST_CERT", False)
    over_provision = j.core.config.set_default("OVER_PROVISIONING", False)
    escalation_emails = j.core.config.set_default("ESCALATION_EMAILS_ENABLED", False)
    sort_nodes_by_sru = j.core.config.set_default("SORT_NODES_BY_SRU", False)
    return j.data.serializers.json.dumps(
        {
            "data": {
                "test_cert": test_cert,
                "over_provision": over_provision,
                "escalation_emails": escalation_emails,
                "sort_nodes_by_sru": sort_nodes_by_sru,
            }
        }
    )
def get_email_server_config(self) ‑> str
Expand source code
@actor_method
def get_email_server_config(self) -> str:
    email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
    email_server_config.setdefault("host", "")
    email_server_config.setdefault("port", "")
    email_server_config.setdefault("username", "")
    email_server_config.setdefault("password", "")
    return j.data.serializers.json.dumps({"data": email_server_config})
def get_identity(self, identity_instance_name: str) ‑> str
Expand source code
@actor_method
def get_identity(self, identity_instance_name: str) -> str:
    identity_names = j.core.identity.list_all()
    if identity_instance_name in identity_names:
        identity = j.core.identity.get(identity_instance_name)

        return j.data.serializers.json.dumps(
            {
                "data": {
                    "instance_name": identity.instance_name,
                    "name": identity.tname,
                    "email": identity.email,
                    "tid": identity.tid,
                    "words": identity.words,
                    "admins": identity.admins,
                }
            }
        )
    else:
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})
def get_notifications(self) ‑> str
Expand source code
def get_notifications(self) -> str:
    notifications = []
    if j.tools.notificationsqueue.count() >= 10:
        notifications = j.tools.notificationsqueue.fetch()
    else:
        notifications = j.tools.notificationsqueue.fetch(10)
    ret = [notification.json for notification in notifications]
    return j.data.serializers.json.dumps({"data": ret})
def get_notifications_count(self) ‑> str
Expand source code
@actor_method
def get_notifications_count(self) -> str:
    return j.data.serializers.json.dumps({"data": j.tools.notificationsqueue.count()})
def get_sdk_version(self) ‑> str
Expand source code
@actor_method
def get_sdk_version(self) -> str:
    import importlib_metadata as metadata

    packages = ["js-ng", "js-sdk"]
    data = {}
    for package in packages:
        data[package] = metadata.version(package)
    return j.data.serializers.json.dumps({"data": data})
def list_admins(self) ‑> str
Expand source code
@actor_method
def list_admins(self) -> str:
    admins = list(set(j.core.identity.me.admins))
    return j.data.serializers.json.dumps({"data": admins})
def list_escalation_emails(self) ‑> str
Expand source code
@actor_method
def list_escalation_emails(self) -> str:
    escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
    return j.data.serializers.json.dumps({"data": escalation_emails})
def list_identities(self) ‑> str
Expand source code
@actor_method
def list_identities(self) -> str:
    identities = j.core.identity.list_all()
    identity_data = {}
    for identity_name in identities:
        try:
            identity = j.core.identity.get(identity_name)
            if identity.tid < 0:
                continue
            identity_dict = identity.to_dict()
            identity_dict["instance_name"] = identity.instance_name
            identity_dict.pop("__words")
        except Exception as e:
            j.logger.exception("error", exception=e)
            # TODO: include traceback
            j.tools.alerthandler.alert_raise(
                app_name="admin",
                category="internal_errors",
                message=f"failed to get identity {identity_name} info due to error {str(e)}",
                alert_type="exception",
            )
            continue
        identity_data[identity_name] = identity_dict
    return j.data.serializers.json.dumps({"data": identity_data})
def list_sshkeys(self) ‑> str
Expand source code
@actor_method
def list_sshkeys(self) -> str:
    sshkeys = j.core.config.get("SSH_KEYS", {})
    return j.data.serializers.json.dumps({"data": sshkeys})
def set_developer_options(self, test_cert: bool, over_provision: bool, sort_nodes_by_sru: bool, escalation_emails: bool) ‑> str
Expand source code
@actor_method
def set_developer_options(
    self,
    test_cert: bool,
    over_provision: bool,
    sort_nodes_by_sru: bool,
    escalation_emails: bool,
) -> str:
    j.core.config.set("TEST_CERT", test_cert)
    j.core.config.set("OVER_PROVISIONING", over_provision)
    j.core.config.set("ESCALATION_EMAILS_ENABLED", escalation_emails)
    j.core.config.set("SORT_NODES_BY_SRU", sort_nodes_by_sru)
    return j.data.serializers.json.dumps(
        {
            "data": {
                "test_cert": test_cert,
                "over_provision": over_provision,
                "escalation_emails": escalation_emails,
                "sort_nodes_by_sru": sort_nodes_by_sru,
            }
        }
    )
def set_email_server_config(self, host='', port='', username='', password='') ‑> str
Expand source code
@actor_method
def set_email_server_config(self, host="", port="", username="", password="") -> str:
    email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
    email_server_config = {"host": host, "port": port, "username": username, "password": password}
    j.core.config.set("EMAIL_SERVER_CONFIG", email_server_config)
    return j.data.serializers.json.dumps({"data": email_server_config})
def set_identity(self, identity_instance_name: str) ‑> str
Expand source code
@actor_method
def set_identity(self, identity_instance_name: str) -> str:
    identity_names = j.core.identity.list_all()
    if identity_instance_name in identity_names:
        j.core.identity.set_default(identity_instance_name)

        return j.data.serializers.json.dumps({"data": {"instance_name": identity_instance_name}})
    else:
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})
class Actor
Expand source code
class Admin(BaseActor):
    @actor_method
    def list_admins(self) -> str:
        admins = list(set(j.core.identity.me.admins))
        return j.data.serializers.json.dumps({"data": admins})

    @actor_method
    def add_admin(self, name: str):
        if name in j.core.identity.me.admins:
            raise j.exceptions.Value(f"Admin {name} already exists")
        j.core.identity.me.admins.append(name)
        j.core.identity.me.save()

    @actor_method
    def delete_admin(self, name: str):
        if name not in j.core.identity.me.admins:
            raise j.exceptions.Value(f"Admin {name} does not exist")
        j.core.identity.me.admins.remove(name)
        j.core.identity.me.save()

    @actor_method
    def list_identities(self) -> str:
        identities = j.core.identity.list_all()
        identity_data = {}
        for identity_name in identities:
            try:
                identity = j.core.identity.get(identity_name)
                if identity.tid < 0:
                    continue
                identity_dict = identity.to_dict()
                identity_dict["instance_name"] = identity.instance_name
                identity_dict.pop("__words")
            except Exception as e:
                j.logger.exception("error", exception=e)
                # TODO: include traceback
                j.tools.alerthandler.alert_raise(
                    app_name="admin",
                    category="internal_errors",
                    message=f"failed to get identity {identity_name} info due to error {str(e)}",
                    alert_type="exception",
                )
                continue
            identity_data[identity_name] = identity_dict
        return j.data.serializers.json.dumps({"data": identity_data})

    @actor_method
    def get_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            identity = j.core.identity.get(identity_instance_name)

            return j.data.serializers.json.dumps(
                {
                    "data": {
                        "instance_name": identity.instance_name,
                        "name": identity.tname,
                        "email": identity.email,
                        "tid": identity.tid,
                        "words": identity.words,
                        "admins": identity.admins,
                    }
                }
            )
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    @actor_method
    def get_current_identity_name(self) -> str:
        return j.data.serializers.json.dumps({"data": j.core.identity.me.instance_name})

    @actor_method
    def set_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            j.core.identity.set_default(identity_instance_name)

            return j.data.serializers.json.dumps({"data": {"instance_name": identity_instance_name}})
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    def generate_mnemonic(self) -> str:
        words = j.data.encryption.generate_mnemonic()
        return j.data.serializers.json.dumps({"data": words})

    @actor_method
    def check_tname_exists(self, tname) -> str:
        try:
            j.core.identity.get_user(tname)
            return j.data.serializers.json.dumps({"data": True})
        except j.exceptions.NotFound:
            return j.data.serializers.json.dumps({"data": False})

    @actor_method
    def check_identity_instance_name(self, name) -> str:
        if name in j.core.identity.list_all():
            return j.data.serializers.json.dumps({"data": True})
        return j.data.serializers.json.dumps({"data": False})

    @actor_method
    def add_identity(
        self,
        display_name: str,
        tname: str,
        email: str,
        words: str,
        admins: list,
    ) -> str:
        checked_admins = []
        for admin in admins:
            if type(admin) is str and not admin.endswith(".3bot"):
                admin = admin + ".3bot"
            checked_admins.append(admin)

        if not display_name.isidentifier() or not display_name.islower():
            raise j.exceptions.Value(
                "The display name must be a lowercase valid python identitifier (English letters, underscores, and numbers not starting with a number)."
            )
        identity_instance_name = display_name
        if identity_instance_name in j.core.identity.list_all():
            raise j.exceptions.Value("Identity with the same name already exists")

        try:
            new_identity = j.core.identity.new(
                name=identity_instance_name,
                tname=tname,
                email=email,
                words=words,
                admins=checked_admins,
            )
            new_identity.register()
            new_identity.save()
        except HTTPError as e:
            j.core.identity.delete(identity_instance_name)
            try:
                raise j.exceptions.Value(j.data.serializers.json.loads(e.response.content)["error"])
            except (KeyError, json.decoder.JSONDecodeError):
                # it failed for some reason other than Bad Request error
                raise j.exceptions.Value(str(e))
        except Exception as e:
            # register sometimes throws exceptions other than HTTP like Input
            j.core.identity.delete(identity_instance_name)
            raise j.exceptions.Value(str(e))
        return j.data.serializers.json.dumps({"data": "New identity successfully created and registered"})

    @actor_method
    def get_config(self) -> str:
        config_obj = j.core.config.get_config()
        return j.data.serializers.json.dumps({"data": config_obj})

    @actor_method
    def get_sdk_version(self) -> str:
        import importlib_metadata as metadata

        packages = ["js-ng", "js-sdk"]
        data = {}
        for package in packages:
            data[package] = metadata.version(package)
        return j.data.serializers.json.dumps({"data": data})

    @actor_method
    def delete_identity(self, identity_instance_name: str) -> str:
        identity_names = j.core.identity.list_all()
        if identity_instance_name in identity_names:
            identity = j.core.identity.get(identity_instance_name)
            if identity.instance_name == j.core.identity.me.instance_name:
                return j.data.serializers.json.dumps({"data": "Cannot delete current default identity"})

            j.core.identity.delete(identity_instance_name)
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} deleted successfully"})
        else:
            return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})

    @actor_method
    def list_sshkeys(self) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def add_sshkey(self, key_id, sshkey) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        if key_id not in sshkeys:
            sshkeys[key_id] = sshkey
            j.core.config.set("SSH_KEYS", sshkeys)
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def delete_sshkey(self, key_id) -> str:
        sshkeys = j.core.config.get("SSH_KEYS", {})
        if key_id in sshkeys:
            sshkeys.pop(key_id)
            j.core.config.set("SSH_KEYS", sshkeys)
        return j.data.serializers.json.dumps({"data": sshkeys})

    @actor_method
    def get_developer_options(self) -> str:
        test_cert = j.core.config.set_default("TEST_CERT", False)
        over_provision = j.core.config.set_default("OVER_PROVISIONING", False)
        escalation_emails = j.core.config.set_default("ESCALATION_EMAILS_ENABLED", False)
        sort_nodes_by_sru = j.core.config.set_default("SORT_NODES_BY_SRU", False)
        return j.data.serializers.json.dumps(
            {
                "data": {
                    "test_cert": test_cert,
                    "over_provision": over_provision,
                    "escalation_emails": escalation_emails,
                    "sort_nodes_by_sru": sort_nodes_by_sru,
                }
            }
        )

    @actor_method
    def set_developer_options(
        self,
        test_cert: bool,
        over_provision: bool,
        sort_nodes_by_sru: bool,
        escalation_emails: bool,
    ) -> str:
        j.core.config.set("TEST_CERT", test_cert)
        j.core.config.set("OVER_PROVISIONING", over_provision)
        j.core.config.set("ESCALATION_EMAILS_ENABLED", escalation_emails)
        j.core.config.set("SORT_NODES_BY_SRU", sort_nodes_by_sru)
        return j.data.serializers.json.dumps(
            {
                "data": {
                    "test_cert": test_cert,
                    "over_provision": over_provision,
                    "escalation_emails": escalation_emails,
                    "sort_nodes_by_sru": sort_nodes_by_sru,
                }
            }
        )

    @actor_method
    def clear_blocked_nodes(self) -> str:
        j.sals.reservation_chatflow.reservation_chatflow.clear_blocked_nodes()
        return j.data.serializers.json.dumps({"data": "blocked nodes got cleared successfully."})

    @actor_method
    def clear_blocked_managed_domains(self) -> str:
        j.sals.marketplace.deployer.clear_blocked_managed_domains()
        return j.data.serializers.json.dumps({"data": "blocked managed domains got cleared successfully."})

    @actor_method
    def get_email_server_config(self) -> str:
        email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
        email_server_config.setdefault("host", "")
        email_server_config.setdefault("port", "")
        email_server_config.setdefault("username", "")
        email_server_config.setdefault("password", "")
        return j.data.serializers.json.dumps({"data": email_server_config})

    @actor_method
    def set_email_server_config(self, host="", port="", username="", password="") -> str:
        email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
        email_server_config = {"host": host, "port": port, "username": username, "password": password}
        j.core.config.set("EMAIL_SERVER_CONFIG", email_server_config)
        return j.data.serializers.json.dumps({"data": email_server_config})

    @actor_method
    def list_escalation_emails(self) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        return j.data.serializers.json.dumps({"data": escalation_emails})

    @actor_method
    def add_escalation_email(self, email) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        if email not in escalation_emails:
            escalation_emails.append(email)
            j.core.config.set("ESCALATION_EMAILS", escalation_emails)
        return j.data.serializers.json.dumps({"data": escalation_emails})

    @actor_method
    def delete_escalation_email(self, email) -> str:
        escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
        if email in escalation_emails:
            escalation_emails.remove(email)
            j.core.config.set("ESCALATION_EMAILS", escalation_emails)
        return j.data.serializers.json.dumps({"data": escalation_emails})

    def get_notifications(self) -> str:
        notifications = []
        if j.tools.notificationsqueue.count() >= 10:
            notifications = j.tools.notificationsqueue.fetch()
        else:
            notifications = j.tools.notificationsqueue.fetch(10)
        ret = [notification.json for notification in notifications]
        return j.data.serializers.json.dumps({"data": ret})

    @actor_method
    def get_notifications_count(self) -> str:
        return j.data.serializers.json.dumps({"data": j.tools.notificationsqueue.count()})

Ancestors

Methods

def add_admin(self, name: str)
Expand source code
@actor_method
def add_admin(self, name: str):
    if name in j.core.identity.me.admins:
        raise j.exceptions.Value(f"Admin {name} already exists")
    j.core.identity.me.admins.append(name)
    j.core.identity.me.save()
def add_escalation_email(self, email) ‑> str
Expand source code
@actor_method
def add_escalation_email(self, email) -> str:
    escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
    if email not in escalation_emails:
        escalation_emails.append(email)
        j.core.config.set("ESCALATION_EMAILS", escalation_emails)
    return j.data.serializers.json.dumps({"data": escalation_emails})
def add_identity(self, display_name: str, tname: str, email: str, words: str, admins: list) ‑> str
Expand source code
@actor_method
def add_identity(
    self,
    display_name: str,
    tname: str,
    email: str,
    words: str,
    admins: list,
) -> str:
    checked_admins = []
    for admin in admins:
        if type(admin) is str and not admin.endswith(".3bot"):
            admin = admin + ".3bot"
        checked_admins.append(admin)

    if not display_name.isidentifier() or not display_name.islower():
        raise j.exceptions.Value(
            "The display name must be a lowercase valid python identitifier (English letters, underscores, and numbers not starting with a number)."
        )
    identity_instance_name = display_name
    if identity_instance_name in j.core.identity.list_all():
        raise j.exceptions.Value("Identity with the same name already exists")

    try:
        new_identity = j.core.identity.new(
            name=identity_instance_name,
            tname=tname,
            email=email,
            words=words,
            admins=checked_admins,
        )
        new_identity.register()
        new_identity.save()
    except HTTPError as e:
        j.core.identity.delete(identity_instance_name)
        try:
            raise j.exceptions.Value(j.data.serializers.json.loads(e.response.content)["error"])
        except (KeyError, json.decoder.JSONDecodeError):
            # it failed for some reason other than Bad Request error
            raise j.exceptions.Value(str(e))
    except Exception as e:
        # register sometimes throws exceptions other than HTTP like Input
        j.core.identity.delete(identity_instance_name)
        raise j.exceptions.Value(str(e))
    return j.data.serializers.json.dumps({"data": "New identity successfully created and registered"})
def add_sshkey(self, key_id, sshkey) ‑> str
Expand source code
@actor_method
def add_sshkey(self, key_id, sshkey) -> str:
    sshkeys = j.core.config.get("SSH_KEYS", {})
    if key_id not in sshkeys:
        sshkeys[key_id] = sshkey
        j.core.config.set("SSH_KEYS", sshkeys)
    return j.data.serializers.json.dumps({"data": sshkeys})
def check_identity_instance_name(self, name) ‑> str
Expand source code
@actor_method
def check_identity_instance_name(self, name) -> str:
    if name in j.core.identity.list_all():
        return j.data.serializers.json.dumps({"data": True})
    return j.data.serializers.json.dumps({"data": False})
def check_tname_exists(self, tname) ‑> str
Expand source code
@actor_method
def check_tname_exists(self, tname) -> str:
    try:
        j.core.identity.get_user(tname)
        return j.data.serializers.json.dumps({"data": True})
    except j.exceptions.NotFound:
        return j.data.serializers.json.dumps({"data": False})
def clear_blocked_managed_domains(self) ‑> str
Expand source code
@actor_method
def clear_blocked_managed_domains(self) -> str:
    j.sals.marketplace.deployer.clear_blocked_managed_domains()
    return j.data.serializers.json.dumps({"data": "blocked managed domains got cleared successfully."})
def clear_blocked_nodes(self) ‑> str
Expand source code
@actor_method
def clear_blocked_nodes(self) -> str:
    j.sals.reservation_chatflow.reservation_chatflow.clear_blocked_nodes()
    return j.data.serializers.json.dumps({"data": "blocked nodes got cleared successfully."})
def delete_admin(self, name: str)
Expand source code
@actor_method
def delete_admin(self, name: str):
    if name not in j.core.identity.me.admins:
        raise j.exceptions.Value(f"Admin {name} does not exist")
    j.core.identity.me.admins.remove(name)
    j.core.identity.me.save()
def delete_escalation_email(self, email) ‑> str
Expand source code
@actor_method
def delete_escalation_email(self, email) -> str:
    escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
    if email in escalation_emails:
        escalation_emails.remove(email)
        j.core.config.set("ESCALATION_EMAILS", escalation_emails)
    return j.data.serializers.json.dumps({"data": escalation_emails})
def delete_identity(self, identity_instance_name: str) ‑> str
Expand source code
@actor_method
def delete_identity(self, identity_instance_name: str) -> str:
    identity_names = j.core.identity.list_all()
    if identity_instance_name in identity_names:
        identity = j.core.identity.get(identity_instance_name)
        if identity.instance_name == j.core.identity.me.instance_name:
            return j.data.serializers.json.dumps({"data": "Cannot delete current default identity"})

        j.core.identity.delete(identity_instance_name)
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} deleted successfully"})
    else:
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})
def delete_sshkey(self, key_id) ‑> str
Expand source code
@actor_method
def delete_sshkey(self, key_id) -> str:
    sshkeys = j.core.config.get("SSH_KEYS", {})
    if key_id in sshkeys:
        sshkeys.pop(key_id)
        j.core.config.set("SSH_KEYS", sshkeys)
    return j.data.serializers.json.dumps({"data": sshkeys})
def generate_mnemonic(self) ‑> str
Expand source code
def generate_mnemonic(self) -> str:
    words = j.data.encryption.generate_mnemonic()
    return j.data.serializers.json.dumps({"data": words})
def get_config(self) ‑> str
Expand source code
@actor_method
def get_config(self) -> str:
    config_obj = j.core.config.get_config()
    return j.data.serializers.json.dumps({"data": config_obj})
def get_current_identity_name(self) ‑> str
Expand source code
@actor_method
def get_current_identity_name(self) -> str:
    return j.data.serializers.json.dumps({"data": j.core.identity.me.instance_name})
def get_developer_options(self) ‑> str
Expand source code
@actor_method
def get_developer_options(self) -> str:
    test_cert = j.core.config.set_default("TEST_CERT", False)
    over_provision = j.core.config.set_default("OVER_PROVISIONING", False)
    escalation_emails = j.core.config.set_default("ESCALATION_EMAILS_ENABLED", False)
    sort_nodes_by_sru = j.core.config.set_default("SORT_NODES_BY_SRU", False)
    return j.data.serializers.json.dumps(
        {
            "data": {
                "test_cert": test_cert,
                "over_provision": over_provision,
                "escalation_emails": escalation_emails,
                "sort_nodes_by_sru": sort_nodes_by_sru,
            }
        }
    )
def get_email_server_config(self) ‑> str
Expand source code
@actor_method
def get_email_server_config(self) -> str:
    email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
    email_server_config.setdefault("host", "")
    email_server_config.setdefault("port", "")
    email_server_config.setdefault("username", "")
    email_server_config.setdefault("password", "")
    return j.data.serializers.json.dumps({"data": email_server_config})
def get_identity(self, identity_instance_name: str) ‑> str
Expand source code
@actor_method
def get_identity(self, identity_instance_name: str) -> str:
    identity_names = j.core.identity.list_all()
    if identity_instance_name in identity_names:
        identity = j.core.identity.get(identity_instance_name)

        return j.data.serializers.json.dumps(
            {
                "data": {
                    "instance_name": identity.instance_name,
                    "name": identity.tname,
                    "email": identity.email,
                    "tid": identity.tid,
                    "words": identity.words,
                    "admins": identity.admins,
                }
            }
        )
    else:
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})
def get_notifications(self) ‑> str
Expand source code
def get_notifications(self) -> str:
    notifications = []
    if j.tools.notificationsqueue.count() >= 10:
        notifications = j.tools.notificationsqueue.fetch()
    else:
        notifications = j.tools.notificationsqueue.fetch(10)
    ret = [notification.json for notification in notifications]
    return j.data.serializers.json.dumps({"data": ret})
def get_notifications_count(self) ‑> str
Expand source code
@actor_method
def get_notifications_count(self) -> str:
    return j.data.serializers.json.dumps({"data": j.tools.notificationsqueue.count()})
def get_sdk_version(self) ‑> str
Expand source code
@actor_method
def get_sdk_version(self) -> str:
    import importlib_metadata as metadata

    packages = ["js-ng", "js-sdk"]
    data = {}
    for package in packages:
        data[package] = metadata.version(package)
    return j.data.serializers.json.dumps({"data": data})
def list_admins(self) ‑> str
Expand source code
@actor_method
def list_admins(self) -> str:
    admins = list(set(j.core.identity.me.admins))
    return j.data.serializers.json.dumps({"data": admins})
def list_escalation_emails(self) ‑> str
Expand source code
@actor_method
def list_escalation_emails(self) -> str:
    escalation_emails = j.core.config.get("ESCALATION_EMAILS", [])
    return j.data.serializers.json.dumps({"data": escalation_emails})
def list_identities(self) ‑> str
Expand source code
@actor_method
def list_identities(self) -> str:
    identities = j.core.identity.list_all()
    identity_data = {}
    for identity_name in identities:
        try:
            identity = j.core.identity.get(identity_name)
            if identity.tid < 0:
                continue
            identity_dict = identity.to_dict()
            identity_dict["instance_name"] = identity.instance_name
            identity_dict.pop("__words")
        except Exception as e:
            j.logger.exception("error", exception=e)
            # TODO: include traceback
            j.tools.alerthandler.alert_raise(
                app_name="admin",
                category="internal_errors",
                message=f"failed to get identity {identity_name} info due to error {str(e)}",
                alert_type="exception",
            )
            continue
        identity_data[identity_name] = identity_dict
    return j.data.serializers.json.dumps({"data": identity_data})
def list_sshkeys(self) ‑> str
Expand source code
@actor_method
def list_sshkeys(self) -> str:
    sshkeys = j.core.config.get("SSH_KEYS", {})
    return j.data.serializers.json.dumps({"data": sshkeys})
def set_developer_options(self, test_cert: bool, over_provision: bool, sort_nodes_by_sru: bool, escalation_emails: bool) ‑> str
Expand source code
@actor_method
def set_developer_options(
    self,
    test_cert: bool,
    over_provision: bool,
    sort_nodes_by_sru: bool,
    escalation_emails: bool,
) -> str:
    j.core.config.set("TEST_CERT", test_cert)
    j.core.config.set("OVER_PROVISIONING", over_provision)
    j.core.config.set("ESCALATION_EMAILS_ENABLED", escalation_emails)
    j.core.config.set("SORT_NODES_BY_SRU", sort_nodes_by_sru)
    return j.data.serializers.json.dumps(
        {
            "data": {
                "test_cert": test_cert,
                "over_provision": over_provision,
                "escalation_emails": escalation_emails,
                "sort_nodes_by_sru": sort_nodes_by_sru,
            }
        }
    )
def set_email_server_config(self, host='', port='', username='', password='') ‑> str
Expand source code
@actor_method
def set_email_server_config(self, host="", port="", username="", password="") -> str:
    email_server_config = j.core.config.get("EMAIL_SERVER_CONFIG", {})
    email_server_config = {"host": host, "port": port, "username": username, "password": password}
    j.core.config.set("EMAIL_SERVER_CONFIG", email_server_config)
    return j.data.serializers.json.dumps({"data": email_server_config})
def set_identity(self, identity_instance_name: str) ‑> str
Expand source code
@actor_method
def set_identity(self, identity_instance_name: str) -> str:
    identity_names = j.core.identity.list_all()
    if identity_instance_name in identity_names:
        j.core.identity.set_default(identity_instance_name)

        return j.data.serializers.json.dumps({"data": {"instance_name": identity_instance_name}})
    else:
        return j.data.serializers.json.dumps({"data": f"{identity_instance_name} doesn't exist"})