Module jumpscale.tools.startupcmd.startupcmd
This module manages long running commands
To start a python http server
cmd = j.tools.startupcmd.get("cmd_name")
cmd.start_cmd = "python3 -m http.server"
cmd.start()
Check if it is running
cmd.is_running()
Two types of executor: - tmux(default) - foreground
You can attach to a running process by specufying correct name
, ports
, process_strings
, process_strings_regex
.
If it matches any of the above you would be able to perform available on that process.
- Special cases
you can add cmd.ports, cmd.process_strings_regex or cmd.process_strings_regex to reach the process pid
Expand source code
"""This module manages long running commands
To start a python http server
```
cmd = j.tools.startupcmd.get("cmd_name")
cmd.start_cmd = "python3 -m http.server"
cmd.start()
```
Check if it is running
```
cmd.is_running()
```
Two types of executor:
- tmux(default)
- foreground
You can attach to a running process by specufying correct `name`, `ports`, `process_strings`, `process_strings_regex`.
If it matches any of the above you would be able to perform available on that process.
- Special cases
you can add cmd.ports, cmd.process_strings_regex or cmd.process_strings_regex to reach the process pid
"""
from enum import Enum
from jumpscale.loader import j
from jumpscale.core.base import Base, fields
import time
from psutil import NoSuchProcess
class Executor(Enum):
TMUX = "tmux"
FOREGROUND = "foreground"
class StartupCmd(Base):
start_cmd = fields.String()
ports = fields.List(fields.Integer())
executor = fields.Enum(Executor)
check_cmd = fields.String()
path = fields.String(default=j.core.dirs.TMPDIR)
stop_cmd = fields.String()
env = fields.Typed(dict, default={})
timeout = fields.Integer(default=60)
process_strings = fields.List(fields.String())
process_strings_regex = fields.List(fields.String())
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._process = None
self._pid = None
self._cmd_path = None
self.__tmux_window = None
def reset(self):
self._process = None
self._pid = None
@property
def pid(self):
if not self._pid:
pids = j.sals.process.get_pids(f"startupcmd_{self.instance_name}")
if pids:
self._pid = pids[0]
return self._pid
@property
def cmd_path(self):
if not self._cmd_path:
self._cmd_path = j.sals.fs.join_paths(j.core.dirs.VARDIR, "cmds", f"{self.instance_name}.sh")
j.sals.fs.mkdirs(j.sals.fs.dirname(self._cmd_path))
return self._cmd_path
@pid.setter
def pid(self, pid):
self._pid = pid
@property
def process(self):
if not self._process:
if self.pid:
self._process = j.sals.process.get_process_object(self.pid, die=False)
if not self._process:
self.pid = None
else:
processes = self._get_processes_by_port_or_filter()
if len(processes) == 1:
self._process = processes[0]
return self._process
@property
def _tmux_window(self):
if self.executor.value == Executor.TMUX.value:
if self.__tmux_window is None:
self.__tmux_window = j.core.executors.tmux.get_js_window(self.instance_name)
return self.__tmux_window
def _get_processes_by_port_or_filter(self):
"""Uses object properties to find the corresponding process(es)
Returns:
list: All processes that matched
"""
pids_done = []
result = []
def _add_to_result(process):
if process and process.pid not in pids_done:
result.append(process)
pids_done.append(process.pid)
for port in self.ports:
try:
process = j.sals.process.get_process_by_port(port)
except Exception:
continue
_add_to_result(process)
for process_string in self.process_strings:
for pid in j.sals.process.get_filtered_pids(process_string):
process = j.sals.process.get_process_object(pid, die=False)
_add_to_result(process)
for pid in j.sals.process.get_pids_filtered_by_regex(self.process_strings_regex):
process = j.sals.process.get_process_object(pid, die=False)
_add_to_result(process)
# We return all processes which match
return result
def _kill_processes_by_port_or_filter(self):
"""Kills processes that matches object properties"""
processes = self._get_processes_by_port_or_filter()
self._kill_processes(processes)
def _kill_processes(self, processes):
"""Kill processes
Args:
processes (list): List of processes
"""
for process in processes:
try:
process.kill()
except NoSuchProcess:
pass # already killed
def _soft_kill(self):
"""Kills the poocess using `stop_cmd`
Returns:
bool: True if was killed
"""
if self.stop_cmd:
cmd = j.tools.jinja2.render_template(template_text=self.stop_cmd, args=self._get_data())
exit_code, _, _ = j.sals.process.execute(cmd, die=False)
self.reset()
return exit_code == 0
elif self.process:
try:
self.process.terminate()
return True
except Exception:
pass
return False
def _hard_kill(self):
"""Force Kills the process"""
if self.process:
self._kill_processes([self.process])
self.reset()
self._kill_processes_by_port_or_filter()
if self.executor.value == Executor.TMUX.value:
self._tmux_window.kill_window()
self.__tmux_window = None
def stop(self, force=True, wait_for_stop=True, die=True, timeout=None):
"""Stops the running command
Args:
force (bool, optional): If True will force kill the process. Defaults to True.
wait_for_stop (bool, optional): If True will wait until process is stopped. Defaults to True.
die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True.
timeout (int, optional): Timeout for stop wait.If not set will use `timeout` property. Defaults to None.
"""
timeout = timeout or self.timeout
if self.is_running():
if self._soft_kill():
self.wait_for_stop(die=False, timeout=timeout)
if force:
self._hard_kill()
if wait_for_stop:
self.wait_for_stop(die=die, timeout=timeout)
j.sals.process.execute(f"rm {self.cmd_path}", die=False)
def is_running(self):
"""Checks if startup cmd is running. Will use `check_cmd` property if defined or check based on objet properties
Returns:
bool: True if it is running
"""
if self.check_cmd:
exit_code, _, _ = j.sals.process.execute(self.check_cmd, die=False)
return exit_code == 0
self.reset()
if self.process:
return self.process.is_running()
return self._get_processes_by_port_or_filter() != []
def _wait(self, for_running, die, timeout):
"""Wait for either start or stop to finishes
Args:
for_running (bool): Whether to check if it is running or stopped.
die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True.
timeout (int, optional): Timeout for wait operation. Defaults to None.
Raises:
j.exceptions.Timeout: If timeout is exceeded.
"""
end = j.data.time.now().timestamp + timeout
while j.data.time.now().timestamp < end:
if self.is_running() == for_running:
break
time.sleep(0.05)
else:
if die:
raise j.exceptions.Timeout(f"Wait operation exceeded timeout: {timeout}")
def wait_for_stop(self, die=True, timeout=10):
"""Wait for stop to finishes
Args:
die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True.
timeout (int, optional): Timeout for wait operation. Defaults to None.
Raises:
j.exceptions.Timeout: If timeout is exceeded.
"""
self._wait(False, die, timeout)
def wait_for_running(self, die=True, timeout=10):
"""Wait for start to finishes
Args:
die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True.
timeout (int, optional): Timeout for wait operation. Defaults to None.
Raises:
j.exceptions.Timeout: If timeout is exceeded.
"""
self._wait(True, die, timeout)
def start(self):
"""Starts the process"""
if self.is_running():
return
if not self.start_cmd:
raise j.exceptions.Value("please make sure start_cmd has been set")
if "\n" in self.start_cmd.strip():
command = self.start_cmd
else:
template_script = """
set +ex
{% for key,val in env.items() %}
export {{key}}='{{val}}'
{% endfor %}
mkdir -p {{path}}
cd {{path}}
bash -c \"exec -a startupcmd_{{name}} {{start_cmd}}\"
"""
script = j.tools.jinja2.render_template(
template_text=template_script,
env=self.env,
path=self.path,
start_cmd=self.start_cmd,
name=self.instance_name,
)
j.sals.fs.write_file(self.cmd_path, script)
j.sals.fs.chmod(self.cmd_path, 0o770)
command = f"sh {self.cmd_path}"
if self.executor.value == Executor.FOREGROUND.value:
j.sals.process.execute(command)
elif self.executor.value == Executor.TMUX.value:
self._tmux_window.attached_pane.send_keys(command)
self.wait_for_running(die=True, timeout=self.timeout)
Classes
class Executor (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code
class Executor(Enum): TMUX = "tmux" FOREGROUND = "foreground"
Ancestors
- enum.Enum
Class variables
var FOREGROUND
var TMUX
class StartupCmd (*args, **kwargs)
-
A simple attribute-based namespace.
SimpleNamespace(**kwargs)
base class implementation for any class with fields which supports getting/setting raw data for any instance fields.
any instance can have an optional name and a parent.
class Person(Base): name = fields.String() age = fields.Float() p = Person(name="ahmed", age="19") print(p.name, p.age)
Args
parent_
:Base
, optional- parent instance. Defaults to None.
instance_name_
:str
, optional- instance name. Defaults to None.
**values
- any given field values to initiate the instance with
Expand source code
class StartupCmd(Base): start_cmd = fields.String() ports = fields.List(fields.Integer()) executor = fields.Enum(Executor) check_cmd = fields.String() path = fields.String(default=j.core.dirs.TMPDIR) stop_cmd = fields.String() env = fields.Typed(dict, default={}) timeout = fields.Integer(default=60) process_strings = fields.List(fields.String()) process_strings_regex = fields.List(fields.String()) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._process = None self._pid = None self._cmd_path = None self.__tmux_window = None def reset(self): self._process = None self._pid = None @property def pid(self): if not self._pid: pids = j.sals.process.get_pids(f"startupcmd_{self.instance_name}") if pids: self._pid = pids[0] return self._pid @property def cmd_path(self): if not self._cmd_path: self._cmd_path = j.sals.fs.join_paths(j.core.dirs.VARDIR, "cmds", f"{self.instance_name}.sh") j.sals.fs.mkdirs(j.sals.fs.dirname(self._cmd_path)) return self._cmd_path @pid.setter def pid(self, pid): self._pid = pid @property def process(self): if not self._process: if self.pid: self._process = j.sals.process.get_process_object(self.pid, die=False) if not self._process: self.pid = None else: processes = self._get_processes_by_port_or_filter() if len(processes) == 1: self._process = processes[0] return self._process @property def _tmux_window(self): if self.executor.value == Executor.TMUX.value: if self.__tmux_window is None: self.__tmux_window = j.core.executors.tmux.get_js_window(self.instance_name) return self.__tmux_window def _get_processes_by_port_or_filter(self): """Uses object properties to find the corresponding process(es) Returns: list: All processes that matched """ pids_done = [] result = [] def _add_to_result(process): if process and process.pid not in pids_done: result.append(process) pids_done.append(process.pid) for port in self.ports: try: process = j.sals.process.get_process_by_port(port) except Exception: continue _add_to_result(process) for process_string in self.process_strings: for pid in j.sals.process.get_filtered_pids(process_string): process = j.sals.process.get_process_object(pid, die=False) _add_to_result(process) for pid in j.sals.process.get_pids_filtered_by_regex(self.process_strings_regex): process = j.sals.process.get_process_object(pid, die=False) _add_to_result(process) # We return all processes which match return result def _kill_processes_by_port_or_filter(self): """Kills processes that matches object properties""" processes = self._get_processes_by_port_or_filter() self._kill_processes(processes) def _kill_processes(self, processes): """Kill processes Args: processes (list): List of processes """ for process in processes: try: process.kill() except NoSuchProcess: pass # already killed def _soft_kill(self): """Kills the poocess using `stop_cmd` Returns: bool: True if was killed """ if self.stop_cmd: cmd = j.tools.jinja2.render_template(template_text=self.stop_cmd, args=self._get_data()) exit_code, _, _ = j.sals.process.execute(cmd, die=False) self.reset() return exit_code == 0 elif self.process: try: self.process.terminate() return True except Exception: pass return False def _hard_kill(self): """Force Kills the process""" if self.process: self._kill_processes([self.process]) self.reset() self._kill_processes_by_port_or_filter() if self.executor.value == Executor.TMUX.value: self._tmux_window.kill_window() self.__tmux_window = None def stop(self, force=True, wait_for_stop=True, die=True, timeout=None): """Stops the running command Args: force (bool, optional): If True will force kill the process. Defaults to True. wait_for_stop (bool, optional): If True will wait until process is stopped. Defaults to True. die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True. timeout (int, optional): Timeout for stop wait.If not set will use `timeout` property. Defaults to None. """ timeout = timeout or self.timeout if self.is_running(): if self._soft_kill(): self.wait_for_stop(die=False, timeout=timeout) if force: self._hard_kill() if wait_for_stop: self.wait_for_stop(die=die, timeout=timeout) j.sals.process.execute(f"rm {self.cmd_path}", die=False) def is_running(self): """Checks if startup cmd is running. Will use `check_cmd` property if defined or check based on objet properties Returns: bool: True if it is running """ if self.check_cmd: exit_code, _, _ = j.sals.process.execute(self.check_cmd, die=False) return exit_code == 0 self.reset() if self.process: return self.process.is_running() return self._get_processes_by_port_or_filter() != [] def _wait(self, for_running, die, timeout): """Wait for either start or stop to finishes Args: for_running (bool): Whether to check if it is running or stopped. die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True. timeout (int, optional): Timeout for wait operation. Defaults to None. Raises: j.exceptions.Timeout: If timeout is exceeded. """ end = j.data.time.now().timestamp + timeout while j.data.time.now().timestamp < end: if self.is_running() == for_running: break time.sleep(0.05) else: if die: raise j.exceptions.Timeout(f"Wait operation exceeded timeout: {timeout}") def wait_for_stop(self, die=True, timeout=10): """Wait for stop to finishes Args: die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True. timeout (int, optional): Timeout for wait operation. Defaults to None. Raises: j.exceptions.Timeout: If timeout is exceeded. """ self._wait(False, die, timeout) def wait_for_running(self, die=True, timeout=10): """Wait for start to finishes Args: die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True. timeout (int, optional): Timeout for wait operation. Defaults to None. Raises: j.exceptions.Timeout: If timeout is exceeded. """ self._wait(True, die, timeout) def start(self): """Starts the process""" if self.is_running(): return if not self.start_cmd: raise j.exceptions.Value("please make sure start_cmd has been set") if "\n" in self.start_cmd.strip(): command = self.start_cmd else: template_script = """ set +ex {% for key,val in env.items() %} export {{key}}='{{val}}' {% endfor %} mkdir -p {{path}} cd {{path}} bash -c \"exec -a startupcmd_{{name}} {{start_cmd}}\" """ script = j.tools.jinja2.render_template( template_text=template_script, env=self.env, path=self.path, start_cmd=self.start_cmd, name=self.instance_name, ) j.sals.fs.write_file(self.cmd_path, script) j.sals.fs.chmod(self.cmd_path, 0o770) command = f"sh {self.cmd_path}" if self.executor.value == Executor.FOREGROUND.value: j.sals.process.execute(command) elif self.executor.value == Executor.TMUX.value: self._tmux_window.attached_pane.send_keys(command) self.wait_for_running(die=True, timeout=self.timeout)
Ancestors
- Base
- types.SimpleNamespace
Instance variables
var check_cmd
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var cmd_path
-
Expand source code
@property def cmd_path(self): if not self._cmd_path: self._cmd_path = j.sals.fs.join_paths(j.core.dirs.VARDIR, "cmds", f"{self.instance_name}.sh") j.sals.fs.mkdirs(j.sals.fs.dirname(self._cmd_path)) return self._cmd_path
var env
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var executor
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var path
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var pid
-
Expand source code
@property def pid(self): if not self._pid: pids = j.sals.process.get_pids(f"startupcmd_{self.instance_name}") if pids: self._pid = pids[0] return self._pid
var ports
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var process
-
Expand source code
@property def process(self): if not self._process: if self.pid: self._process = j.sals.process.get_process_object(self.pid, die=False) if not self._process: self.pid = None else: processes = self._get_processes_by_port_or_filter() if len(processes) == 1: self._process = processes[0] return self._process
var process_strings
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var process_strings_regex
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var start_cmd
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var stop_cmd
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var timeout
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
Methods
def is_running(self)
-
Checks if startup cmd is running. Will use
check_cmd
property if defined or check based on objet propertiesReturns
bool
- True if it is running
Expand source code
def is_running(self): """Checks if startup cmd is running. Will use `check_cmd` property if defined or check based on objet properties Returns: bool: True if it is running """ if self.check_cmd: exit_code, _, _ = j.sals.process.execute(self.check_cmd, die=False) return exit_code == 0 self.reset() if self.process: return self.process.is_running() return self._get_processes_by_port_or_filter() != []
def reset(self)
-
Expand source code
def reset(self): self._process = None self._pid = None
def start(self)
-
Starts the process
Expand source code
def start(self): """Starts the process""" if self.is_running(): return if not self.start_cmd: raise j.exceptions.Value("please make sure start_cmd has been set") if "\n" in self.start_cmd.strip(): command = self.start_cmd else: template_script = """ set +ex {% for key,val in env.items() %} export {{key}}='{{val}}' {% endfor %} mkdir -p {{path}} cd {{path}} bash -c \"exec -a startupcmd_{{name}} {{start_cmd}}\" """ script = j.tools.jinja2.render_template( template_text=template_script, env=self.env, path=self.path, start_cmd=self.start_cmd, name=self.instance_name, ) j.sals.fs.write_file(self.cmd_path, script) j.sals.fs.chmod(self.cmd_path, 0o770) command = f"sh {self.cmd_path}" if self.executor.value == Executor.FOREGROUND.value: j.sals.process.execute(command) elif self.executor.value == Executor.TMUX.value: self._tmux_window.attached_pane.send_keys(command) self.wait_for_running(die=True, timeout=self.timeout)
def stop(self, force=True, wait_for_stop=True, die=True, timeout=None)
-
Stops the running command
Args
force
:bool
, optional- If True will force kill the process. Defaults to True.
wait_for_stop
:bool
, optional- If True will wait until process is stopped. Defaults to True.
die
:bool
, optional- If True will raise if timeout is exceeded for stop. Defaults to True.
timeout
:int
, optional- Timeout for stop wait.If not set will use
timeout
property. Defaults to None.
Expand source code
def stop(self, force=True, wait_for_stop=True, die=True, timeout=None): """Stops the running command Args: force (bool, optional): If True will force kill the process. Defaults to True. wait_for_stop (bool, optional): If True will wait until process is stopped. Defaults to True. die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True. timeout (int, optional): Timeout for stop wait.If not set will use `timeout` property. Defaults to None. """ timeout = timeout or self.timeout if self.is_running(): if self._soft_kill(): self.wait_for_stop(die=False, timeout=timeout) if force: self._hard_kill() if wait_for_stop: self.wait_for_stop(die=die, timeout=timeout) j.sals.process.execute(f"rm {self.cmd_path}", die=False)
def wait_for_running(self, die=True, timeout=10)
-
Wait for start to finishes
Args
die
:bool
, optional- If True will raise if timeout is exceeded for stop. Defaults to True.
timeout
:int
, optional- Timeout for wait operation. Defaults to None.
Raises
j.exceptions.Timeout
- If timeout is exceeded.
Expand source code
def wait_for_running(self, die=True, timeout=10): """Wait for start to finishes Args: die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True. timeout (int, optional): Timeout for wait operation. Defaults to None. Raises: j.exceptions.Timeout: If timeout is exceeded. """ self._wait(True, die, timeout)
def wait_for_stop(self, die=True, timeout=10)
-
Wait for stop to finishes
Args
die
:bool
, optional- If True will raise if timeout is exceeded for stop. Defaults to True.
timeout
:int
, optional- Timeout for wait operation. Defaults to None.
Raises
j.exceptions.Timeout
- If timeout is exceeded.
Expand source code
def wait_for_stop(self, die=True, timeout=10): """Wait for stop to finishes Args: die (bool, optional): If True will raise if timeout is exceeded for stop. Defaults to True. timeout (int, optional): Timeout for wait operation. Defaults to None. Raises: j.exceptions.Timeout: If timeout is exceeded. """ self._wait(False, die, timeout)
Inherited members