Module jumpscale.sals.process

This module executes processes on the system and manages them.

below are some examples of the functions included in this module (not all inclusive.):


>>> from jumpscale.loader import j
>>> import signal

to create a process

>>> rc, out, err = j.sals.process.execute("ls", cwd="/tmp", showout=True)
# this executes ls command on dir "/tmp" showing output from stdout
# rc -> contains exit status
# out -> the actual output
# err -> in case an error happened, this variable will contain the error message

Check whether a process with this pid exists in the current process list

>>> j.sals.process.is_alive(10022)

Checks if a specific command is available on the system

>>> j.sals.process.is_installed('top')

kill a process with pid 10022 with SIGTERM

>>> j.sals.process.kill(10022)

kill a process with pid 10022 with SIGTERM, wait 3 seconds for it to disappear, then if still alive kill it with SIGKILL

>>> j.sals.process.kill(10022, timeout=3, sure_kill=True)

Check if there is any running process that match the given name.

>>> j.sals.process.ps_find('python3')

gets pid of the process listening on port TCP 8000 ipv4 localhost address

>>> j.sals.process.get_pid_by_port(8000)

gets pid of the process listening on port UDP 8000 ipv6 localhost address

>>> j.sals.process.get_pid_by_port(8000, ipv6=True, udp=True)

Returns the psutil.Process object that is listening on the given port

>>> j.sals.process.get_process_by_port(8000, ipv6=True, udp=True)

Get pids of process by a filter string and sort by cpu utilization descendingly

>>> j.sals.process.get_pids_filtered_sorted('chrome', sortkey='%cpu', desc=True)

Return a list of processes ID(s) matching the given name.

>>> j.sals.process.get_pids('code')

Return a list of processes ID(s) matching the given name, including the result of matching against the full command line.

>>> j.sals.process.get_pids('http.server', full_cmd_line=True)

Return a list of processes ID(s) matching the given name, including any zombie processes

>>> j.sals.process.get_pids('python3', include_zombie=True)

get processes info about top 3 processes which consumed the most memory

>>> j.sals.process.get_processes_info(limit=3)

get processes info about top 3 processes which consumed the most cpu time

>>> j.sals.process.get_processes_info(sort='cpu_time', limit=3)

get processes info about last process started

>>> j.sals.process.get_processes_info(sort='create_time', limit=1)

get processes info sorted by pid ascending limited to 10 processes

>>> j.sals.process.get_processes_info(sort='pid', limit=10, desc=False)

Kill a process and its children (including grandchildren) with SIGTERM and fallback to SIGKILL when needed

>>> j.sals.process.kill_proc_tree(20778, sure_kill=True)

send SIGTERM to all processes spawned by a given process, but leave the process itself.

>>> j.sals.process.kill_proc_tree(20778, include_parent=False)

Terminate a list of processes with a given list of pids, fallback to SIGKILL after 1 seconds.

>>> j.sals.process.kill_all_pids([3067, 7888, 10221], timeout=1, sure_kill=True)

terminate a process that listen to a given tcp port on ipv4 address

>>> j.sals.process.kill_process_by_port(8000)

terminate a process that listen to a given udp port on ipv6 address, fallback to SIGKILL after 3 sec

>>> j.sals.process.kill_process_by_port(8000, udp=True, ipv6=True, timeout=3, sure_kill=True)

Terminate all processes owned by a given user name, fallback to SIGKILL when needed

>>> j.sals.process.kill_user_processes('sameh', sure_kill=True)


Expand source code
"""This module execute process on system and manage them

below are some examples of the functions included in this module (not all inclusive.):

    >>> from jumpscale.loader import j
    >>> import signal

    #to create a process
    >>> rc, out, err = j.sals.process.execute("ls", cwd="/tmp", showout=True)
    # this executes ls command on dir "/tmp" showing output from stdout
    # rc -> contains exit status
    # out -> the actual output
    # err -> in case an error happened, this variable will contain the error message

    # check whether a process with this pid exists in the current process list
    >>> j.sals.process.is_alive(10022)

    # Checks if a specific command is available on the system
    >>> j.sals.process.is_installed('top')

    # kill a process with pid 10022 with SIGTERM
    >>> j.sals.process.kill(10022)

    # kill a process with pid 10022 with SIGTERM, wait 3 seconds for it to disappear, then if still alive kill it with SIGKILL
    >>> j.sals.process.kill(10022, timeout=3, sure_kill=True)

    # Check if there is any running process that match the given name.
    >>> j.sals.process.ps_find('python3')

    # gets pid of the process listening on port TCP 8000 ipv4 localhost address
    >>> j.sals.process.get_pid_by_port(8000)

    # gets pid of the process listening on port UDP 8000 ipv6 localhost address
    >>> j.sals.process.get_pid_by_port(8000, ipv6=True, udp=True)

    # Returns the psutil.Process object that is listening on the given port
    >>> j.sals.process.get_process_by_port(8000, ipv6=True, udp=True)

    # Get pids of process by a filter string and sort by cpu utilization descendingly
    >>> j.sals.process.get_pids_filtered_sorted('chrome', sortkey='%cpu', desc=True)

    # Return a list of processes ID(s) matching the given name.
    >>> j.sals.process.get_pids('code')

    # Return a list of processes ID(s) matching the given name, including the result of matching against the full command line.
    >>> j.sals.process.get_pids('http.server', full_cmd_line=True)

    # Return a list of processes ID(s) matching the given name, including any zombie processes
    >>> j.sals.process.get_pids('python3', include_zombie=True)

    # get processes info about top 3 processes which consumed the most memory
    >>> j.sals.process.get_processes_info(limit=3)

    # get processes info about top 3 processes which consumed the most cpu time
    >>> j.sals.process.get_processes_info(sort='cpu_time', limit=3)

    # get processes info about last process started
    >>> j.sals.process.get_processes_info(sort='create_time', limit=1)

    # get processes info sorted by pid ascending limited to 10 processes
    >>> j.sals.process.get_processes_info(sort='pid', limit=10, desc=False)

    # Kill a process and its children (including grandchildren) with SIGTERM and fallback to SIGKILL when needed
    >>> j.sals.process.kill_proc_tree(20778, sure_kill=True)

    # send SIGTERM to all processes spawned by a given process, but leave the process itself.
    >>> j.sals.process.kill_proc_tree(20778, include_parent=False)

    # Terminate a list of processes with a given list of pids, fallback to SIGKILL after 1 seconds.
    >>> j.sals.process.kill_all_pids([3067, 7888, 10221], timeout=1, sure_kill=True)

    # terminate a process that listen to a given tcp port on ipv4 address
    >>> j.sals.process.kill_process_by_port(8000)

    # terminate a process that listen to a given udp port on ipv6 address, fallback to SIGKILL after 3 sec
    >>> j.sals.process.kill_process_by_port(8000, udp=True, ipv6=True, timeout=3, sure_kill=True)

    # Terminate all processes owned by a given user name, fallback to SIGKILL when needed
    >>> j.sals.process.kill_user_processes('sameh', sure_kill=True)
"""

import math
import os
import re
import shlex
import signal
import subprocess
import time
from collections import defaultdict

import psutil
from jumpscale.loader import j

def execute(
    cmd,
    showout=False,
    cwd=None,
    shell="/bin/bash",
    timeout=600,
    asynchronous=False,
    env=None,
    replace_env=False,
    die=False,
):
    """Execute a command.

    Accepts command as a list too, with auto-escaping.

    Args:
        cmd (str or list of str): Command to be executed, e.g. "ls -la" or ["ls", "-la"]
        showout (bool, optional): Whether to show stdout of the command or not. Defaults to False.
        cwd (str, optional): Path to `cd` into before running command. Defaults to None.
        shell (str, optional): Shell binary used to run the command. Defaults to "/bin/bash".
        timeout (int, optional): Timeout before kill the process. Defaults to 600.
        asynchronous (bool, optional): Whether to execute in asynchronous mode or not. Defaults to False.
        env (dict, optional): Add environment variables here. Defaults to None.
        replace_env (bool, optional): Whether to replace the entire environment with env. Defaults to False.
        die (bool, optional): Whether to raise exception if command failed or not. Defaults to False.

    Returns:
        tuple: tuple[return_code: int, stdout: str, stderr: str]
    """
    # NOTE(review): the parameter list and the plain pass-through keyword arguments below were
    # reconstructed from the docstring (only `hide`, `env` and `warn` survived in the source);
    # confirm them against the signature of j.core.executors.run_local.
    return j.core.executors.run_local(
        cmd,
        hide=not showout,  # the executor hides output, the caller asks to show it
        cwd=cwd,
        shell=shell,
        timeout=timeout,
        asynchronous=asynchronous,
        env=env or {},
        replace_env=replace_env,
        warn=not die,  # warn-only mode means a failing command does not raise
    )

def is_alive(pid):
    """Check whether the given PID exists in the current process list.

    Args:
        pid (int): Process ID (PID) to be checked.

    Returns:
        bool: True if the given PID exists in the current process list, False otherwise.
    """
    return psutil.pid_exists(pid)

def is_installed(cmd):
    """Checks if a specific command is available on system e.g. curl.

    The lookup is done with `shutil.which`, so `cmd` is never interpolated into a shell
    command line and no subprocess is spawned.

    Args:
        cmd (str): Command to be checked.

    Returns:
        bool: True if command is available, False otherwise.
    """
    import shutil  # local import: the module-level import block does not provide shutil

    return shutil.which(cmd) is not None

def kill(proc, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill a process with a specified signal.

    Args:
        proc (int or psutil.Process): Target process ID (PID) or psutil.Process object.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Raises:
        j.exceptions.Runtime: In case killing the process failed.
        j.exceptions.Permission: In case the permission to perform this action is denied.
    """
    try:
        if isinstance(proc, int):
            proc = get_process_object(proc, die=True)
        if proc.status() == psutil.STATUS_ZOMBIE:
            # a zombie is already dead, there is nothing left to signal
            return

        proc.send_signal(sig)
        try:
            # Wait for a process to terminate
            # If PID no longer exists return None immediately
            # If timeout exceeded and the process is still alive raise TimeoutExpired exception
            proc.wait(timeout=timeout)
            return
        except psutil.TimeoutExpired as e:
            # timeout expires and process is still alive.
            # SIGKILL is not supported on windows, so check the platform before touching signal.SIGKILL
            can_escalate = sure_kill and os.name != "nt" and sig != signal.SIGKILL
            if not can_escalate:
                raise j.exceptions.Runtime(f"Could not kill process with pid {proc.pid}") from e

        # If a process gets SIGKILL it must quit immediately and will not perform any clean-up operations.
        # This runs outside the handler above so that NoSuchProcess/AccessDenied raised here are
        # still translated by the outer handlers.
        proc.kill()
        try:
            # NOTE(review): the wait time after SIGKILL was lost from the source; `timeout` is reused here.
            proc.wait(timeout=timeout)
        except psutil.TimeoutExpired as e:
            if proc.status() == psutil.STATUS_ZOMBIE:
                # the process becomes a zombie and should be considered a dead.
                return
            # the process may be in an uninterruptible sleep
            j.logger.warning(f"Could not kill the process with pid: {proc.pid} with {sig}. Timeout: {timeout}")
            raise j.exceptions.Runtime(f"Could not kill process with pid {proc.pid}, {proc.status()}") from e
    except psutil.AccessDenied as e:
        # permission to perform an action is denied
        raise j.exceptions.Permission("Permission to perform this action is denied!") from e
    except psutil.NoSuchProcess:
        # Process no longer exists or Zombie (already dead)
        pass

def ps_find(process_name):
    """Check if there is any running process that match the given name.

    Args:
        process_name (str): The target process name, matched the same way `get_pids` matches it.

    Returns:
        bool: True if process is found, False otherwise.
    """
    # limit=1 lets get_pids stop at the first match instead of scanning every process
    return bool(get_pids(process_name, limit=1))

def get_pids_filtered_sorted(filterstr, sortkey=None, desc=False):
    """Get pids of process by a filter string and optionally sort by sortkey

    Args:
        filterstr (str): filter string.
        sortkey (str, optional): Defaults to None. (if no sortkey used it will sort by pid(s) in ascending order).
            sortkey can be one of the following:
            %cpu           cpu utilization of the process in "##.#" format.
            %mem           ratio of the process's resident set size  to the physical memory on the machine, expressed as a percentage.
            cputime        cumulative CPU time, "[DD-]hh:mm:ss" format.  (alias time).
            egid           effective group ID number of the process as a decimal integer.  (alias gid).
            euid           effective user ID (alias uid).
            euser          effective user name.
            gid            see egid. (alias egid).
            pid            a number representing the process ID (alias tgid).
            ppid           parent process ID.
            psr            processor that process is currently assigned to.
            start_time     starting time or date of the process.
        desc: (bool, optional): Whether to sort the processes in descending order or not(asc). Defaults to False (asc).

    Raises:
        KeyError: if `sortkey` is not one of the supported keys.

    Returns:
        list of int: list of the processes IDs
    """
    # translate `ps`-style sort keys into the keys understood by get_processes_info
    ps_to_psutil_map = {
        "%cpu": "cpu_percent",
        "%mem": "memory_percent",
        "cputime": "cpu_time",
        "time": "cpu_time",  # documented alias of cputime
        "psr": "cpu_num",
        "start_time": "create_time",
        "egid": "egid",
        "gid": "egid",
        "euid": "euid",
        "uid": "euid",
        "euser": "username",
        "pid": "pid",
        "tgid": "pid",  # documented alias of pid
        "ppid": "ppid",
    }
    if sortkey is None:  # mimic default ps command sorting behavior
        sortkey = "pid"
    # NOTE(review): get_processes_info is called with its default `limit`, so the result may be
    # capped at that limit; confirm whether this is intended.
    processes = get_processes_info(sort=ps_to_psutil_map[sortkey], filterstr=filterstr, desc=desc)
    # return pids from process objects
    return [p["pid"] for p in processes]

def get_filtered_pids(filterstr, excludes=None):
    """Get pids filtered by filterstr and excludes, matching against the full command line used to start the process.

    Args:
        filterstr (str): the String to filter based on.
        excludes (list[str]): exclude list. Defaults to None.

    Returns:
        list of int: List of the processes IDs
    """
    excludes = excludes or []
    pids = []
    for proc in psutil.process_iter(["name", "cmdline"]):
        # process_iter stores None when the command line cannot be read (e.g. access denied)
        cmdline = proc.info["cmdline"]
        if not cmdline:
            continue
        cmd_line = " ".join(cmdline)
        if filterstr not in cmd_line:
            continue
        if any(exclude in cmd_line for exclude in excludes):
            # we excluded this because it contain exclude string
            continue
        pids.append(proc.pid)  # may yield proc instead
    # if pids is empty root could be needed
    return pids

def get_pids_filtered_by_regex(regex_list):
    """Get pids of a process filtered by Regex list, matching against the full command line used to start the process.

    A process is reported once, even when several expressions match its command line.

    Args:
        regex_list (list[str]): List of regex expressions.

    Returns:
        list of int: List of the processes IDs.
    """
    # compile once instead of on every process
    patterns = [re.compile(regex) for regex in regex_list]
    res = []
    for process in psutil.process_iter(attrs=["cmdline"]):
        # process_iter stores None when the command line cannot be read (e.g. access denied)
        cmdline = " ".join(process.info["cmdline"] or [])
        # re.match semantics: the expression must match from the start of the command line
        if any(pattern.match(cmdline) for pattern in patterns):
            res.append(process.pid)
    return res

def check_start(cmd, filterstr, n_instances=1, retry=1, timeout=2, delay=0.5):
    """Run command (possibly multiple times) and check if it is started based on filterstr

    Args:
        cmd (str or list of str): Command to be executed.
        filterstr (str): Filter string, matched the same way `check_running` matches it.
        n_instances (int, optional): Number of needed instances. Defaults to 1.
        retry (int, optional): Number of retries to execute the command and check. Defaults to 1.
        timeout (int, optional): how long the function should wait for process to finish (first or parent process started with the command)
        delay (int, optional): how long the function should delay the checking process after the process finished or timeout exceeded (in case the first process started another one).

    Raises:
        j.exceptions.Runtime: will be raised if we didn't reach number of required instances.
    """
    # a string is split shell-style; a list is already an argument vector
    args = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    for _ in range(retry):
        proc = psutil.Popen(args, close_fds=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            rc = proc.wait(timeout)  # make sure the process is stable
            if rc != 0:  # executing the command succeeded but exited immediately!
                _, error_output = proc.communicate()
                j.logger.error(f"the start command exited with error: {error_output}")  # the process exited with error
        except psutil.TimeoutExpired:
            pass  # still running
        # wait extra delay to allow any subprocess spawned from the process we just started to finish
        # this may not needed in many cases
        # TODO check based on command
        time.sleep(delay)
        if check_running(filterstr, min=n_instances):
            # found at least n_instances instances using the filter string
            return
        # the required number of instances using the filter string not found yet!
    j.logger.error(f"could not start the required number of instances ({n_instances}) after {retry} attempts.")
    raise j.exceptions.Runtime("could not start the required number of instances.")

def check_stop(cmd, filterstr, retry=1, n_instances=0, timeout=2, delay=0.5):
    """Executes a stop command (possibly multiple times) and check if it is already stopped based on filterstr

    Args:
        cmd (str or list of str): Command to be executed.
        filterstr (str): Filter string.
        retry (int, optional): Number of retries. Defaults to 1.
        n_instances (int, optional): Number of instances after stop. Defaults to 0.
        timeout (int, optional): how long the function should wait for process to finish (first or parent process started with the command)
        delay (int, optional): how long the function should delay the checking process after the process finished or timeout exceeded (in case the first process started another one).

    Raises:
        j.exceptions.Runtime: if number of instances not matched
    """
    # a string is split shell-style; a list is already an argument vector
    args = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    found = []  # keeps the final error message well-defined even when retry is 0
    for _ in range(retry):
        proc = psutil.Popen(args, close_fds=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            rc = proc.wait(timeout)  # make sure the process is stable
            if rc != 0:
                # the process exited with error
                _, error_output = proc.communicate()
                j.logger.warning(f"the stop command exited with error: {error_output}")
        except psutil.TimeoutExpired:
            # still running
            pass
        # wait extra delay to allow any subprocess spawned from the process we just started to finish
        # this may not needed in many cases
        time.sleep(delay)
        found = get_pids(filterstr)
        if len(found) == n_instances:
            # the number of instances found using the filter string matches the required number
            return
        # the number of instances found does not match the required number yet
    # could not match the required number of instances after all attempts.
    raise j.exceptions.Runtime(f"could not stop {cmd}, found {len(found)} of instances instead of {n_instances}")

def get_pids(process_name, match_predicate=None, limit=0, _alt_source=None, include_zombie=False, full_cmd_line=False):
    """Return a list of processes ID(s) matching a given process name.

    The name is matched against the process name and, when `full_cmd_line` is set, against the
    full command line as well.

    Args:
        process_name (str): The target process name
        match_predicate (callable, optional): Function that does matching between found processes and the targeted process.
            the function should accept two arguments and return a boolean. Defaults to None.
        limit (int, optional): If not equal to 0, function will return as fast as the number of PID(s) found become equal to `limit` value.
        _alt_source(callable or iterable, optional): Can be used to specify an alternative source of the psutil.Process objects to match against.
            ex: the generator returned by get_user_processes, or the get_similar_processes function itself.
            A callable is called without arguments. if not specified, psutil.process_iter will be used. Defaults to None.
        include_zombie (bool, optional): Whether to include pid for zombie processes or not. Defaults to False.
        full_cmd_line (bool, optional): The pattern is normally only matched against the process name.
            if full_cmd_line is set to True, the full command line is used. Defaults to False.

    Returns:
        list of int: List of the processes IDs.
    """
    # default match predicate
    def default_predicate(target, given):
        if isinstance(given, list):
            # a list candidate is a command line: match when the name is one of its arguments
            return target in given
        return target.strip().lower() == given.lower()

    match_predicate = match_predicate or default_predicate

    # `is None` (not truthiness) so that an empty alternative source yields no pids instead of
    # silently falling back to every process on the system
    if _alt_source is None:
        p_source = psutil.process_iter(["name", "exe", "cmdline"])
    elif callable(_alt_source):
        p_source = _alt_source()
    else:
        p_source = _alt_source

    pids = []
    for proc in p_source:
        try:
            if not include_zombie and proc.status() == psutil.STATUS_ZOMBIE:
                # zombie process, ignoring it
                continue

            # prefer the attributes pre-fetched by process_iter, fall back to querying the process
            info = getattr(proc, "info", None) or {}
            # NOTE(review): the candidate list was partly lost from the source; it is rebuilt as
            # "name, plus the command line when full_cmd_line is set" -- confirm.
            candidates = [info["name"] if "name" in info else proc.name()]
            if full_cmd_line:
                cmdline = info["cmdline"] if "cmdline" in info else proc.cmdline()
                if cmdline:
                    candidates.append(cmdline)

            # None means the attribute could not be read for this process
            if any(match_predicate(process_name, candidate) for candidate in candidates if candidate is not None):
                pids.append(proc.pid)
                # return early if no need to iterate over all running process
                if limit and len(pids) == limit:
                    return pids
        except psutil.Error:
            # the process vanished or is not accessible, skip it
            continue
    return pids

def get_my_process():
    """Get psutil.Process object of the current process.

    Returns:
        psutil.Process: Process object of the current process.
    """
    return get_process_object(os.getpid(), die=True)

def get_process_object(pid, die=False):
    """Get psutil.Process object of a given process ID (PID).

    Args:
        pid (int): Process ID (PID) to get
        die (bool, optional): Whether to raise an exception if no process with the given PID is found in the
            current process list or not. Defaults to False.

    Raises:
        psutil.NoSuchProcess: If process with the given PID is not found and die set to True.
        psutil.AccessDenied: If permission denied and die set to True.

    Returns:
        psutil.Process or None: The Process object of the given PID if found, otherwise None, if die set to False.
    """
    try:
        return psutil.Process(pid)
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        # when you query processes owned by another user, especially on macOS and Windows you may get AccessDenied exception
        if die:
            raise  # bare raise keeps the original traceback
        return None

def get_user_processes(user):
    """Get all process for a specific user.

    Processes whose owner cannot be read (vanished or access denied) are skipped.

    Args:
        user (str): The user name to match against.

    Yields:
        psutil.Process: process object for all processes owned by `user`.
    """
    for process in psutil.process_iter(["name", "exe", "cmdline"]):
        # handle errors per process: one inaccessible process must not end the whole iteration
        try:
            owned_by_user = process.username() == user
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            continue
        if owned_by_user:
            yield process

def kill_user_processes(user, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill all processes for a specific user.

    Args:
        user (str): The user name to match against.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Returns:
        list of psutil.Process: list of process objects that remain alive if any.
    """
    failed_processes = []
    for proc in get_user_processes(user):
        try:
            kill(proc, sig=sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission) as e:
            j.logger.exception("ignoring an exception that occurred while iterating over user processes", exception=e)
            failed_processes.append(proc)

    # making sure: a process that failed to die above may have exited in the meantime,
    # timeout=0 polls without blocking and keeps only the ones still alive
    if failed_processes:
        _, failed_processes = psutil.wait_procs(failed_processes, timeout=0)

    return failed_processes

def get_similar_processes(target_proc=None):
    """Gets similar processes to current process, started with same command line and same options.

    Nothing is yielded when the target process cannot be inspected or has an empty command line.

    Args:
        target_proc (int or psutil.Process, optional): pid, or psutil.Process object.
            if None then pid for current process will be used. Defaults to None.

    Yields:
        psutil.Process: psutil.Process object for all processes similar to a given process.
    """
    try:
        if target_proc is None:
            target_proc = get_my_process()
        elif isinstance(target_proc, int):
            target_proc = get_process_object(target_proc, die=True)
        # read the reference command line once instead of once per inspected process
        target_cmdline = target_proc.cmdline()
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        return
    if not target_cmdline:
        return

    for proc in psutil.process_iter(["name", "exe", "cmdline"]):
        # info["cmdline"] is None/empty when unreadable, which can never equal a non-empty list
        if proc.info["cmdline"] == target_cmdline:
            yield proc

def check_running(process_name, min=1):
    """Check if there are a specific number of running processes that match the given name.

    The name is matched the same way `get_pids` matches it.

    Args:
        process_name (str): the target process name
        min (int, optional): min number of instances required to be running. Defaults to 1.

    Returns:
        bool: True if at least `min` matching processes are running, otherwise False
    """
    # limit=min stops the scan as soon as enough processes are found. `>=` rather than `==`
    # because limit=0 means "no limit", so min=0 must not require exactly zero matches.
    pids = get_pids(process_name, limit=min)
    return len(pids) >= min

def check_process_for_pid(pid, process_name):
    """Check whether a given pid actually does belong to a given process name.

    Args:
        pid (int): Process ID
        process_name (str): String to match against the process name using equality operator

    Returns:
        bool: True if process_name matched process name of the pid, False otherwise
            (also False when the process does not exist or is not accessible).
    """
    try:
        return psutil.Process(pid).name() == process_name
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        return False

def set_env_var(var_names, var_values):
    """Set the value of the environment variables {var_names}. Existing variable are overwritten

    Such changes to the environment affect subprocesses started with os.system(), popen() or fork() and execv()

    Values are converted with str() and stripped of surrounding whitespace. Values beyond the
    number of names are ignored.

    Args:
        var_names (list of str): A list of the names of all the environment variables to set
        var_values (list of str): A list of all values for the environment variables

    Raises:
        IndexError: if fewer values than names are given; the environment is left untouched in that case.
    """
    # Note:
    # On some platforms, including FreeBSD and Mac OS X, setting environ may cause memory leaks.
    # Refer to the system documentation for putenv().

    # validate before mutating so a short values list cannot leave the environment half-updated
    if len(var_values) < len(var_names):
        raise IndexError(f"expected {len(var_names)} values, got {len(var_values)}")
    for name, value in zip(var_names, var_values):
        os.environ[name] = str(value).strip()

def get_pid_by_port(port, ipv6=False, udp=False):
    """Returns the PID of the process that is listening on the given port

    Args:
        port (int): Port number to lookup for.
        ipv6 (bool, optional): Whether to search the connections that using ipv6 instead of ipv4. Defaults to False.
        udp (bool, optional): Whether to search the connections for UDP port instead of TCP. Defaults to False.

    Returns:
        int or None: PID for the process that listen on that port, None if there is no such process.
    """
    process = get_process_by_port(port, ipv6=ipv6, udp=udp)
    return process.pid if process else None

def kill_process_by_name(process_name, sig=signal.SIGTERM, match_predicate=None, timeout=5, sure_kill=False):
    """Kill all processes that match 'process_name'.

    Args:
        process_name (str): The target process name.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        match_predicate (callable, optional): Function that does matching between
            found processes and the targeted process, the function should accept
            two arguments and return a boolean. Defaults to None.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Returns:
        list of int: represents the IDs of the processes remaining alive if any.
    """
    failed_processes = []
    for pid in get_pids(process_name, match_predicate=match_predicate):
        try:
            kill(pid, sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission):
            # best effort: remember the failure and keep killing the remaining processes
            failed_processes.append(pid)
    return failed_processes

def kill_all_pids(pids, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill all processes with given pids.

    Args:
        pids (list of int): The target processes IDs.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Returns:
        list of int: represents the IDs of the processes remaining alive if any.
    """
    failed_processes = []
    for pid in pids:
        try:
            kill(pid, sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission):
            # best effort: remember the failure and keep killing the remaining processes
            failed_processes.append(pid)
    return failed_processes

def kill_process_by_port(port, ipv6=False, udp=False, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill process by port.

    Args:
        port (int): The port number.
        ipv6 (bool, optional): Whether to search the connections that using ipv6 instead of ipv4. Defaults to False.
        udp (bool, optional): Whether to search the connections for UDP port instead of TCP. Defaults to False.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Raises:
        j.exceptions.NotFound: In case no process is using the given port.
        j.exceptions.Runtime: In case killing the process failed.
        j.exceptions.Permission: In case the permission to perform this action is denied.
    """
    proc = get_process_by_port(port, ipv6=ipv6, udp=udp)
    if proc is None:
        # without this guard kill() would fail with an AttributeError on None
        raise j.exceptions.NotFound(f"No process found using port {port}")
    kill(proc, sig=sig, timeout=timeout, sure_kill=sure_kill)

def is_port_listening(port, ipv6=False):
    """Check if the TCP port is being used by any process

    The check is done by opening a TCP connection to the localhost address.

    Args:
        port (int): Port number
        ipv6 (bool, optional): Whether to use ipv6 localhost address instead of ipv4 localhost address. Defaults to False.

    Returns:
        bool: True if port is used, False otherwise.
    """
    from jumpscale.sals import nettools

    # explicit loopback addresses, as the docstring promises "localhost"
    # NOTE(review): the ipv4 literal was empty in the source; confirm the intended address.
    ip6 = "::1"
    ip4 = "127.0.0.1"
    return nettools.tcp_connection_test(ip6 if ipv6 else ip4, port, timeout=5)

def get_process_by_port(port, ipv6=False, udp=False):
    """Return the psutil.Process object that is using the given local port.

    Args:
        port (int): The port for which to find the process.
        ipv6 (bool, optional): Whether to search the connections that use ipv6 instead of ipv4. Defaults to False.
        udp (bool, optional): Whether to search the connections for a UDP port instead of TCP. Defaults to False.

    Raises:
        j.exceptions.Runtime: pid is not retrievable.
        j.exceptions.NotFound: if the process no longer exists.
        j.exceptions.Permission: if the process is not accessible by the user.

    Returns:
        psutil.Process: process object if found, otherwise None.
    """
    for conn in psutil.net_connections():  # TODO use kind parameter
        try:
            # should we check against ESTABLISHED status?
            # connection.status for UDP and UNIX sockets is always going to be psutil.CONN_NONE
            if (
                conn.laddr.port == port
                and conn.status in ["LISTEN", "NONE", "ESTABLISHED"]
                and (conn.family.name == "AF_INET6") == ipv6
                and (conn.type.name == "SOCK_DGRAM") == udp
            ):
                if conn.pid:
                    return psutil.Process(conn.pid)
                # a matching socket without a pid means the owner is hidden from this user
                raise j.exceptions.Runtime("pid is not retrievable, not root?")
        except psutil.NoSuchProcess as e:
            raise j.exceptions.NotFound("Process is no longer exists") from e
        except psutil.AccessDenied as e:
            raise j.exceptions.Permission("Permission denied") from e
    return None

def get_defunct_processes():
    """Get defunct (zombie) processes.

    Returns:
        list of int: List of processes ID(s).
    """
    zombie_pids = []
    for proc in psutil.process_iter():
        try:
            if proc.status() == psutil.STATUS_ZOMBIE:
                zombie_pids.append(proc.pid)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # best effort: skip processes that vanished or cannot be inspected
            continue
    return zombie_pids

def get_processes():
    """Get an iterator over all running processes.

    Yields:
        psutil.Process: one object per running process.
    """
    yield from psutil.process_iter()

def get_processes_info(user=None, sort="mem", filterstr=None, limit=25, desc=True):
    """Get information for top running processes sorted by memory usage, CPU usage or another field.

    Args:
        user (str, optional): filter the processes by username. Defaults to None.
        sort (str, optional): sort processes by resource usage. Defaults to 'mem'.
            available options:
                'rss' and its alias 'mem': sort by processes which consumed the most memory (Resident Set Size).
                'cpu_times' and its alias 'cpu_time': sort by processes which consumed the most CPU time.
                'cpu_num': sort by the CPU number this process is currently running on.
                'cpu_percent': sort by a float representing the process CPU utilization as a percentage which can
                    also be > 100.0 in case of a process running multiple threads on different CPUs.
                'memory_percent': sort by the process memory utilization (process memory to total physical memory).
                'create_time': the process creation time as a floating point number in seconds since the epoch.
                'gids' and its alias 'egid': the effective group id of this process.
                'uids' and its alias 'euid': the effective user id of this process.
                'pid': sort by the process PID.
                'ppid': sort by the process parent PID.
                'name': sort by the processes name.
                'username': sort by the name of the user that owns the process.
                'status': sort by the current process status, one of the psutil.STATUS_* constants.
        filterstr (str, optional): the string to match against process names and filter the results on.
        limit (int, optional): limit the results to a specific number of processes; a negative value (e.g. -1)
            or None disables the limit. Defaults to 25.
        desc (bool, optional): whether to sort the data returned in descending order or not. Defaults to True.

    Raises:
        j.exceptions.Value: if `sort` is not a known field name.

    Returns:
        list of dict: one info dictionary per process, holding the psutil fields listed above plus
            'rss' (resident memory in Mb), 'cpu_time' (user + system seconds) and 'ports'.
    """

    def _get_sort_key(proc_info):
        if sort == "mem":
            return proc_info["rss"]
        if sort == "cpu_times":
            return proc_info["cpu_time"]
        if sort in ["gids", "egid"]:
            return proc_info["gids"].effective
        if sort in ["uids", "euid"]:
            return proc_info["uids"].effective
        try:
            return proc_info[sort]
        except KeyError:
            j.logger.error(f"bad field name for sorting: {sort}")
            raise j.exceptions.Value(f"bad field name for sorting: {sort}")

    # NOTE(review): the original attrs list was not recoverable; this one is rebuilt from the
    # sortable fields documented above. Confirm against the upstream module.
    attrs = [
        "pid",
        "ppid",
        "name",
        "username",
        "status",
        "create_time",
        "cpu_times",
        "cpu_num",
        "cpu_percent",
        "memory_percent",
        "gids",
        "uids",
    ]

    if not filterstr:
        p_source = get_user_processes(user=user) if user else get_processes()
    else:
        # it makes sense that get_pids func should return a list of psutil.Process objects instead of pids
        if user:
            matched = get_pids(process_name=filterstr, _alt_source=get_user_processes(user))
        else:
            matched = get_pids(process_name=filterstr)
        p_source = map(get_process_object, matched)

    processes_list = []
    for proc in p_source:
        if not proc:  # a race condition happened and get_process_object returned None
            continue
        try:
            # Fetch process details as dict
            pinfo = proc.as_dict(attrs=attrs)
            # the non-swapped physical memory a process has used in Mb
            pinfo["rss"] = proc.memory_info().rss / (1024 * 1024)
            pinfo["cpu_time"] = sum(pinfo["cpu_times"][:2])  # cumulative, excluding children and iowait
            pinfo["ports"] = []
            try:
                connections = proc.connections()  # need root
            except psutil.AccessDenied:
                connections = []
            for conn in connections or []:
                pinfo["ports"].append({"port": conn.laddr.port, "status": conn.status})
            processes_list.append(pinfo)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
            # NOTE(review): the original logger call was truncated; the level used here is an assumption.
            j.logger.warning(
                "ignoring and logging an exception that occurred while iterating over system processes, not root?"
                f" ({e!r})"
            )

    sorted_processes = sorted(processes_list, key=_get_sort_key, reverse=desc)
    # a plain [:limit] slice with limit=-1 would silently drop the last entry instead of disabling the limit
    if limit is not None and limit >= 0:
        sorted_processes = sorted_processes[:limit]
    return sorted_processes

def get_ports_mapping(status=psutil.CONN_LISTEN):
    """Get a mapping of process to ports, filtered by connection status.

    Any process that raises a psutil error (e.g. permission error) is skipped.

    Example:
        >>> from jumpscale.loader import j
        >>> import psutil
        >>> j.sals.process.get_ports_mapping(psutil.CONN_ESTABLISHED)
        >>> # or
        >>> j.sals.process.get_ports_mapping("ESTABLISHED")

    Args:
        status (psutil.CONN_CONSTANT): `psutil` CONN_* constant as a filter. Defaults to psutil.CONN_LISTEN.

    Returns:
        defaultdict: a mapping between process objects and lists of local ports.
    """
    ports = defaultdict(list)

    for process in get_processes():
        try:
            connections = process.connections()
        except psutil.Error:
            continue

        for conn in connections or []:
            if conn.status == status:
                ports[process].append(conn.laddr.port)

    return ports

def get_memory_usage():
    """Get memory status.

    Returns:
        dict: Memory status info, available keys ('total', 'used', 'percent')
            'total': total physical memory in Gb (exclusive swap), rounded up.
            'used': memory used in Gb, rounded up; calculated differently depending on the platform and
                designed for informational purposes only. total - free does not necessarily match used.
            'percent': the percentage of used memory.
    """
    gib = 1024 * 1024 * 1024
    memory = psutil.virtual_memory()
    return {
        "total": math.ceil(memory.total / gib),  # total physical memory (exclusive swap)
        "used": math.ceil(memory.used / gib),
        "percent": memory.percent,
    }

def get_environ(pid):
    """Get the environment variables of a specific process.

    Args:
        pid (int): process pid.

    Raises:
        j.exceptions.NotFound: if the process no longer exists.
        j.exceptions.Permission: if the process is not accessible by the user.

    Returns:
        dict: dict of env variables.
    """
    try:
        proc = get_process_object(pid, die=True)
        return proc.environ()
    except psutil.NoSuchProcess as e:
        raise j.exceptions.NotFound("Process is no longer exists") from e
    except psutil.AccessDenied as e:
        raise j.exceptions.Permission("Permission denied") from e

def kill_proc_tree(
    parent, sig=signal.SIGTERM, include_parent=True, include_grand_children=True, timeout=5, sure_kill=False
):
    """Kill a process and its children (including grandchildren) with signal `sig`.

    Args:
        parent (int or psutil.Process): Target process ID (PID) or psutil.Process object.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        include_parent (bool, optional): Whether to kill the process itself. Defaults to True.
        include_grand_children (bool, optional): Whether to kill recursively all grandchildren. Defaults to True.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raising an exception
            or, if sure_kill=True, sending a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fall back to SIGKILL if the timeout is exceeded. Defaults to False.

    Returns:
        list of psutil.Process: the objects of the processes remaining alive, if any.

    Raises:
        AssertionError: in case the given `parent` is the current process.
    """
    if isinstance(parent, int):
        parent = get_process_object(parent)
        if parent is None:
            return []  # already dead; keep the documented list return type

    # should be checked on any killing function
    # NOTE(review): ideally this guard applies only when `include_parent` is True and/or per process below.
    # Raised explicitly (instead of `assert`) so it is not stripped when running with -O.
    if parent.pid == os.getpid():
        raise AssertionError("won't kill myself")

    # reversed so the processes reported last by children() are signalled first
    processes = parent.children(recursive=include_grand_children)[::-1]
    if include_parent:
        processes.append(parent)

    failed = []
    for p in processes:
        try:
            kill(p, sig=sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission):
            failed.append(p)

    # making sure: drop the ones that exited in the meantime
    if failed:
        _gone, failed = psutil.wait_procs(failed, timeout=0)
    return failed

def in_docker():
    """Check whether we are running inside a docker container.

    The check looks for "/docker/" in the control groups of PID 1.

    Returns:
        bool: True if in docker. False otherwise.
    """
    rc, out, _ = j.sals.process.execute("cat /proc/1/cgroup", die=False, showout=False)
    return rc == 0 and "/docker/" in out

def in_host():
    """Check whether we are running on a host, i.e. not inside docker.

    Returns:
        bool: True if in host. False otherwise.
    """
    return not in_docker()


def check_process_for_pid(pid, process_name)

Check whether a given pid actually does belong to a given process name.


pid : int
Process ID
process_name : str
String to match against the candidate process name using the equality operator


True if process_name matched process name of the pid, False otherwise.
Expand source code
def check_process_for_pid(pid, process_name):
    """Check whether a given pid actually does belong to a given process name.

    Args:
        pid (int): Process ID.
        process_name (str): String to match against the process name using the equality operator.

    Returns:
        bool: True if process_name matched the process name of the pid, False otherwise
            (also when the process does not exist or is not accessible).
    """
    try:
        return psutil.Process(pid).name() == process_name
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        return False
def check_running(process_name, min=1)

Check if there are a specific number of running processes that match the given name.

Function will check string against, Process.exe() and Process.cmdline().


process_name : str
the target process name
min : int, optional
min number of instances required to be running. Defaults to 1.


true if process is running, otherwise False
Expand source code
def check_running(process_name, min=1):
    """Check if at least `min` running processes match the given name.

    Matching is delegated to get_pids.

    Args:
        process_name (str): the target process name.
        min (int, optional): min number of instances required to be running. Defaults to 1.

    Returns:
        bool: True if enough matching processes are running, otherwise False.
    """
    # `limit=min` lets get_pids stop early once enough matches are found. With min=0 the limit is
    # disabled, so ">=" (rather than "==") keeps "zero required" trivially satisfied.
    pids = get_pids(process_name, limit=min)
    return len(pids) >= min
def check_start(cmd, filterstr, n_instances=1, retry=1, timeout=2, delay=0.5)

Run command (possibly multiple times) and check if it is started based on filterstr


cmd : str or list of str
Command to be executed.
filterstr : str
Filter string. Will be matched against Process.exe() and Process.cmdline()
n_instances : int, optional
Number of needed instances. Defaults to 1.
retry : int, optional
Number of retries to execute the command and check. Defaults to 1.
timeout : int, optional
how long the function should wait for the process to finish (the first or parent process started with the command)
delay : int, optional
how long the function should delay the checking process after the process finished or timeout exceeded (in case the first process started another one).


will be raised if we didn't reach number of required instances.
Expand source code
def check_start(cmd, filterstr, n_instances=1, retry=1, timeout=2, delay=0.5):
    """Run a command (possibly multiple times) and check that it started, based on filterstr.

    Args:
        cmd (str or list of str): Command to be executed.
        filterstr (str): Filter string, passed to check_running to find the started instances.
        n_instances (int, optional): Number of needed instances. Defaults to 1.
        retry (int, optional): Number of retries to execute the command and check. Defaults to 1.
        timeout (int, optional): how long to wait for the process to finish (the first or parent process
            started with the command). Defaults to 2.
        delay (float, optional): how long to delay the check after the process finished or the timeout was
            exceeded (in case the first process started another one). Defaults to 0.5.

    Raises:
        j.exceptions.Runtime: will be raised if we didn't reach the number of required instances.
    """
    import time  # local import keeps this block self-contained

    args = shlex.split(cmd) if isinstance(cmd, str) else cmd
    for _ in range(retry):
        proc = psutil.Popen(args, close_fds=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            rc = proc.wait(timeout)  # make sure the process is stable
            if rc != 0:  # executing the command succeeded but it exited immediately
                _output, error_output = proc.communicate()
                j.logger.error(f"the start command exited with error: {error_output}")
        except psutil.TimeoutExpired:
            pass  # still running
        # wait extra delay to allow any subprocess spawned from the process we just started to finish
        # this may not be needed in many cases
        # TODO check based on command
        time.sleep(delay)
        if check_running(filterstr, min=n_instances):
            # found at least {n_instances} instances using the filter string {filterstr}
            return
        # the required number of instances using the filter string {filterstr} not found yet!
    # report `retry`, not the zero-based loop index, which is also undefined when retry < 1
    j.logger.error(f"could not start the required number of instances ({n_instances}) after {retry} attempts.")
    raise j.exceptions.Runtime("could not start the required number of instances.")
def check_stop(cmd, filterstr, retry=1, n_instances=0, timeout=2, delay=0.5)

Executes a stop command (possibly multiple times) and check if it is already stopped based on filterstr


cmd : str
Command to be executed.
filterstr : str
Filter string.
retry : int, optional
Number of retries. Defaults to 1.
n_instances : int, optional
Number of instances after stop. Defaults to 0.
timeout : int, optional
how long the function should wait for the process to finish (the first or parent process started with the command)
delay : int, optional
how long the function should delay the checking process after the process finished or timeout exceeded (in case the first process started another one).


if number of instances not matched
Expand source code
def check_stop(cmd, filterstr, retry=1, n_instances=0, timeout=2, delay=0.5):
    """Execute a stop command (possibly multiple times) and check that it stopped, based on filterstr.

    Args:
        cmd (str or list of str): Command to be executed.
        filterstr (str): Filter string, passed to get_pids to find the remaining instances.
        retry (int, optional): Number of retries. Defaults to 1.
        n_instances (int, optional): Number of instances expected after stop. Defaults to 0.
        timeout (int, optional): how long to wait for the process to finish (the first or parent process
            started with the command). Defaults to 2.
        delay (float, optional): how long to delay the check after the process finished or the timeout was
            exceeded (in case the first process started another one). Defaults to 0.5.

    Raises:
        j.exceptions.Runtime: if the number of instances is not matched.
    """
    import time  # local import keeps this block self-contained

    args = shlex.split(cmd) if isinstance(cmd, str) else cmd
    found = []  # stays empty when retry < 1, so the error below can still be built
    for _ in range(retry):
        proc = psutil.Popen(args, close_fds=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            rc = proc.wait(timeout)  # make sure the process is stable
            if rc != 0:
                # the process exited with error
                _output, error_output = proc.communicate()
                j.logger.warning(f"the stop command exited with error: {error_output}")
        except psutil.TimeoutExpired:
            pass  # still running
        # wait extra delay to allow any subprocess spawned from the process we just started to finish
        # this may not be needed in many cases
        time.sleep(delay)
        found = get_pids(filterstr)
        if len(found) == n_instances:
            # the required {n_instances} matches the instances found using the filter string
            return
        # the required {n_instances} does not match the number of instances found yet
    # could not match the required number of instances {n_instances} after all attempts.
    raise j.exceptions.Runtime(f"could not stop {cmd}, found {len(found)} of instances instead of {n_instances}")
def execute(cmd, showout=False, cwd=None, shell='/bin/bash', timeout=600, asynchronous=False, env=None, replace_env=False, die=False)

Execute a command.

Accepts command as a list too, with auto-escaping.


cmd : str or list of str
Command to be executed, e.g. "ls -la" or ["ls", "-la"]
showout : bool, optional
Whether to show stdout of the command or not. Defaults to False.
cwd : str, optional
Path to cd into before running command. Defaults to None.
shell : str, optional
The shell used to run the command. Defaults to "/bin/bash".
timeout : int, optional
Timeout before kill the process. Defaults to 600.
asynchronous : bool, optional
Whether to execute in asynchronous mode or not. Defaults to False.
env : dict, optional
Add environment variables here. Defaults to None.
replace_env : bool, optional
Whether to replace the entire environment with env. Defaults to False.
die : bool, optional
Whether to raise exception if command failed or not. Defaults to False.


tuple[return_code: int, stdout: str, stderr: str]
Expand source code
def execute(
    cmd,
    showout=False,
    cwd=None,
    shell="/bin/bash",
    timeout=600,
    asynchronous=False,
    env=None,
    replace_env=False,
    die=False,
):
    """Execute a command.

    Accepts command as a list too, with auto-escaping.

    Args:
        cmd (str or list of str): Command to be executed, e.g. "ls -la" or ["ls", "-la"]
        showout (bool, optional): Whether to show stdout of the command or not. Defaults to False.
        cwd (str, optional): Path to `cd` into before running command. Defaults to None.
        shell (str, optional): The shell used to run the command. Defaults to "/bin/bash".
        timeout (int, optional): Timeout before killing the process. Defaults to 600.
        asynchronous (bool, optional): Whether to execute in asynchronous mode or not. Defaults to False.
        env (dict, optional): Add environment variables here. Defaults to None.
        replace_env (bool, optional): Whether to replace the entire environment with env. Defaults to False.
        die (bool, optional): Whether to raise exception if command failed or not. Defaults to False.

    Returns:
        tuple: tuple[return_code: int, stdout: str, stderr: str]
    """
    # NOTE(review): only `hide`, `env` and `warn` were recoverable from the listing; the remaining
    # keyword names are assumed to mirror this function's parameters. Confirm against run_local.
    return j.core.executors.run_local(
        cmd,
        hide=not showout,
        cwd=cwd,
        shell=shell,
        timeout=timeout,
        asynchronous=asynchronous,
        env=env or {},
        replace_env=replace_env,
        warn=not die,
    )
def get_defunct_processes()

Gets defunct (zombie) processes.


list of int
List of processes ID(s).
Expand source code
def get_defunct_processes():
    """Get defunct (zombie) processes.

    Returns:
        list of int: List of processes ID(s).
    """
    zombie_pids = []
    for proc in psutil.process_iter():
        try:
            if proc.status() == psutil.STATUS_ZOMBIE:
                zombie_pids.append(proc.pid)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # best effort: skip processes that vanished or cannot be inspected
            continue
    return zombie_pids
def get_environ(pid)

Gets env vars for a specific process based on pid


pid : int
process pid


if the process no longer exists.
if the process is not accessible by the user.


dict of env variables
Expand source code
def get_environ(pid):
    """Get the environment variables of a specific process.

    Args:
        pid (int): process pid.

    Raises:
        j.exceptions.NotFound: if the process no longer exists.
        j.exceptions.Permission: if the process is not accessible by the user.

    Returns:
        dict: dict of env variables.
    """
    try:
        proc = get_process_object(pid, die=True)
        return proc.environ()
    except psutil.NoSuchProcess as e:
        raise j.exceptions.NotFound("Process is no longer exists") from e
    except psutil.AccessDenied as e:
        raise j.exceptions.Permission("Permission denied") from e
def get_filtered_pids(filterstr, excludes=None)

Get pids filtered by filterstr and excludes, matching against the full command line used to start the process.


filterstr : str
the String to filter based on.
excludes : list[str]
exclude list. Defaults to None.


list of int
List of the processes IDs
Expand source code
def get_filtered_pids(filterstr, excludes=None):
    """Get pids filtered by filterstr and excludes, matching against the full command line of each process.

    Args:
        filterstr (str): the string to filter on; a process matches when its command line contains it.
        excludes (list[str]): processes whose command line contains any of these strings are dropped.
            Defaults to None.

    Returns:
        list of int: List of the processes IDs.
    """
    pids = []
    for proc in psutil.process_iter(["name", "cmdline"]):
        cmd_parts = proc.info["cmdline"]
        if not cmd_parts:
            # psutil reports None when the command line is not accessible; nothing to match against
            continue
        cmd_line = " ".join(cmd_parts)
        if filterstr not in cmd_line:
            continue
        if excludes and any(exclude in cmd_line for exclude in excludes):
            # excluded because the command line contains an exclude string
            continue
        pids.append(proc.pid)  # may yield proc instead
    # if pids is empty root could be needed
    return pids
def get_memory_usage()

Get memory status


Memory status info, available keys ('total', 'used', 'percent') 'total': total physical memory in Gb (exclusive swap). 'used': memory used in Gb, calculated differently depending on the platform and designed for informational purposes only. total - free does not necessarily match used. 'percent': the percentage of used memory.
Expand source code
def get_memory_usage():
    """Get memory status.

    Returns:
        dict: Memory status info, available keys ('total', 'used', 'percent')
            'total': total physical memory in Gb (exclusive swap), rounded up.
            'used': memory used in Gb, rounded up; calculated differently depending on the platform and
                designed for informational purposes only. total - free does not necessarily match used.
            'percent': the percentage of used memory.
    """
    gib = 1024 * 1024 * 1024
    memory = psutil.virtual_memory()
    return {
        "total": math.ceil(memory.total / gib),  # total physical memory (exclusive swap)
        "used": math.ceil(memory.used / gib),
        "percent": memory.percent,
    }
def get_my_process()

Get psutil.Process object of the current process.


Process object of the current process.
Expand source code
def get_my_process():
    """Get the psutil.Process object of the current process.

    Returns:
        psutil.Process: Process object of the current process.
    """
    current_pid = os.getpid()
    return get_process_object(current_pid, die=True)
def get_pid_by_port(port, ipv6=False, udp=False)

Returns the PID of the process that is listening on the given port


port : int
Port number to lookup for.
ipv6 : bool, optional
Whether to search the connections that using ipv6 instead of ipv4. Defaults to False.
udp : bool, optional
Whether to search the connections for UDP port instead of TCP. Defaults to False.


int or None
PID of the process that listens on that port.
Expand source code
def get_pid_by_port(port, ipv6=False, udp=False):
    """Return the PID of the process that is using the given port.

    Args:
        port (int): Port number to look up.
        ipv6 (bool, optional): Whether to search the connections that use ipv6 instead of ipv4. Defaults to False.
        udp (bool, optional): Whether to search the connections for a UDP port instead of TCP. Defaults to False.

    Returns:
        int or None: PID of the process that uses that port, or None when no process was found.
    """
    process = get_process_by_port(port, ipv6=ipv6, udp=udp)
    return process.pid if process else None
def get_pids(process_name, match_predicate=None, limit=0, include_zombie=False, full_cmd_line=False)

Return a list of processes ID(s) matching a given process name.

Function will check string against, Process.exe() and Process.cmdline()


process_name : str
The target process name
match_predicate : callable, optional
Function that does matching between found processes and the targeted process. the function should accept two arguments and return a boolean. Defaults to None.
limit : int, optional
If not equal to 0, function will return as fast as the number of PID(s) found become equal to limit value.
_alt_source(callable or iterable, optional): Can be used to specify an alternative source of the psutil.Process objects to match against.
ex: get_user_processes func, or get_similar_processes.
if not specified, psutil.process_iter will be used. Defaults to None.
include_zombie : bool, optional
Whether to include pids of zombie processes or not. Defaults to False.
full_cmd_line : bool, optional
The pattern is normally only matched against the process name. if full_cmd_line is set to True, the full command line is used. Defaults to False.


list of int
List of the processes IDs.
Expand source code
def get_pids(process_name, match_predicate=None, limit=0, _alt_source=None, include_zombie=False, full_cmd_line=False):
    """Return a list of processes ID(s) matching a given process name.

    The name is matched against each process name and, when `full_cmd_line` is set, against the
    list of its command line arguments as well.

    Args:
        process_name (str): The target process name.
        match_predicate (callable, optional): Function that does the matching between the target and a candidate.
            It is called as match_predicate(process_name, candidate) and should return a boolean; a candidate is
            either a string (process name) or a list of str (command line). Defaults to None, meaning a
            case-insensitive equality on names and an exact-argument membership test on command lines.
        limit (int, optional): If not equal to 0, the function returns as soon as the number of PID(s) found
            becomes equal to `limit`. Defaults to 0.
        _alt_source (callable or iterable, optional): An alternative source of the psutil.Process objects to
            match against, ex: get_user_processes func, or get_similar_processes.
            If not specified, psutil.process_iter will be used. Defaults to None.
        include_zombie (bool, optional): Whether to include pids of zombie processes or not. Defaults to False.
        full_cmd_line (bool, optional): The pattern is normally only matched against the process name.
            If full_cmd_line is set to True, the full command line is used too. Defaults to False.

    Returns:
        list of int: List of the processes IDs.
    """

    # default match predicate
    def default_predicate(target, given):
        if isinstance(given, list):
            return target in given
        return target.strip().lower() == given.lower()

    def _info_of(proc):
        # `.info` only exists on objects produced by process_iter(attrs=...); objects coming
        # from an alternative source may lack it, so fetch the same fields explicitly.
        info = getattr(proc, "info", None)
        if info is None:
            info = proc.as_dict(attrs=["name", "cmdline"])
        return info

    match_predicate = match_predicate or default_predicate
    p_source = _alt_source or psutil.process_iter(["name", "exe", "cmdline"])
    if callable(p_source):
        p_source = p_source()

    pids = []
    for proc in p_source:
        try:
            if not include_zombie and proc.status() == psutil.STATUS_ZOMBIE:
                # zombie process, ignoring it
                continue

            info = _info_of(proc)
            # NOTE(review): the candidate list was only partly recoverable from the listing (name, plus the
            # command line when full_cmd_line is set); confirm whether "exe" was also a candidate upstream.
            candidates = [info.get("name")]
            if full_cmd_line and info.get("cmdline"):
                candidates.append(info["cmdline"])

            # inaccessible attributes come back as None; skip them rather than feed them to the predicate
            if any(match_predicate(process_name, candidate) for candidate in candidates if candidate):
                pids.append(proc.pid)
                # return early if no need to iterate over all running processes
                if limit and len(pids) == limit:
                    return pids
        except psutil.Error:
            continue
    return pids
def get_pids_filtered_by_regex(regex_list)

Get pids of a process filtered by Regex list, matching against the full command line used to start the process.


regex_list : list[str]
List of regex expressions.


list of int
List of the processes IDs.
Expand source code
def get_pids_filtered_by_regex(regex_list):
    """Get pids of processes whose full command line matches any of the given regular expressions.

    Each expression is applied with re.match, i.e. anchored at the start of the command line.

    Args:
        regex_list (list[str]): List of regex expressions.

    Returns:
        list of int: List of the processes IDs (each matching process is listed once).
    """
    res = []
    for process in psutil.process_iter(attrs=["cmdline"]):
        cmd_parts = process.info["cmdline"]
        if not cmd_parts:
            # psutil reports None when the command line is not accessible
            continue
        cmdline = " ".join(cmd_parts)
        # NOTE(review): the append/break lines were lost in the listing; a pid is recorded once here even
        # if several expressions match. Confirm upstream did not rely on duplicates.
        if any(re.match(r, cmdline) for r in regex_list):
            res.append(process.pid)
    return res
def get_pids_filtered_sorted(filterstr, sortkey=None, desc=False)

Get pids of process by a filter string and optionally sort by sortkey


filterstr : str
filter string.
sortkey : str, optional
Defaults to None. (if no sortkey used it will sort by pid(s) in ascending order). sortkey can be one of the following: %cpu cpu utilization of the process in %mem ratio of the process's resident set size to the physical memory on the machine, expressed as a percentage. cputime cumulative CPU time, "[DD-]hh:mm:ss" format. (alias time). egid effective group ID number of the process as a decimal integer. (alias gid). egroup effective group ID of the process. This will be the textual group ID, if it can be obtained and the field width permits, or a decimal representation otherwise. (alias group). euid effective user ID (alias uid). euser effective user name. gid see egid. (alias egid). pid a number representing the process ID (alias tgid). ppid parent process ID. psr processor that process is currently assigned to. start_time starting time or date of the process.
desc (bool, optional): Whether to sort the processes in descending order or not (asc). Defaults to False (asc).


list of int
list of the processes IDs
Expand source code
def get_pids_filtered_sorted(filterstr, sortkey=None, desc=False):
    """Get pids of process by a filter string and optionally sort by sortkey

    Args:
        filterstr (str): filter string.
        sortkey (str, optional): Defaults to None. (if no sortkey used it will sort by pid(s) in ascending order).
            sortkey can be one of the following:
            %cpu           cpu utilization of the process in "##.#" format.
            %mem           ratio of the process's resident set size  to the physical memory on the machine, expressed as a percentage.
            cputime        cumulative CPU time, "[DD-]hh:mm:ss" format.  (alias time).
            egid           effective group ID number of the process as a decimal integer.  (alias gid).
            euid           effective user ID (alias uid).
            euser          effective user name.
            gid            see egid. (alias egid).
            pid            a number representing the process ID (alias tgid).
            ppid           parent process ID.
            psr            processor that process is currently assigned to.
            start_time     starting time or date of the process.
        desc (bool, optional): Whether to sort the processes in descending order or not(asc). Defaults to False (asc).

    Returns:
        list of int: list of the processes IDs

    Raises:
        KeyError: if `sortkey` is not one of the supported keys listed above.
    """
    # translate ps-style sort keys into the field names understood by get_processes_info
    ps_to_psutil_map = {
        "%cpu": "cpu_percent",
        "%mem": "memory_percent",
        "cputime": "cpu_time",
        "time": "cpu_time",  # documented alias of cputime
        "psr": "cpu_num",
        "start_time": "create_time",
        "egid": "egid",
        "gid": "egid",
        "euid": "euid",
        "uid": "euid",
        "euser": "username",
        "pid": "pid",
        "tgid": "pid",  # documented alias of pid
        "ppid": "ppid",
    }
    if sortkey is None:  # mimic default ps command sorting behavior
        sortkey = "pid"
    # limit=None -> slice [:None], i.e. do not cap the result at get_processes_info's default limit
    matching = get_processes_info(sort=ps_to_psutil_map[sortkey], filterstr=filterstr, desc=desc, limit=None)
    # return pids from process info dicts
    return [p["pid"] for p in matching]
def get_ports_mapping(status='LISTEN')

Get a mapping for process to ports with a status filter

It will skip any process in case of errors (e.g. permission error)


>>> from jumpscale.loader import j
>>> import psutil
>>> j.sals.process.get_ports_mapping(psutil.CONN_ESTABLISHED)
>>> # or
>>> j.sals.process.get_ports_mapping("ESTABLISHED")


status : psutil.CONN_CONSTANT
psutil CONN_* constant as a filter. Defaults to psutil.CONN_LISTEN.


a mapping between process and ports
Expand source code
def get_ports_mapping(status=psutil.CONN_LISTEN):
    """Get a mapping for process to ports with a status filter

    It will skip any process in case of errors (e.g. permission error)

    Example:
        >>> from jumpscale.loader import j
        >>> import psutil
        >>> j.sals.process.get_ports_mapping(psutil.CONN_ESTABLISHED)
        >>> # or
        >>> j.sals.process.get_ports_mapping("ESTABLISHED")

    Args:
        status (psutil.CONN_CONSTANT): `psutil` CONN_* constant as a filter. Defaults to psutil.CONN_LISTEN.

    Returns:
        defaultdict: a mapping between process and ports
    """
    ports = defaultdict(list)

    for process in get_processes():
        try:
            connections = process.connections()
        except psutil.Error:
            # not allowed to inspect this process (or it vanished): skip it
            continue

        for conn in connections or ():
            if conn.status == status:
                # NOTE(review): key/value reconstructed from the documented
                # "mapping between process and ports" - confirm against upstream
                ports[process].append(conn.laddr.port)

    return ports
def get_process_by_port(port, ipv6=False, udp=False)

Returns the psutil.Process object that is listening on the given port.


port : int
The port for which to find the process.
ipv6 : bool, optional
Whether to search the connections that using ipv6 instead of ipv4. Defaults to False.
udp : bool, optional
Whether to search the connections for UDP port instead of TCP. Defaults to False.


j.exceptions.Runtime: if the pid is not retrievable.
j.exceptions.NotFound: if the process no longer exists.
j.exceptions.Permission: if the process is not accessible by the user.


process object if found, otherwise None
Expand source code
def get_process_by_port(port, ipv6=False, udp=False):
    """Returns the psutil.Process object that is listening on the given port.

    Args:
        port (int): The port for which to find the process.
        ipv6 (bool, optional): Whether to search the connections that using ipv6 instead of ipv4. Defaults to False.
        udp (bool, optional): Whether to search the connections for UDP port instead of TCP. Defaults to False.

    Raises:
        j.exceptions.Runtime: pid is not retrievable.
        j.exceptions.NotFound: if the process no longer exists.
        j.exceptions.Permission: if the process is not accessible by the user.

    Returns:
        psutil.Process: process object if found, otherwise None
    """
    for conn in psutil.net_connections():  # TODO use kind parameter
        try:
            # should we check against ESTABLISHED status?
            # connection.status For UDP and UNIX sockets this is always going to be psutil.CONN_NONE
            if (
                conn.laddr.port == port
                and conn.status in ["LISTEN", "NONE", "ESTABLISHED"]
                and (conn.family.name == "AF_INET6") == ipv6
                and (conn.type.name == "SOCK_DGRAM") == udp
            ):
                if conn.pid:
                    return psutil.Process(conn.pid)
                # psutil reports no pid for sockets owned by other users when unprivileged
                raise j.exceptions.Runtime("pid is not retrievable, not root?")
        except psutil.NoSuchProcess:
            raise j.exceptions.NotFound("Process is no longer exists")
        except psutil.AccessDenied:
            raise j.exceptions.Permission("Permission denied")
    # no matching connection found
    return None
def get_process_object(pid, die=False)

Get psutil.Process object of a given process ID (PID).


pid : int
Process ID (PID) to get
die : bool, optional
Whether to raise an exception if no process with the given PID is found in the current process list or not. Defaults to False.


psutil.NoSuchProcess: If the process with the given PID is not found and die is set to True.
psutil.AccessDenied: If permission is denied and die is set to True.


psutil.Process or None
The Process object of the given PID if found, otherwise None, if die set to False.
Expand source code
def get_process_object(pid, die=False):
    """Get psutil.Process object of a given process ID (PID).

    Args:
        pid (int): Process ID (PID) to get
        die (bool, optional): Whether to raise an exception if no process with the given PID is found in the
            current process list or not. Defaults to False.

    Raises:
        psutil.NoSuchProcess: If process with the given PID is not found and die set to True.
        psutil.AccessDenied: If permission denied and die set to True.

    Returns:
        psutil.Process or None: The Process object of the given PID if found, otherwise None, if die set to False.
    """
    try:
        return psutil.Process(pid)
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        # when you query processes owned by another user, especially on macOS and Windows you may get AccessDenied exception
        if die:
            raise  # bare raise keeps the original traceback
        return None
def get_processes()

Get an iterator for all running processes


psutil.Process objects for all running processes
Expand source code
def get_processes():
    """Get an iterator for all running processes

    Yields:
        psutil.Process: for all processes running
    """
    yield from psutil.process_iter()
def get_processes_info(user=None, sort='mem', filterstr=None, limit=25, desc=True)

Get information for top running processes sorted by memory usage or CPU usage.


user : [type], optional
filter the processes by username. Defaults to None.
sort : str, optional
sort processes by resource usage, Defaults to 'mem'. available option: 'rss' and its alias 'mem': sort by processes which consumed the most memory (Resident Set Size). 'cpu_times' and its alias 'cpu_time': sort by processes which consumed the most CPU time. 'cpu_num': sort by the CPU number this process is currently running on. 'cpu_percent': sort by a float representing the process CPU utilization as a percentage which can also be > 100.0 in case of a process running multiple threads on different CPUs. 'memory_percent': sort py the process memory utilization, the process memory to total physical system memory as a percentage 'create_time': the process creation time as a floating point number expressed in seconds since the epoch. 'gids' and its alias 'egid': the effective group id of this process 'uids' and its alias 'euid': the effective user id of this process 'pid': sort by the process PID. 'ppid': sort by the process parent PID 'name': sort by the processes name 'username': sort by the name of the user that owns the process. 'status': sort by the current process status, one of the psutil.STATUS_* constants
filterstr : str, optional
the string to match against process name or command used and filter the results based on.
limit : int, optional
limit the results to specific number of processes, to disable set it to -1. Defaults to 25.
desc : bool, optional
whether to sort the data returned in descending order or not. Defaults to True.


processes info as a dictionary available keys [ "cpu_num", "cpu_percent", "cpu_times", "create_time", "gids", "memory_percent", "name", "pid", "ppid", "status", "uids", "username", "rss", "cpu_time", "ports" ]
Expand source code
def get_processes_info(user=None, sort="mem", filterstr=None, limit=25, desc=True):
    """Get information for top running processes sorted by memory usage or CPU usage.

    Args:
        user ([type], optional): filter the processes by username. Defaults to None.
        sort (str, optional): sort processes by resource usage, Defaults to 'mem'.
            available option:
                'rss' and its alias 'mem': sort by processes which consumed the most memory (Resident Set Size).
                'cpu_times' and its alias 'cpu_time': sort by processes which consumed the most CPU time.
                'cpu_num': sort by the CPU number this process is currently running on.
                'cpu_percent': sort by a float representing the process CPU utilization as a percentage which can also
                    be > 100.0 in case of a process running multiple threads on different CPUs.
                'memory_percent': sort by the process memory utilization, the process memory to total physical system memory as a percentage
                'create_time': the process creation time as a floating point number expressed in seconds since the epoch.
                'gids' and its alias 'egid': the effective group id of this process
                'uids' and its alias 'euid': the effective user id of this process
                'pid': sort by the process PID.
                'ppid': sort by the process parent PID
                'name': sort by the processes name
                'username': sort by the name of the user that owns the process.
                'status': sort by the current process status, one of the psutil.STATUS_* constants
        filterstr (str, optional): the string to match against process name or command used and filter the results based on.
        limit (int, optional): limit the results to specific number of processes, to disable set it to -1
            (any negative value or None disables the limit). Defaults to 25.
        desc (bool, optional): whether to sort the data returned in descending order or not. Defaults to True.

    Raises:
        j.exceptions.Value: if `sort` is not a known field name.

    Returns:
        list of dict: processes info as dictionaries
            available keys [
                "cpu_num", "cpu_percent", "cpu_times", "create_time", "gids", "memory_percent",
                "name", "pid", "ppid", "status", "uids", "username", "rss", "cpu_time", "ports"
            ]
    """

    def _get_sort_key(procObj):
        # map the public sort names/aliases onto the keys stored in each info dict
        if sort == "mem":
            return procObj["rss"]
        if sort == "cpu_times":
            return procObj["cpu_time"]
        elif sort in ["gids", "egid"]:
            return procObj["gids"].effective
        elif sort in ["uids", "euid"]:
            return procObj["uids"].effective
        else:
            try:
                return procObj[sort]
            except KeyError:
                j.logger.error(f"bad field name for sorting: {sort}")
                raise j.exceptions.Value(f"bad field name for sorting: {sort}")

    processes_list = []
    if not filterstr:
        if user:
            p_source = get_user_processes(user=user)
        else:
            p_source = get_processes()
    else:
        # it makes sense that get_pids func should returns list of psutil.Process objects instead of list of pids
        if user:
            p_source = map(get_process_object, get_pids(process_name=filterstr, _alt_source=get_user_processes(user)))
        else:
            p_source = map(get_process_object, get_pids(process_name=filterstr))
    for proc in p_source:
        if proc:  # in case a race condition happened, and get_process_object returned None
            try:
                # Fetch process details as dict
                pinfo = proc.as_dict(
                    attrs=[
                        "cpu_num",
                        "cpu_percent",
                        "cpu_times",
                        "create_time",
                        "gids",
                        "memory_percent",
                        "name",
                        "pid",
                        "ppid",
                        "status",
                        "uids",
                        "username",
                    ]
                )
                pinfo["rss"] = proc.memory_info().rss / (
                    1024 * 1024
                )  # the non-swapped physical memory a process has used in Mb
                pinfo["cpu_time"] = sum(pinfo["cpu_times"][:2])  # cumulative, excluding children and iowait
                pinfo["ports"] = []
                try:
                    connections = proc.connections()  # need root
                except (psutil.AccessDenied):
                    pass
                else:
                    if connections:
                        for conn in connections:
                            pinfo["ports"].append({"port": conn.laddr.port, "status": conn.status})
                # Append dict to list
                processes_list.append(pinfo)
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e:
                j.logger.exception(
                    "ignoring and logging an exception that occurred while iterating over system processes, not root?",
                    exception=e,
                )
    # sort the processes list by sort_key
    sorted_processes = sorted(processes_list, key=_get_sort_key, reverse=desc)
    # a plain [:limit] with limit=-1 would silently drop the last entry instead of disabling the limit
    if limit is not None and limit >= 0:
        sorted_processes = sorted_processes[:limit]
    return sorted_processes
def get_similar_processes(target_proc=None)

Gets similar processes to current process, started with same command line and same options.


target_proc : int or psutil.Process, optional
pid, or psutil.Process object. if None then pid for current process will be used. Defaults to None.


psutil.Process object for all processes similar to a given process.
Expand source code
def get_similar_processes(target_proc=None):
    """Gets similar processes to current process, started with same command line and same options.

    Args:
        target_proc (int or psutil.Process, optional): pid, or psutil.Process object.
            if None then pid for current process will be used. Defaults to None.

    Yields:
        psutil.Process: psutil.Process object for all processes similar to a given process.
    """
    try:
        if target_proc is None:
            target_proc = get_my_process()
        elif isinstance(target_proc, int):
            target_proc = get_process_object(target_proc, die=True)
        # the target command line does not change per candidate: read it once
        target_cmdline = target_proc.cmdline()
        if not target_cmdline:
            return
        for proc in psutil.process_iter(["name", "exe", "cmdline"]):
            if proc.info["cmdline"] and proc.info["cmdline"] == target_cmdline:
                yield proc
    except (psutil.AccessDenied, psutil.NoSuchProcess):
        # target is gone or not inspectable: nothing (more) to yield
        pass
def get_user_processes(user)

Get all process for a specific user.


user : str
The user name to match against.


process object for all processes owned by user.
Expand source code
def get_user_processes(user):
    """Get all process for a specific user.

    Processes whose owner cannot be determined (access denied, or the process
    vanished while iterating) are skipped instead of ending the iteration.

    Args:
        user (str): The user name to match against.

    Yields:
        psutil.Process: process object for all processes owned by `user`.
    """
    for process in psutil.process_iter(["name", "exe", "cmdline"]):
        try:
            owner = process.username()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # one unreadable process must not hide the remaining ones
            continue
        if owner == user:
            yield process
def in_docker()

will check if we are in a docker.


True if in docker. False otherwise.
Expand source code
def in_docker():
    """will check if we are in a docker.

    The check looks for a "/docker/" entry in the cgroup file of PID 1.

    Returns:
        bool: True if in docker. False otherwise.
    """
    # read the file directly instead of spawning `cat` through a shell
    try:
        with open("/proc/1/cgroup", encoding="utf-8", errors="replace") as cgroup_file:
            return "/docker/" in cgroup_file.read()
    except OSError:
        # file missing or unreadable (e.g. non-Linux platform)
        return False
def in_host()

Will check if we are in a host.


True if in host. False otherwise.
Expand source code
def in_host():
    """Will check if we are in a host.

    Returns:
        bool: True if in host (i.e. `in_docker()` is False). False otherwise.
    """
    return not in_docker()
def is_alive(pid)

Check whether the given PID exists in the current process list.


pid : int
Process ID (PID) to be checked.


True if the given PID exists in the current process list, False otherwise.
Expand source code
def is_alive(pid):
    """Check whether the given PID exists in the current process list.

    Args:
        pid (int): Process ID (PID) to be checked.

    Returns:
        bool: True if the given PID exists in the current process list, False otherwise.
    """
    return psutil.pid_exists(pid)
def is_installed(cmd)

Checks if a specific command is available on system e.g. curl.


cmd : str
Command to be checked.


True if command is available, False otherwise.
Expand source code
def is_installed(cmd):
    """Checks if a specific command is available on system e.g. curl.

    Args:
        cmd (str): Command to be checked.

    Returns:
        bool: True if command is available, False otherwise.
    """
    import shutil

    # shutil.which performs the PATH lookup in-process: no shell is spawned,
    # so `cmd` can never be interpreted as shell syntax
    return shutil.which(cmd) is not None
def is_port_listening(port, ipv6=False)

Check if the TCP port is being used by any process


port : int
Port number
ipv6 : bool, optional
Whether to use the ipv6 localhost address instead of the ipv4 localhost address. Defaults to False.


True if port is used, False otherwise.
Expand source code
def is_port_listening(port, ipv6=False):
    """Check if the TCP port is being used by any process

    Args:
        port (int): Port number
        ipv6 (bool, optional): Whether to use the ipv6 address instead of the ipv4 address. Defaults to False.

    Returns:
        bool: True if port is used, False otherwise.
    """
    from jumpscale.sals import nettools

    ip6 = "::"
    # NOTE(review): the IPv4 host is an empty string here; it looks like an address
    # literal may have been lost - confirm the intended value against upstream
    ip4 = ""
    return nettools.tcp_connection_test(ip6 if ipv6 else ip4, port, timeout=5)
def kill(proc, sig=Signals.SIGTERM, timeout=5, sure_kill=False)

Kill a process with a specified signal.


proc : int or psutil.Process
Target process ID (PID) or psutil.Process object.
sig : signal, optional
See signal module constants. Defaults to signal.SIGTERM.
timeout : int, optional
How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
sure_kill : bool, optional
Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.


j.exceptions.Runtime: In case killing the process failed.
j.exceptions.Permission: In case the permission to perform this action is denied.
Expand source code
def kill(proc, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill a process with a specified signal.

    Args:
        proc (int or psutil.Process): Target process ID (PID) or psutil.Process object.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Raises:
        j.exceptions.Runtime: In case killing the process failed.
        j.exceptions.Permission: In case the permission to perform this action is denied.
    """
    try:
        if isinstance(proc, int):
            proc = get_process_object(proc, die=True)
        if proc.status() == psutil.STATUS_ZOMBIE:
            # a zombie is already dead; nothing to signal
            return
        proc.send_signal(sig)
        # Wait for a process to terminate
        # If PID no longer exists return None immediately
        # If timeout exceeded and the process is still alive raise TimeoutExpired exception
        proc.wait(timeout=timeout)
        # the process was terminated with sig
    except psutil.TimeoutExpired as e:
        # timeout expires and process is still alive.
        # os.name is checked first: signal.SIGKILL does not exist on windows
        if sure_kill and os.name != "nt" and sig != signal.SIGKILL:
            try:
                # If a process gets this signal it must quit immediately and will not perform any clean-up operations
                proc.kill()
                # SIGKILL signal sent
                # NOTE(review): wait duration after SIGKILL reuses `timeout` - confirm against upstream
                proc.wait(timeout=timeout)
                # the process was terminated with SIGKILL
            except psutil.TimeoutExpired as kill_timeout:
                try:
                    status = proc.status()
                except psutil.NoSuchProcess:
                    return
                if status == psutil.STATUS_ZOMBIE:
                    # the process becomes a zombie and should be considered a dead.
                    return
                # the process may be in an uninterruptible sleep
                j.logger.warning(f"Could not kill the process with pid: {proc.pid} with {sig}. Timeout: {timeout}")
                raise j.exceptions.Runtime(f"Could not kill process with pid {proc.pid}, {status}") from kill_timeout
            except psutil.NoSuchProcess:
                # exited between the timeout and the SIGKILL
                return
            except psutil.AccessDenied as denied:
                raise j.exceptions.Permission("Permission to perform this action is denied!") from denied
        else:
            raise j.exceptions.Runtime(f"Could not kill process with pid {proc.pid}") from e
    except psutil.AccessDenied as e:
        # permission to perform an action is denied
        raise j.exceptions.Permission("Permission to perform this action is denied!") from e
    except psutil.NoSuchProcess:
        # Process no longer exists or Zombie (already dead)
        return
def kill_all_pids(pids, sig=Signals.SIGTERM, timeout=5, sure_kill=False)

Kill all processes with given pids.


pids : list of int
The target processes IDs.
sig : signal, optional
See signal module constants. Defaults to signal.SIGTERM.
timeout : int, optional
How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
sure_kill : bool, optional
Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.


list of int
represents the IDs of the processes remaining alive, if any.
Expand source code
def kill_all_pids(pids, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill all processes with given pids.

    Args:
        pids (list of int): The target processes IDs.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Returns:
        list of int: represents the IDs of the processes remaining alive if any.
    """
    failed_processes = []
    for pid in pids:
        try:
            kill(pid, sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission):
            # keep going; report the survivors to the caller
            failed_processes.append(pid)
    return failed_processes
def kill_proc_tree(parent, sig=Signals.SIGTERM, include_parent=True, include_grand_children=True, timeout=5, sure_kill=False)

Kill a process and its children (including grandchildren) with signal sig


parent : int or psutil.Process
Target process ID (PID) or psutil.Process object.
sig : signal, optional
See signal module constants. Defaults to signal.SIGTERM.
include_parent : bool, optional
Whether to kill the process itself. Defaults to True.
include_grand_children : bool, optional
Whether to kill recursively all grandchildren. Defaults to True.
timeout : int, optional
How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
sure_kill : bool, optional
Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.


list of psutil.Process
represents the objects of the processes remaining alive, if any.


AssertionError: in case the given parent is the current process
Expand source code
def kill_proc_tree(
    parent, sig=signal.SIGTERM, include_parent=True, include_grand_children=True, timeout=5, sure_kill=False
):
    """Kill a process and its children (including grandchildren) with signal `sig`

    Args:
        parent (int or psutil.Process): Target process ID (PID) or psutil.Process object.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        include_parent (bool, optional): Whether to kill the process itself. Defaults to True.
        include_grand_children (bool, optional): whether to kill recursively all grandchildren. Defaults to True.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Returns:
        list of psutil.Process: represents the objects of the processes remaining alive if any.

    Raises:
        AssertionError: in case the given `parent` is the current process
    """
    if isinstance(parent, int):
        parent = get_process_object(parent)
        if parent is None:
            return []  # already dead: nothing remains alive

    # should be checked on any killing function
    # here we first need to make sure that `include_parent` is True
    # and/or better check inside the below for loop
    if parent.pid == os.getpid():
        # raised explicitly so the guard is not stripped when running with -O
        raise AssertionError("won't kill myself")

    # reversed so the deepest descendants come before their ancestors
    processes = parent.children(recursive=include_grand_children)[::-1]
    failed = []
    if include_parent:
        processes.append(parent)

    for p in processes:
        try:
            kill(p, sig=sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission):
            failed.append(p)

    # making sure
    if failed:
        gone, failed = psutil.wait_procs(failed, timeout=0)
    return failed
def kill_process_by_name(process_name, sig=Signals.SIGTERM, match_predicate=None, timeout=5, sure_kill=False)

Kill all processes that match 'process_name'.


process_name : str
The target process name.
sig : signal, optional
See signal module constants. Defaults to signal.SIGTERM.
match_predicate : callable, optional
Function that does matching between found processes and the targeted process, the function should accept two arguments and return a boolean. Defaults to None.
timeout : int, optional
How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
sure_kill : bool, optional
Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.


list of int
represents the IDs of the processes remaining alive, if any.
Expand source code
def kill_process_by_name(process_name, sig=signal.SIGTERM, match_predicate=None, timeout=5, sure_kill=False):
    """Kill all processes that match 'process_name'.

    Args:
        process_name (str): The target process name.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        match_predicate (callable, optional): Function that does matching between
            found processes and the targeted process, the function should accept
            two arguments and return a boolean. Defaults to None.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Returns:
        list of int: represents the IDs of the processes remaining alive if any.
    """
    pids = get_pids(process_name, match_predicate=match_predicate)
    failed_processes = []
    for pid in pids:
        try:
            kill(pid, sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission):
            # keep going; report the survivors to the caller
            failed_processes.append(pid)
    return failed_processes
def kill_process_by_port(port, ipv6=False, udp=False, sig=Signals.SIGTERM, timeout=5, sure_kill=False)

Kill process by port.


port : int
The port number.
ipv6 : bool, optional
Whether to search the connections that using ipv6 instead of ipv4. Defaults to False.
udp : bool, optional
Whether to search the connections for UDP port instead of TCP. Defaults to False.
sig : signal, optional
See signal module constants. Defaults to signal.SIGTERM.
timeout : int, optional
How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
sure_kill : bool, optional
Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.


j.exceptions.Runtime: In case killing the process failed.
j.exceptions.Permission: In case the permission to perform this action is denied.
Expand source code
def kill_process_by_port(port, ipv6=False, udp=False, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill process by port.

    Does nothing when no process is found for the given port.

    Args:
        port (int): The port number.
        ipv6 (bool, optional): Whether to search the connections that using ipv6 instead of ipv4. Defaults to False.
        udp (bool, optional): Whether to search the connections for UDP port instead of TCP. Defaults to False.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception
            or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Raises:
        j.exceptions.Runtime: In case killing the process failed.
        j.exceptions.Permission: In case the permission to perform this action is denied.
    """
    proc = get_process_by_port(port, ipv6=ipv6, udp=udp)
    if proc is None:
        # get_process_by_port is documented to return None when nothing matches;
        # passing None on to kill() would fail with AttributeError
        return
    kill(proc, sig=sig, timeout=timeout, sure_kill=sure_kill)
def kill_user_processes(user, sig=Signals.SIGTERM, timeout=5, sure_kill=False)

Kill all processes for a specific user.


user : str
The user name to match against.
sig : signal, optional
See signal module constants. Defaults to signal.SIGTERM.
timeout : int, optional
How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
sure_kill : bool, optional
Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.


list of psutil.Process: list of process objects that remain alive, if any.

Expand source code
def kill_user_processes(user, sig=signal.SIGTERM, timeout=5, sure_kill=False):
    """Kill all processes for a specific user.

    Args:
        user (str): The user name to match against.
        sig (signal, optional): See signal module constants. Defaults to signal.SIGTERM.
        timeout (int, optional): How long to wait for a process to terminate (seconds) before raise exception or, if sure_kill=True, send a SIGKILL. Defaults to 5.
        sure_kill (bool, optional): Whether to fallback to SIGKILL if the timeout exceeded for the terminate operation or not. Defaults to False.

    Returns:
        list of psutil.Process: list of process objects that remain alive if any.
    """
    failed_processes = []
    for proc in get_user_processes(user):
        try:
            kill(proc, sig=sig, timeout=timeout, sure_kill=sure_kill)
        except (j.exceptions.Runtime, j.exceptions.Permission) as e:
            j.logger.exception("ignoring an exception that occurred while iterating over user processes", exception=e)
            failed_processes.append(proc)

    # making sure
    if failed_processes:
        gone, failed_processes = psutil.wait_procs(failed_processes, timeout=0)

    return failed_processes
def ps_find(process_name)

Check if there is any running process that match the given name.


process_name : str
The target process name. Will match against Process.exe() and Process.cmdline()


True if process is found, False otherwise.
Expand source code
def ps_find(process_name):
    """Check if there is any running process that match the given name.

    Args:
        process_name (str): The target process name. will match against Process.exe() and Process.cmdline()

    Returns:
        bool: True if process is found, False otherwise.
    """
    # limit=1: one match is enough to answer the question
    return len(get_pids(process_name, limit=1)) == 1
def set_env_var(var_names, var_values)

Set the value of the environment variables named in var_names. Existing variables are overwritten.

Such changes to the environment affect subprocesses started with os.system(), popen() or fork() and execv()


var_names : list of str
A list of the names of all the environment variables to set
var_values : list of str
A list of all values for the environment variables


if error happened during setting the environment variables
Expand source code
def set_env_var(var_names, var_values):
    """Set the value of the environment variables named in `var_names`. Existing variables are overwritten

    Such changes to the environment affect subprocesses started with os.system(), popen() or fork() and execv()

    Values are converted with ``str()`` and stripped of surrounding whitespace.
    Surplus entries in `var_values` are ignored.

    Args:
        var_names (list of str): A list of the names of all the environment variables to set
        var_values (list of str): A list of all values for the environment variables

    Raises:
        IndexError: if `var_values` has fewer entries than `var_names`
            (raised before any variable is modified).
    """
    # Note:
    # On some platforms, including FreeBSD and Mac OS X, setting environ may cause memory leaks.
    # Refer to the system documentation for putenv().
    if len(var_values) < len(var_names):
        # fail up front instead of leaving the environment half-updated
        raise IndexError(
            f"expected at least {len(var_names)} values for the given variable names, got {len(var_values)}"
        )
    for name, value in zip(var_names, var_values):
        os.environ[name] = str(value).strip()