Module jumpscale.tools.notificationsqueue.queue

Expand source code
from enum import Enum

from jumpscale.loader import j


class LEVEL(Enum):
    """Levels that can be attached to a notification."""
    # Member values are lowercase strings.
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"


class Notification:
    """A single notification record that can be converted to and from its serialized form."""

    def __init__(self):
        self.id = None
        self.category = None
        self.level = 0
        self.message = None
        self.date = None

    @classmethod
    def loads(cls, value):
        """Build a notification from its serialized form

        Arguments:
            value: serialized notification, as produced by dumps()

        Returns:
            Notification: instance populated with the decoded fields
        """
        data = j.data.serializers.json.loads(value)
        instance = cls()
        # Merge into the defaults set by __init__ instead of replacing __dict__,
        # so a payload that lacks a field still yields an instance with every attribute.
        instance.__dict__.update(data)
        return instance

    @property
    def json(self):
        """dict: the instance's attribute dictionary itself (not a copy)"""
        return self.__dict__

    def dumps(self):
        """Serialize the notification's attributes with the JSON serializer"""
        return j.data.serializers.json.dumps(self.__dict__)


class NotificationsQueue:
    """Queue of notifications kept in the ``j.core.db`` store, plus a short list of fetched ones.

    New notifications are pushed to the list at ``_rkey``. Fetching moves them to
    the list at ``_rkey_seen``, which is trimmed to ``_seen_list_max_size`` entries.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are accepted but not used.
        self._rkey = "queue:notifications"
        self._rkey_incr = "queue:notifications:id:incr"
        self._rkey_seen = "list:notifications:seen"
        self._seen_list_max_size = 10
        self._db = None

    @property
    def db(self):
        """Database client, taken from ``j.core.db`` on first access and cached afterwards"""
        if self._db is None:
            self._db = j.core.db
        return self._db

    def push(self, message: str, category: str = "default", level: LEVEL = LEVEL.INFO):
        """Push a new notification

        Arguments:
            message {str}: notification message

        Keyword Arguments:
            category {str}: notification category (default: {"default"})
            level {LEVEL}: notification level (default: {LEVEL.INFO})

        Raises:
            j.exceptions.Value: if level is not a LEVEL member
        """
        # Validate the argument first so a wrong level is reported consistently,
        # whether or not the database is reachable.
        if not isinstance(level, LEVEL):
            raise j.exceptions.Value(f"{level} is not of type: LEVEL")

        # Best effort: the notification is silently dropped when the database is down.
        if not self.db.is_running():
            return

        notification = Notification()

        notification.message = message
        notification.level = level.value
        notification.category = category
        notification.date = j.data.time.utcnow().timestamp
        notification.id = self.db.incr(self._rkey_incr)

        self.db.lpush(self._rkey, notification.dumps())

    def fetch(self, count: int = -1) -> list:
        """Fetch notifications from queue

        Keyword Arguments:
            count {int}: number of notifications to fetch (default: {-1} = new notifications only)
                         if new notifications < count => the result is filled up with old
                         (already fetched) notifications that are still kept in the seen list
                         0 or any other negative value => empty list

        Returns:
            list: Notification objects
        """
        ret_notifications_count = self.count() if count == -1 else count

        # Nothing to fetch. This guard also matters for correctness: with a count of 0 the
        # LRANGE below would become "0 .. -1", which selects the whole seen list.
        if ret_notifications_count <= 0:
            return []

        # Transactional pipeline to fetch notifications from the queue and save them in the seen list
        p = self.db.pipeline()
        p.multi()

        for _ in range(ret_notifications_count):
            p.rpoplpush(self._rkey, self._rkey_seen)
        p.lrange(self._rkey_seen, 0, ret_notifications_count - 1)
        # Trim only after reading, so a fetch larger than the seen list limit is still returned in full
        p.ltrim(self._rkey_seen, 0, self._seen_list_max_size - 1)

        results = p.execute()

        return [Notification.loads(raw) for raw in results[-2]]  # -2 = result of lrange command

    def count(self) -> int:
        """Get notifications count

        Returns:
            int: number of notifications waiting in the queue (not fetched yet)
        """
        return self.db.llen(self._rkey)

    def clear(self):
        """Delete all notifications (both the queue and the seen list)
        """
        self.db.delete(self._rkey)
        self.db.delete(self._rkey_seen)

Classes

class LEVEL (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class LEVEL(Enum):
    """Levels that can be attached to a notification."""
    # Member values are lowercase strings.
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"

Ancestors

  • enum.Enum

Class variables

var ERROR
var INFO
var WARNING
class Notification
Expand source code
class Notification:
    """A single notification record that can be converted to and from its serialized form."""

    def __init__(self):
        self.id = None
        self.category = None
        self.level = 0
        self.message = None
        self.date = None

    @classmethod
    def loads(cls, value):
        """Build a notification from its serialized form

        Arguments:
            value: serialized notification, as produced by dumps()

        Returns:
            Notification: instance populated with the decoded fields
        """
        data = j.data.serializers.json.loads(value)
        instance = cls()
        # Merge into the defaults set by __init__ instead of replacing __dict__,
        # so a payload that lacks a field still yields an instance with every attribute.
        instance.__dict__.update(data)
        return instance

    @property
    def json(self):
        """dict: the instance's attribute dictionary itself (not a copy)"""
        return self.__dict__

    def dumps(self):
        """Serialize the notification's attributes with the JSON serializer"""
        return j.data.serializers.json.dumps(self.__dict__)

Static methods

def loads(value)
Expand source code
@classmethod
def loads(cls, value):
    """Build a notification from its serialized form

    Arguments:
        value: serialized notification, as produced by dumps()

    Returns:
        instance of cls populated with the decoded fields
    """
    data = j.data.serializers.json.loads(value)
    instance = cls()
    # Merge into the defaults set by the constructor instead of replacing __dict__,
    # so a payload that lacks a field still yields an instance with every attribute.
    instance.__dict__.update(data)
    return instance

Instance variables

var json
Expand source code
@property
def json(self):
    """dict: the instance's own attribute dictionary (the live object, not a copy)"""
    return vars(self)

Methods

def dumps(self)
Expand source code
def dumps(self):
    """Serialize the instance's attributes with the JSON serializer"""
    serializer = j.data.serializers.json
    return serializer.dumps(self.__dict__)
class NotificationsQueue (*args, **kwargs)
Expand source code
class NotificationsQueue:
    """Queue of notifications kept in the ``j.core.db`` store, plus a short list of fetched ones.

    New notifications are pushed to the list at ``_rkey``. Fetching moves them to
    the list at ``_rkey_seen``, which is trimmed to ``_seen_list_max_size`` entries.
    """

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are accepted but not used.
        self._rkey = "queue:notifications"
        self._rkey_incr = "queue:notifications:id:incr"
        self._rkey_seen = "list:notifications:seen"
        self._seen_list_max_size = 10
        self._db = None

    @property
    def db(self):
        """Database client, taken from ``j.core.db`` on first access and cached afterwards"""
        if self._db is None:
            self._db = j.core.db
        return self._db

    def push(self, message: str, category: str = "default", level: LEVEL = LEVEL.INFO):
        """Push a new notification

        Arguments:
            message {str}: notification message

        Keyword Arguments:
            category {str}: notification category (default: {"default"})
            level {LEVEL}: notification level (default: {LEVEL.INFO})

        Raises:
            j.exceptions.Value: if level is not a LEVEL member
        """
        # Validate the argument first so a wrong level is reported consistently,
        # whether or not the database is reachable.
        if not isinstance(level, LEVEL):
            raise j.exceptions.Value(f"{level} is not of type: LEVEL")

        # Best effort: the notification is silently dropped when the database is down.
        if not self.db.is_running():
            return

        notification = Notification()

        notification.message = message
        notification.level = level.value
        notification.category = category
        notification.date = j.data.time.utcnow().timestamp
        notification.id = self.db.incr(self._rkey_incr)

        self.db.lpush(self._rkey, notification.dumps())

    def fetch(self, count: int = -1) -> list:
        """Fetch notifications from queue

        Keyword Arguments:
            count {int}: number of notifications to fetch (default: {-1} = new notifications only)
                         if new notifications < count => the result is filled up with old
                         (already fetched) notifications that are still kept in the seen list
                         0 or any other negative value => empty list

        Returns:
            list: Notification objects
        """
        ret_notifications_count = self.count() if count == -1 else count

        # Nothing to fetch. This guard also matters for correctness: with a count of 0 the
        # LRANGE below would become "0 .. -1", which selects the whole seen list.
        if ret_notifications_count <= 0:
            return []

        # Transactional pipeline to fetch notifications from the queue and save them in the seen list
        p = self.db.pipeline()
        p.multi()

        for _ in range(ret_notifications_count):
            p.rpoplpush(self._rkey, self._rkey_seen)
        p.lrange(self._rkey_seen, 0, ret_notifications_count - 1)
        # Trim only after reading, so a fetch larger than the seen list limit is still returned in full
        p.ltrim(self._rkey_seen, 0, self._seen_list_max_size - 1)

        results = p.execute()

        return [Notification.loads(raw) for raw in results[-2]]  # -2 = result of lrange command

    def count(self) -> int:
        """Get notifications count

        Returns:
            int: number of notifications waiting in the queue (not fetched yet)
        """
        return self.db.llen(self._rkey)

    def clear(self):
        """Delete all notifications (both the queue and the seen list)
        """
        self.db.delete(self._rkey)
        self.db.delete(self._rkey_seen)

Instance variables

var db
Expand source code
@property
def db(self):
    """Database client, taken from ``j.core.db`` on first access and cached afterwards"""
    cached = self._db
    if cached is not None:
        return cached
    # First access: remember the client so later calls reuse it.
    self._db = j.core.db
    return self._db

Methods

def clear(self)

Delete all notifications

Expand source code
def clear(self):
    """Delete all notifications (both the queue and the seen list)
    """
    # The queue key is removed first, then the seen-list key, one delete call per key.
    for key in (self._rkey, self._rkey_seen):
        self.db.delete(key)
def count(self) ‑> int

Get notifications count

Returns

int
total number of notifications
Expand source code
def count(self) -> int:
    """Get notifications count

    Returns:
        int: length of the notifications queue list, as reported by the database
    """
    queue_key = self._rkey
    return self.db.llen(queue_key)
def fetch(self, count: int = -1) ‑> list

Fetch notifications from queue

Keyword Arguments: count {int}: number of new notifications to fetch (default: {-1} = new notifications only). If fewer than count new notifications are available, the result is padded with old (already fetched) notifications, up to count items in total.

Returns

list
Notification objects
Expand source code
def fetch(self, count: int = -1) -> list:
    """Fetch notifications from queue

    Keyword Arguments:
        count {int}: number of notifications to fetch (default: {-1} = new notifications only)
                     if new notifications < count => the result is filled up with old
                     (already fetched) notifications that are still kept in the seen list
                     0 or any other negative value => empty list

    Returns:
        list: Notification objects
    """
    ret_notifications_count = self.count() if count == -1 else count

    # Nothing to fetch. This guard also matters for correctness: with a count of 0 the
    # LRANGE below would become "0 .. -1", which selects the whole seen list.
    if ret_notifications_count <= 0:
        return []

    # Transactional pipeline to fetch notifications from the queue and save them in the seen list
    p = self.db.pipeline()
    p.multi()

    for _ in range(ret_notifications_count):
        p.rpoplpush(self._rkey, self._rkey_seen)
    p.lrange(self._rkey_seen, 0, ret_notifications_count - 1)
    # Trim only after reading, so a fetch larger than the seen list limit is still returned in full
    p.ltrim(self._rkey_seen, 0, self._seen_list_max_size - 1)

    results = p.execute()

    return [Notification.loads(raw) for raw in results[-2]]  # -2 = result of lrange command
def push(self, message: str, category: str = 'default', level: LEVEL = LEVEL.INFO)

Push a new notification

Arguments

message {str}: notification message

Keyword Arguments: category {str}: notification category (default: {"default"}); level {LEVEL}: notification level (default: {LEVEL.INFO})

Expand source code
def push(self, message: str, category: str = "default", level: LEVEL = LEVEL.INFO):
    """Push a new notification

    Arguments:
        message {str}: notification message

    Keyword Arguments:
        category {str}: notification category (default: {"default"})
        level {LEVEL}: notification level (default: {LEVEL.INFO})

    Raises:
        j.exceptions.Value: if level is not a LEVEL member
    """
    # Validate the argument first so a wrong level is reported consistently,
    # whether or not the database is reachable.
    if not isinstance(level, LEVEL):
        raise j.exceptions.Value(f"{level} is not of type: LEVEL")

    # Best effort: the notification is silently dropped when the database is down.
    if not self.db.is_running():
        return

    notification = Notification()

    notification.message = message
    notification.level = level.value
    notification.category = category
    notification.date = j.data.time.utcnow().timestamp
    notification.id = self.db.incr(self._rkey_incr)

    self.db.lpush(self._rkey, notification.dumps())