Module jumpscale.tools.servicemanager.servicemanager
Description
Service manager is the service that monitors and manages background services through gevent greenlets.
Each service defines an interval to define the period of the service and defines also a job method that is run each period.
Any service should:
- Inherit from BackgroundService
class defined here: from jumpscale.tools.servicemanager.servicemanager import BackgroundService
- Define an interval in the constructor
- Implement the abstract job
method of the BackgroundService
base class.
How it schedules services
The service manager uses gevent greenlets to run jobs. It spawns the job in a greenlet after its interval period. Rescheduling the service job is done by linking a callback to the greenlet, which runs after the greenlet finishes. Once the greenlet finishes execution, the callback is fired and schedules the job to run again after another interval.
To add a service to the service manager you should call the add_service
method which takes the package name and package path as parameters.
It loads the file in this path as a module and gets the service object defined in the service.py file.
Immediately schedule service
The service's first run happens after the requested interval. If you need the first run to happen immediately on server start, add
self.schedule_on_start = True
in your service init after calling super() init.
Example service
from jumpscale.tools.servicemanager.servicemanager import BackgroundService
class TestService(BackgroundService):
def __init__(self, interval="* * * * *", *args, **kwargs):
'''
Test service that runs every 1 minute
'''
super().__init__(interval, *args, **kwargs)
self.schedule_on_start = True # immediately schedule the service (optional step and the line can be removed, default=False)
def job(self):
print("[Test Service] Done")
service = TestService()
Expand source code
"""
### Description
Service manager is the service that monitors and manages background services through gevent greenlets.
Each service defines an interval to define the period of the service and defines also a job method that is run each period.
Any service should:
- Inherit from `BackgroundService` class defined here: `from jumpscale.tools.servicemanager.servicemanager import BackgroundService`
- Define an interval in the constructor
- Implement the abstract `job` method of the `BackgroundService` base class.
### How it schedules services
The service manager uses gevent greenlets to run jobs. It spawns the job in a greenlet after its interval period.
Rescheduling the service job is done by linking a callback to the greenlet, which runs after the greenlet finishes.
After the greenlet finishes execution the callback is fired which schedules the job to be run again after another interval.
To add a service to the service manager you should call the `add_service` method which takes the package name and package path as parameters.
It loads the file in this path as a module and gets the service object defined in the service.py file.
### Immediately schedule service
The service's first run happens after the requested interval.
If you need the first run to happen immediately on server start, add
```python3
self.schedule_on_start = True
```
in your service init after calling super() init.
### Example service
```python3
from jumpscale.tools.servicemanager.servicemanager import BackgroundService
class TestService(BackgroundService):
def __init__(self, interval="* * * * *", *args, **kwargs):
'''
Test service that runs every 1 minute
'''
super().__init__(interval, *args, **kwargs)
self.schedule_on_start = True # immediately schedule the service (optional step and the line can be removed, default=False)
def job(self):
print("[Test Service] Done")
service = TestService()
```
"""
from numbers import Real
from math import ceil
from abc import ABC, abstractmethod
from signal import SIGTERM, SIGKILL
from crontab import CronTab
import gevent
from gevent.pool import Pool
from jumpscale.loader import j
from jumpscale.core.base import Base
from multiprocessing import cpu_count
class BackgroundService(ABC):
    def __init__(self, interval=60, *args, **kwargs):
        """Set up the scheduling attributes shared by every background service

        Arguments:
            interval (int | CronTab object | str): how often the job is executed, given as a number of
                seconds, a CronTab object or a CronTab-formatted string (default: 60)
            *args, **kwargs: accepted so subclasses can forward extra arguments; not used here
        """
        # opt-in flag: subclasses may set it to True after calling super().__init__()
        self.schedule_on_start = False
        self.interval = interval

    @abstractmethod
    def job(self):
        """
        Work to be executed on every scheduled run.
        Every concrete service inheriting from this class has to override it
        """
# TODO: add support for non-periodic tasks
# TODO: configurable services
# TODO: add support for services not in packages
class ServiceManager(Base):
    """Schedules background services and runs their jobs in gevent greenlets.

    Bookkeeping is keyed by service name:
    - `services`: registered service objects
    - `_scheduled`: the `spawn_later` greenlet that will trigger the next run
    - `_running`: the greenlet currently executing the service job
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.services = {}  # service objects
        self._scheduled = {}  # greenlets of scheduled services
        self._running = {}  # greenlets of currently running services
        self._pool = Pool(cpu_count())  # jobs are spawned through a pool sized to the CPU count

    @staticmethod
    def seconds_to_next_interval(interval):
        """Helper method to get seconds remaining to next_interval

        Arguments:
            interval (Real | str | CronTab): a number of seconds (returned unchanged),
                a CronTab-formatted string or a CronTab object

        Returns:
            Real: number of seconds remaining until next interval

        Raises:
            j.exceptions.Value: when a string interval cannot be handled by CronTab
            j.exceptions.Runtime: when the type of interval is not Real / CronTab object / str
        """
        if isinstance(interval, Real):
            return interval
        elif isinstance(interval, str):
            try:
                return CronTab(interval).next(default_utc=True)
            except Exception as e:
                # keep the original error as the explicit cause for easier debugging
                raise j.exceptions.Value(str(e)) from e
        elif isinstance(interval, CronTab):
            return interval.next(default_utc=True)
        else:
            raise j.exceptions.Runtime(f"Unsupported interval type: {type(interval)}")

    @staticmethod
    def _load_service(path):
        """Load the module in the service file path and get the service object

        Arguments:
            path (str): path of the service file

        Returns:
            service: the `service` attribute of the (force-reloaded) module
        """
        module = j.tools.codeloader.load_python_module(path, force_reload=True)
        return module.service

    @staticmethod
    def __on_exception(greenlet):
        """Callback to handle exception raised by service greenlet

        Raises an alert through the alert handler and logs the exception.

        Arguments:
            greenlet (Greenlet): greenlet object (carries the `service` attribute set when it was spawned)
        """
        message = f"Service {greenlet.service.name} raised an exception: {greenlet.exception}"
        j.tools.alerthandler.alert_raise(app_name="servicemanager", message=message, alert_type="exception")
        j.logger.exception(f"Service {greenlet.service.name} raised an exception", exception=greenlet.exception)

    def __callback(self, greenlet):
        """Callback runs after greenlet finishes execution

        Removes the finished job from the running table so the next scheduled run can start it again.

        Arguments:
            greenlet (Greenlet): greenlet object
        """
        greenlet.unlink(self.__callback)
        self._running.pop(greenlet.service.name, None)

    def _schedule_service(self, service):
        """Runs a service job and schedules it to run again every period (interval) specified by the service

        The job is skipped for this round when the previous run of the same service has not finished yet.

        Arguments:
            service (BackgroundService): background service object
        """
        if service.name not in self._running:
            greenlet = self._pool.apply_async(service.job)
            greenlet.link(self.__callback)
            greenlet.link_exception(self.__on_exception)
            # NOTE(review): apply_async presumably already starts the greenlet; kept as in the original
            greenlet.start()
            self._running[service.name] = greenlet
            # the callbacks read the service back from the greenlet
            self._running[service.name].service = service

        next_start = ceil(self.seconds_to_next_interval(service.interval))
        self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)

    def start(self):
        """Start the service manager and schedule default services

        Services that already have a live scheduled greenlet (e.g. scheduled by `add_service`) are
        left alone, otherwise a second rescheduling chain would be started for them.
        Services with `schedule_on_start` set are scheduled immediately, as `add_service` does.
        """
        # schedule default services
        for service in self.services.values():
            scheduled = self._scheduled.get(service.name)
            if scheduled is not None and not scheduled.dead:
                continue
            next_start = 0 if service.schedule_on_start else ceil(self.seconds_to_next_interval(service.interval))
            self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)

    def stop(self):
        """Stop all background services"""
        j.logger.info("Stopping background services")
        # iterate over a snapshot: stop_service removes entries from self.services
        for service in list(self.services.keys()):
            self.stop_service(service)
        j.logger.info("Done stopping the background services")

    def add_service(self, service_name, service_path):
        """Add a new background service to be managed and scheduled by the service manager

        If a service with the same name is already registered it is stopped first (waiting for a
        running job to finish) and then replaced by the freshly loaded one.

        Arguments:
            service_name (str): name under which the service is registered
            service_path (str): absolute path of the service file
        """
        service = self._load_service(service_path)
        service.name = service_name

        # The module is force-reloaded, so the loaded object is always a new one: compare by
        # name, not by object, otherwise the previous instance would keep running.
        if service.name in self.services:
            j.logger.debug(f"Service {service.name} is already running. Reloading...")
            self.stop_service(service.name)

        next_start = 0 if service.schedule_on_start else ceil(self.seconds_to_next_interval(service.interval))
        self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)
        self.services[service.name] = service
        j.logger.debug(f"Service {service.name} is added.")

    def stop_service(self, service_name, block=True):
        """Stop a running background service and unschedule it if it's scheduled to run again

        Arguments:
            service_name (str): name of the service to be stopped
            block (bool): wait for service job to finish. if False, service job will be killed without waiting

        Raises:
            j.exceptions.Value: when no service is registered under service_name
            j.exceptions.Runtime: when the scheduled or running greenlet could not be stopped
        """
        if service_name not in self.services:
            raise j.exceptions.Value(f"Service {service_name} is not running")

        # unschedule service if it's scheduled to run again
        scheduled = self._scheduled.get(service_name)
        if scheduled is not None:
            scheduled.unlink(self.__callback)
            scheduled.kill()
            if not scheduled.dead:
                raise j.exceptions.Runtime("Failed to unschedule greenlet")
            self._scheduled.pop(service_name, None)

        # wait for service to finish if it's already running
        running = self._running.get(service_name)
        if running is not None:
            running.unlink(self.__callback)
            if block:
                try:
                    j.logger.info(f"Waiting the service {service_name} to finish")
                    running.join()
                    j.logger.info(f"Done waiting the service {service_name}")
                except Exception as e:
                    raise j.exceptions.Runtime(f"Exception on waiting for greenlet: {str(e)}") from e
            else:
                try:
                    running.kill()
                except Exception as e:
                    raise j.exceptions.Runtime(f"Exception on killing greenlet: {str(e)}") from e
                if not running.dead:
                    raise j.exceptions.Runtime("Failed to kill running greenlet")
            # The completion callback was unlinked above, so the entry must be removed here;
            # a stale entry would stop a re-added service with this name from ever running.
            self._running.pop(service_name, None)

        self.services.pop(service_name)
Classes
class BackgroundService (interval=60, *args, **kwargs)
-
Helper class that provides a standard way to create an ABC using inheritance.
Abstract base class for background services managed by the service manager
Arguments
interval (int | CronTab object | str): scheduled job is executed every interval in seconds / CronTab object / CronTab-formatted string
Expand source code
class BackgroundService(ABC):
    def __init__(self, interval=60, *args, **kwargs):
        """Set up the scheduling attributes shared by every background service

        Arguments:
            interval (int | CronTab object | str): how often the job is executed, given as a number of
                seconds, a CronTab object or a CronTab-formatted string (default: 60)
            *args, **kwargs: accepted so subclasses can forward extra arguments; not used here
        """
        # opt-in flag: subclasses may set it to True after calling super().__init__()
        self.schedule_on_start = False
        self.interval = interval

    @abstractmethod
    def job(self):
        """
        Work to be executed on every scheduled run.
        Every concrete service inheriting from this class has to override it
        """
Ancestors
- abc.ABC
Subclasses
- AlertsNotifier
- DiskCheckService
- HeartBeatService
- Notifier
- StellarService
- SystemBackupService
- BillingService
Methods
def job(self)
-
Background job to be scheduled. Should be implemented by any service that inherits from this class
Expand source code
@abstractmethod
def job(self):
    """
    Work to be executed on every scheduled run.
    Every concrete service inheriting from the base class has to override it
    """
class ServiceManager (*args, **kwargs)
-
A simple attribute-based namespace.
SimpleNamespace(**kwargs)
base class implementation for any class with fields which supports getting/setting raw data for any instance fields.
any instance can have an optional name and a parent.
class Person(Base): name = fields.String() age = fields.Float() p = Person(name="ahmed", age="19") print(p.name, p.age)
Args
parent_
:Base
, optional- parent instance. Defaults to None.
instance_name_
:str
, optional- instance name. Defaults to None.
**values
- any given field values to initiate the instance with
Expand source code
class ServiceManager(Base):
    """Schedules background services and runs their jobs in gevent greenlets.

    Bookkeeping is keyed by service name:
    - `services`: registered service objects
    - `_scheduled`: the `spawn_later` greenlet that will trigger the next run
    - `_running`: the greenlet currently executing the service job
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.services = {}  # service objects
        self._scheduled = {}  # greenlets of scheduled services
        self._running = {}  # greenlets of currently running services
        self._pool = Pool(cpu_count())  # jobs are spawned through a pool sized to the CPU count

    @staticmethod
    def seconds_to_next_interval(interval):
        """Helper method to get seconds remaining to next_interval

        Arguments:
            interval (Real | str | CronTab): a number of seconds (returned unchanged),
                a CronTab-formatted string or a CronTab object

        Returns:
            Real: number of seconds remaining until next interval

        Raises:
            j.exceptions.Value: when a string interval cannot be handled by CronTab
            j.exceptions.Runtime: when the type of interval is not Real / CronTab object / str
        """
        if isinstance(interval, Real):
            return interval
        elif isinstance(interval, str):
            try:
                return CronTab(interval).next(default_utc=True)
            except Exception as e:
                # keep the original error as the explicit cause for easier debugging
                raise j.exceptions.Value(str(e)) from e
        elif isinstance(interval, CronTab):
            return interval.next(default_utc=True)
        else:
            raise j.exceptions.Runtime(f"Unsupported interval type: {type(interval)}")

    @staticmethod
    def _load_service(path):
        """Load the module in the service file path and get the service object

        Arguments:
            path (str): path of the service file

        Returns:
            service: the `service` attribute of the (force-reloaded) module
        """
        module = j.tools.codeloader.load_python_module(path, force_reload=True)
        return module.service

    @staticmethod
    def __on_exception(greenlet):
        """Callback to handle exception raised by service greenlet

        Raises an alert through the alert handler and logs the exception.

        Arguments:
            greenlet (Greenlet): greenlet object (carries the `service` attribute set when it was spawned)
        """
        message = f"Service {greenlet.service.name} raised an exception: {greenlet.exception}"
        j.tools.alerthandler.alert_raise(app_name="servicemanager", message=message, alert_type="exception")
        j.logger.exception(f"Service {greenlet.service.name} raised an exception", exception=greenlet.exception)

    def __callback(self, greenlet):
        """Callback runs after greenlet finishes execution

        Removes the finished job from the running table so the next scheduled run can start it again.

        Arguments:
            greenlet (Greenlet): greenlet object
        """
        greenlet.unlink(self.__callback)
        self._running.pop(greenlet.service.name, None)

    def _schedule_service(self, service):
        """Runs a service job and schedules it to run again every period (interval) specified by the service

        The job is skipped for this round when the previous run of the same service has not finished yet.

        Arguments:
            service (BackgroundService): background service object
        """
        if service.name not in self._running:
            greenlet = self._pool.apply_async(service.job)
            greenlet.link(self.__callback)
            greenlet.link_exception(self.__on_exception)
            # NOTE(review): apply_async presumably already starts the greenlet; kept as in the original
            greenlet.start()
            self._running[service.name] = greenlet
            # the callbacks read the service back from the greenlet
            self._running[service.name].service = service

        next_start = ceil(self.seconds_to_next_interval(service.interval))
        self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)

    def start(self):
        """Start the service manager and schedule default services

        Services that already have a live scheduled greenlet (e.g. scheduled by `add_service`) are
        left alone, otherwise a second rescheduling chain would be started for them.
        Services with `schedule_on_start` set are scheduled immediately, as `add_service` does.
        """
        # schedule default services
        for service in self.services.values():
            scheduled = self._scheduled.get(service.name)
            if scheduled is not None and not scheduled.dead:
                continue
            next_start = 0 if service.schedule_on_start else ceil(self.seconds_to_next_interval(service.interval))
            self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)

    def stop(self):
        """Stop all background services"""
        j.logger.info("Stopping background services")
        # iterate over a snapshot: stop_service removes entries from self.services
        for service in list(self.services.keys()):
            self.stop_service(service)
        j.logger.info("Done stopping the background services")

    def add_service(self, service_name, service_path):
        """Add a new background service to be managed and scheduled by the service manager

        If a service with the same name is already registered it is stopped first (waiting for a
        running job to finish) and then replaced by the freshly loaded one.

        Arguments:
            service_name (str): name under which the service is registered
            service_path (str): absolute path of the service file
        """
        service = self._load_service(service_path)
        service.name = service_name

        # The module is force-reloaded, so the loaded object is always a new one: compare by
        # name, not by object, otherwise the previous instance would keep running.
        if service.name in self.services:
            j.logger.debug(f"Service {service.name} is already running. Reloading...")
            self.stop_service(service.name)

        next_start = 0 if service.schedule_on_start else ceil(self.seconds_to_next_interval(service.interval))
        self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)
        self.services[service.name] = service
        j.logger.debug(f"Service {service.name} is added.")

    def stop_service(self, service_name, block=True):
        """Stop a running background service and unschedule it if it's scheduled to run again

        Arguments:
            service_name (str): name of the service to be stopped
            block (bool): wait for service job to finish. if False, service job will be killed without waiting

        Raises:
            j.exceptions.Value: when no service is registered under service_name
            j.exceptions.Runtime: when the scheduled or running greenlet could not be stopped
        """
        if service_name not in self.services:
            raise j.exceptions.Value(f"Service {service_name} is not running")

        # unschedule service if it's scheduled to run again
        scheduled = self._scheduled.get(service_name)
        if scheduled is not None:
            scheduled.unlink(self.__callback)
            scheduled.kill()
            if not scheduled.dead:
                raise j.exceptions.Runtime("Failed to unschedule greenlet")
            self._scheduled.pop(service_name, None)

        # wait for service to finish if it's already running
        running = self._running.get(service_name)
        if running is not None:
            running.unlink(self.__callback)
            if block:
                try:
                    j.logger.info(f"Waiting the service {service_name} to finish")
                    running.join()
                    j.logger.info(f"Done waiting the service {service_name}")
                except Exception as e:
                    raise j.exceptions.Runtime(f"Exception on waiting for greenlet: {str(e)}") from e
            else:
                try:
                    running.kill()
                except Exception as e:
                    raise j.exceptions.Runtime(f"Exception on killing greenlet: {str(e)}") from e
                if not running.dead:
                    raise j.exceptions.Runtime("Failed to kill running greenlet")
            # The completion callback was unlinked above, so the entry must be removed here;
            # a stale entry would stop a re-added service with this name from ever running.
            self._running.pop(service_name, None)

        self.services.pop(service_name)
Ancestors
- Base
- types.SimpleNamespace
Static methods
def seconds_to_next_interval(interval)
-
Helper method to get seconds remaining to next_interval
Arguments
interval (Any): next interval to which seconds remaining is calculated
Returns
Real
- number of seconds remaining until next interval
Raises
ValueError
- when the type of interval is not Real / CronTab object / CronTab string format
Expand source code
@staticmethod def seconds_to_next_interval(interval): """Helper method to get seconds remaining to next_interval Arguments: interval (Any): next interval to which seconds remaining is calculated Returns: Real: number of seconds remaining until next interval Raises: ValueError: when the type of interval is not Real / CronTab object / CronTab string format """ if isinstance(interval, Real): return interval elif isinstance(interval, str): try: return CronTab(interval).next(default_utc=True) except Exception as e: raise j.exceptions.Value(str(e)) elif isinstance(interval, CronTab): return interval.next(default_utc=True) else: raise j.exceptions.Runtime(f"Unsupported interval type: {type(interval)}")
Methods
def add_service(self, service_name, service_path)
-
Add a new background service to be managed and scheduled by the service manager
Arguments
service_path (str): absolute path of the service file
Expand source code
def add_service(self, service_name, service_path):
    """Add a new background service to be managed and scheduled by the service manager

    If a service with the same name is already registered it is stopped first (waiting for a
    running job to finish) and then replaced by the freshly loaded one.

    Arguments:
        service_name (str): name under which the service is registered
        service_path (str): absolute path of the service file
    """
    service = self._load_service(service_path)
    service.name = service_name

    # The module is force-reloaded, so the loaded object is always a new one: compare by
    # name, not by object, otherwise the previous instance would keep running.
    if service.name in self.services:
        j.logger.debug(f"Service {service.name} is already running. Reloading...")
        self.stop_service(service.name)

    next_start = 0 if service.schedule_on_start else ceil(self.seconds_to_next_interval(service.interval))
    self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)
    self.services[service.name] = service
    j.logger.debug(f"Service {service.name} is added.")
def start(self)
-
Start the service manager and schedule default services
Expand source code
def start(self):
    """Start the service manager and schedule default services

    Services that already have a live scheduled greenlet (e.g. scheduled by `add_service`) are
    left alone, otherwise a second rescheduling chain would be started for them.
    Services with `schedule_on_start` set are scheduled immediately, as `add_service` does.
    """
    # schedule default services
    for service in self.services.values():
        scheduled = self._scheduled.get(service.name)
        if scheduled is not None and not scheduled.dead:
            continue
        next_start = 0 if service.schedule_on_start else ceil(self.seconds_to_next_interval(service.interval))
        self._scheduled[service.name] = gevent.spawn_later(next_start, self._schedule_service, service=service)
def stop(self)
-
Stop all background services
Expand source code
def stop(self):
    """Stop every registered background service

    Each registered name is handed to `stop_service` with its default arguments.
    """
    j.logger.info("Stopping background services")
    # take a snapshot of the names first: stop_service removes entries from self.services
    names = list(self.services)
    for name in names:
        self.stop_service(name)
    j.logger.info("Done stopping the background services")
def stop_service(self, service_name, block=True)
-
Stop a running background service and unschedule it if it's scheduled to run again
Arguments
service_name (str): name of the service to be stopped block (bool): wait for service job to finish. if False, service job will be killed without waiting
Expand source code
def stop_service(self, service_name, block=True):
    """Stop a running background service and unschedule it if it's scheduled to run again

    Arguments:
        service_name (str): name of the service to be stopped
        block (bool): wait for service job to finish. if False, service job will be killed without waiting

    Raises:
        j.exceptions.Value: when no service is registered under service_name
        j.exceptions.Runtime: when the scheduled or running greenlet could not be stopped
    """
    if service_name not in self.services:
        raise j.exceptions.Value(f"Service {service_name} is not running")

    # unschedule service if it's scheduled to run again
    scheduled = self._scheduled.get(service_name)
    if scheduled is not None:
        scheduled.unlink(self.__callback)
        scheduled.kill()
        if not scheduled.dead:
            raise j.exceptions.Runtime("Failed to unschedule greenlet")
        self._scheduled.pop(service_name, None)

    # wait for service to finish if it's already running
    running = self._running.get(service_name)
    if running is not None:
        running.unlink(self.__callback)
        if block:
            try:
                j.logger.info(f"Waiting the service {service_name} to finish")
                running.join()
                j.logger.info(f"Done waiting the service {service_name}")
            except Exception as e:
                raise j.exceptions.Runtime(f"Exception on waiting for greenlet: {str(e)}") from e
        else:
            try:
                running.kill()
            except Exception as e:
                raise j.exceptions.Runtime(f"Exception on killing greenlet: {str(e)}") from e
            if not running.dead:
                raise j.exceptions.Runtime("Failed to kill running greenlet")
        # The completion callback was unlinked above, so the entry must be removed here;
        # a stale entry would stop a re-added service with this name from ever running.
        self._running.pop(service_name, None)

    self.services.pop(service_name)
Inherited members