Module jumpscale.packages.admin.services.heartbeat

Expand source code
from jumpscale.loader import j
from jumpscale.tools.servicemanager.servicemanager import BackgroundService
import requests
import os
import binascii
from io import StringIO
import urllib.parse

VDC_NAME = os.environ.get("VDC_NAME", "")
MONITORING_SERVER_URL = os.environ.get("MONITORING_SERVER_URL")


class HeartBeatService(BackgroundService):
    # def __init__(self, interval=60 * 10, *args, **kwargs):
    def __init__(self, interval=10, *args, **kwargs):
        """
        Check disk space every 10 seconds
        """
        super().__init__(interval, *args, **kwargs)

    def _encode_data(self, data):
        keys = [
            "tid",
            "tname",
            "vdc_name",
        ]
        b = StringIO()
        for key in keys:
            if key in data:
                b.write(key)
                b.write(str(data.get(key)))
        return b.getvalue().encode()

    def job(self):
        if MONITORING_SERVER_URL:
            identity_name = j.core.identity.me.instance_name
            tid = j.core.identity.get(identity_name).tid
            data = {
                "tid": tid,
                "vdc_name": VDC_NAME,
                "tname": os.environ.get("VDC_OWNER_TNAME"),
            }
            encoded_data = self._encode_data(data)
            signature = j.core.identity.me.nacl.signing_key.sign(encoded_data).signature

            data["signature"] = binascii.hexlify(signature).decode()
            url = urllib.parse.urljoin(MONITORING_SERVER_URL, "heartbeat")
            try:
                requests.post(url, json=data, headers={"Content-type": "application/json"})
            except Exception as e:
                j.logger.error(f"Failed to send heartbeat, URL:{url}, exception: {str(e)}")


service = HeartBeatService()

Classes

class HeartBeatService (interval=10, *args, **kwargs)

Helper class that provides a standard way to create an ABC using inheritance.

Check disk space every 10 seconds

Expand source code
class HeartBeatService(BackgroundService):
    # def __init__(self, interval=60 * 10, *args, **kwargs):
    def __init__(self, interval=10, *args, **kwargs):
        """
        Check disk space every 10 seconds
        """
        super().__init__(interval, *args, **kwargs)

    def _encode_data(self, data):
        keys = [
            "tid",
            "tname",
            "vdc_name",
        ]
        b = StringIO()
        for key in keys:
            if key in data:
                b.write(key)
                b.write(str(data.get(key)))
        return b.getvalue().encode()

    def job(self):
        if MONITORING_SERVER_URL:
            identity_name = j.core.identity.me.instance_name
            tid = j.core.identity.get(identity_name).tid
            data = {
                "tid": tid,
                "vdc_name": VDC_NAME,
                "tname": os.environ.get("VDC_OWNER_TNAME"),
            }
            encoded_data = self._encode_data(data)
            signature = j.core.identity.me.nacl.signing_key.sign(encoded_data).signature

            data["signature"] = binascii.hexlify(signature).decode()
            url = urllib.parse.urljoin(MONITORING_SERVER_URL, "heartbeat")
            try:
                requests.post(url, json=data, headers={"Content-type": "application/json"})
            except Exception as e:
                j.logger.error(f"Failed to send heartbeat, URL:{url}, exception: {str(e)}")

Ancestors

Inherited members