Module jumpscale.tools.restic.restic

Restic client

Tool to create restic repo using different backends.

prerequisites

Make sure to isntall restic(https://restic.readthedocs.io/en/latest/020_installation.html) On linux it can be installed as follows:

apt-get install restic

Usage

Basic usage

instance = j.clients.restic.get("instance")
instance.repo = /path/to/repo  # Where restic will store its data can be local or SFTP or rest check https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html
instance.password = pass  # Password for the repo needed to access the data after init
instance.init_repo()

Other Backends

To use backend such as minio you will need to set extra environment for accessing the servers.

instance.extra_env = {'AWS_ACCESS_KEY_ID': 'KEY',
                      'AWS_SECRET_ACCESS_KEY': 'SECRET'}

Backing up

instance.backup("/my/path")

Listing snapshots

instance.list_snapshots()
[{'time': '2020-07-15T13:44:05.767265958Z',
  'tree': '93254cf97720d264d362c9b8d91889643e45886fe0d6b80027d2cef3910bd43d',
  'paths': ['/tmp/stc'],
  'hostname': 'jsng',
  'username': 'root',
  'id': 'e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c',
  'short_id': 'e3330cf2'}]

Restoring

instance.restore("/path/to/store", "e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c")  # Will restore specified snapshot to path
instance.restore("/path/to/store", latest=True, path="/tmp/stc", host="jsng")  # Will return latest snapshot with the specified filters

Pruning data

instance.forget(keep_last=10)  # Will prune all snapshots and keep the last 10 taken only
Expand source code
"""
# Restic client

Tool to create restic repo using different backends.

## prerequisites
Make sure to isntall restic(https://restic.readthedocs.io/en/latest/020_installation.html)
On linux it can be installed as follows:

```bash
apt-get install restic
```

## Usage

### Basic usage

```python
instance = j.clients.restic.get("instance")
instance.repo = /path/to/repo  # Where restic will store its data can be local or SFTP or rest check https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html
instance.password = pass  # Password for the repo needed to access the data after init
instance.init_repo()
```

### Other Backends

To use backend such as minio you will need to set extra environment for accessing the servers.

```python
instance.extra_env = {'AWS_ACCESS_KEY_ID': 'KEY',
                      'AWS_SECRET_ACCESS_KEY': 'SECRET'}
```

### Backing up

```python
instance.backup("/my/path")
```

### Listing snapshots

```python
instance.list_snapshots()
[{'time': '2020-07-15T13:44:05.767265958Z',
  'tree': '93254cf97720d264d362c9b8d91889643e45886fe0d6b80027d2cef3910bd43d',
  'paths': ['/tmp/stc'],
  'hostname': 'jsng',
  'username': 'root',
  'id': 'e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c',
  'short_id': 'e3330cf2'}]
```

### Restoring

```python
instance.restore("/path/to/store", "e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c")  # Will restore specified snapshot to path
instance.restore("/path/to/store", latest=True, path="/tmp/stc", host="jsng")  # Will return latest snapshot with the specified filters
```

### Pruning data

```python
instance.forget(keep_last=10)  # Will prune all snapshots and keep the last 10 taken only
```
"""
from jumpscale.core.base import Base, fields
from jumpscale.core.exceptions import NotFound, Runtime
from io import BytesIO

import subprocess
import json
import os


CRON_SCRIPT = """
epoch=$(date +%s)

export RESTIC_REPOSITORY={repo}
export RESTIC_PASSWORD={password}

restic unlock &
wait $!

restic backup --one-file-system --tag $epoch {path} &
wait $!

restic forget --keep-last {keep_last} --prune &
wait $!

restic check &
wait $!
"""

WATCHDOG_SCRIPT = """
epoch=$(date +%s)

export RESTIC_REPOSITORY={repo}
export RESTIC_PASSWORD={password}
export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}
export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}


latest_backup=`restic snapshots --last -c -q | sed -n 3p |cut -d " " -f3`
latest_backup_epoch=`date "+%s" -d "$latest_backup"`
wait $!

current_date=`date +"%s"`

if [[ $(( (current_date - latest_backup_epoch) / 86400 )) > 2 ]]
then
    echo "No backups is being taken since 2 days, sending escalation mail..."
    jsng 'j.clients.mail.escalation_instance.send("{ESCALATION_MAIL}", subject="{THREEBOT_NAME} 3Bot backups warning", message="We noticed that there were no backups taken since 2 days for {THREEBOT_NAME} 3Bot. Please check the support")'
fi

"""


class ResticRepo(Base):

    repo = fields.String(required=True)
    password = fields.Secret(required=True)
    extra_env = fields.Typed(dict, default={})

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._check_install("restic")
        self._env = None

    def _check_install(self, binary):
        if subprocess.call(["which", binary], stdout=subprocess.DEVNULL):
            raise NotFound(f"{binary} not installed")

    @property
    def env(self):
        self.validate()
        self._env = os.environ.copy()
        self._env.update({"RESTIC_PASSWORD": self.password, "RESTIC_REPOSITORY": self.repo}, **self.extra_env)
        return self._env

    def _run_cmd(self, cmd, check=True):
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
        if check and proc.returncode:
            raise Runtime(f"Restic command failed with {proc.stderr.decode()}")
        return proc

    def init_repo(self):
        """Init restic repo with data specified in the instances
        """
        proc = self._run_cmd(["restic", "cat", "config"], False)
        if proc.returncode > 0:
            self._run_cmd(["restic", "init"])

    def backup(self, path, tags=None, exclude=None):
        """Backup a path to the repo.

        Args:
            path (str or list of str): Local path/s to backup.
            tags (list): List of tags to set to the backup.
            exclude (list of str): This instructs restic to exclude files matching a given pattern/s.
        """
        if not path:
            raise ValueError("Please specify path/s to backup")
        cmd = ["restic", "backup"]
        tags = tags or []
        for tag in tags:
            cmd.extend(["--tag", tag])
        if exclude:
            for pattern in exclude:
                cmd.extend([f"--exclude={pattern}"])
        if isinstance(path, list):
            cmd.extend(path)
        else:
            cmd.extend([path])
        self._run_cmd(cmd)

    def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None):
        """Restores a snapshot.

        Args:
            target_path (str): a path to restore to
            snapshot_id (str, optional): Id of the snapshot. Defaults to None.
            latest (bool, optional): if True will use latest snapshot. Defaults to True.
            path (str, optional): Filter on the path when using latest. Defaults to None.
            host (str, optional): Filter on the hostname when using latest. Defaults to None.
            tags (list, optional): List of tags to filter on.
        """
        cmd = ["restic", "--target", target_path, "restore"]
        if snapshot_id:
            cmd.append(snapshot_id)
            self._run_cmd(cmd)
        elif latest:
            args = ["latest"]
            if path:
                args.extend(["--path", path])
            if host:
                args.extend(["--host", host])
            if tags:
                for tag in tags:
                    cmd.extend(["--tag", tag])
            self._run_cmd(cmd + args)
        else:
            raise ValueError("Please specify either `snapshot_id` or `latest` flag")

    def list_snapshots(self, tags=None, last=False, path=None):
        """List all snapshots in the repo.

        Args:
            tags (list, optional): a list of tags to filter on. Defaults to None.
            last (bool): If True will get last snapshot only while respecting the other filters. Defaults to False.
            path (str): a path to filter on. Defaults to None.

        Returns
            list : all snapshots as dicts
        """
        tags = tags or []
        cmd = ["restic", "snapshots", "--json"]
        for tag in tags:
            cmd.extend(["--tag", tag])

        if path:
            cmd.extend(["--path", path])
        if last:
            cmd.append("--last")
        proc = self._run_cmd(cmd)
        return json.loads(proc.stdout)

    def forget(self, keep_last=10, prune=True, snapshots=None, tags=None):
        """Remove snapshots and Optionally remove the data that was referenced by those snapshots.
        During a prune operation, the repository is locked and backups cannot be completed.

        Args:
            keep_last (str, optional): How many items to keep. Defaults to 10.
            prune (bool, optional): Whether to actually remove the data or not. Defaults to True.
            snapshots (list, optional): a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None.
            tags (list, optional): if given, Only the snapshots which have specified tags are considered. Defaults to None.
        """
        cmd = ["restic", "forget"]
        if tags:
            for tag in tags:
                cmd.extend(["--tag", tag])
        if keep_last and not snapshots:  # will be ignored in case if passing snapshot id/s. this is a restic behaviour.
            cmd.extend(["--keep-last", str(keep_last)])
        if prune:
            cmd.append("--prune")
        if snapshots:
            cmd.extend(snapshots)
        self._run_cmd(cmd)

    def _get_script_path(self, path):
        return os.path.join(path, f"{self.instance_name}_restic_cron")

    def _get_crons_jobs(self):
        proc = subprocess.run(["crontab", "-l"], stderr=subprocess.DEVNULL, stdout=subprocess.PIPE)
        return proc.stdout.decode()

    def auto_backup(self, path, keep_last=20):
        """Runs a cron job that backups the repo and prunes the last specified backups.

        Args:
            path (str): Local path to backup.
            keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
        """
        self._check_install("crontab")
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        if not self.auto_backup_running(path):  # Check if cron job already running
            cron_script = CRON_SCRIPT.format(repo=self.repo, password=self.password, path=path, keep_last=keep_last)
            with open(script_path, "w") as rfd:
                rfd.write(cron_script)

            cron_cmd = cronjobs + f"0 0 * * * bash {script_path} \n"
            proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
            proc_res = proc.communicate(input=cron_cmd.encode())
            if proc.returncode > 0:
                raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")

    def auto_backup_running(self, path):
        """Checks if auto backup for the specified path is running or not

        Args:
            path (str): Local path to backup in the cron job.

        Returns:
            bool: Whether it is running or not.
        """
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        return cronjobs.find(script_path) >= 0

    def disable_auto_backup(self, path):
        """Removes cron jon based on the path being backed.

        Args:
            path (str): Local path to backup in the cron job.
        """
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        other_crons = []
        for cronjob in cronjobs.splitlines():
            if script_path not in cronjob:
                other_crons.append(cronjob)
        proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        cron_cmd = "\n".join(other_crons) + "\n"
        proc_res = proc.communicate(input=cron_cmd.encode())
        if proc.returncode > 0:
            raise Runtime(f"Couldn't remove cron job, failed with {proc_res[1]}")

    def backup_watchdog_running(self, script_path) -> bool:
        """Watches a cronjob to watch backups using last snapshot time.

        Args:
            script_path (str): a path to the script to run the cronjob.
        Returns:
            bool: True if the backup watchdog running otherwise False
        """
        script_path = self._get_script_path(script_path)
        cronjobs = self._get_crons_jobs()
        return cronjobs.find(script_path) >= 0

    def start_watch_backup(self, path):
        """Runs a cron job that backups the repo and prunes the last specified backups.

        Args:
            path (str): Local path to backup.
            keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
        """
        self._check_install("crontab")
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        if not self.backup_watchdog_running(path):  # Check if cron job already running
            cron_script = WATCHDOG_SCRIPT.format(
                repo=self.repo,
                password=self.password,
                path=path,
                AWS_SECRET_ACCESS_KEY=self.extra_env.get("AWS_SECRET_ACCESS_KEY"),
                AWS_ACCESS_KEY_ID=self.extra_env.get("AWS_ACCESS_KEY_ID"),
                THREEBOT_NAME=os.environ.get("THREEBOT_NAME"),
                ESCALATION_MAIL=os.environ.get("ESCALATION_MAIL"),
            )

            with open(script_path, "w") as rfd:
                rfd.write(cron_script)

            cron_cmd = cronjobs + f"0 0 */2 * * bash {script_path} \n"
            proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
            proc_res = proc.communicate(input=cron_cmd.encode())
            if proc.returncode > 0:
                raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")

Classes

class ResticRepo (*args, **kwargs)

A simple attribute-based namespace.

SimpleNamespace(**kwargs)

base class implementation for any class with fields which supports getting/setting raw data for any instance fields.

any instance can have an optional name and a parent.

class Person(Base):
    name = fields.String()
    age = fields.Float()

p = Person(name="ahmed", age="19")
print(p.name, p.age)

Args

parent_ : Base, optional
parent instance. Defaults to None.
instance_name_ : str, optional
instance name. Defaults to None.
**values
any given field values to initiate the instance with
Expand source code
class ResticRepo(Base):

    repo = fields.String(required=True)
    password = fields.Secret(required=True)
    extra_env = fields.Typed(dict, default={})

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._check_install("restic")
        self._env = None

    def _check_install(self, binary):
        if subprocess.call(["which", binary], stdout=subprocess.DEVNULL):
            raise NotFound(f"{binary} not installed")

    @property
    def env(self):
        self.validate()
        self._env = os.environ.copy()
        self._env.update({"RESTIC_PASSWORD": self.password, "RESTIC_REPOSITORY": self.repo}, **self.extra_env)
        return self._env

    def _run_cmd(self, cmd, check=True):
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
        if check and proc.returncode:
            raise Runtime(f"Restic command failed with {proc.stderr.decode()}")
        return proc

    def init_repo(self):
        """Init restic repo with data specified in the instances
        """
        proc = self._run_cmd(["restic", "cat", "config"], False)
        if proc.returncode > 0:
            self._run_cmd(["restic", "init"])

    def backup(self, path, tags=None, exclude=None):
        """Backup a path to the repo.

        Args:
            path (str or list of str): Local path/s to backup.
            tags (list): List of tags to set to the backup.
            exclude (list of str): This instructs restic to exclude files matching a given pattern/s.
        """
        if not path:
            raise ValueError("Please specify path/s to backup")
        cmd = ["restic", "backup"]
        tags = tags or []
        for tag in tags:
            cmd.extend(["--tag", tag])
        if exclude:
            for pattern in exclude:
                cmd.extend([f"--exclude={pattern}"])
        if isinstance(path, list):
            cmd.extend(path)
        else:
            cmd.extend([path])
        self._run_cmd(cmd)

    def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None):
        """Restores a snapshot.

        Args:
            target_path (str): a path to restore to
            snapshot_id (str, optional): Id of the snapshot. Defaults to None.
            latest (bool, optional): if True will use latest snapshot. Defaults to True.
            path (str, optional): Filter on the path when using latest. Defaults to None.
            host (str, optional): Filter on the hostname when using latest. Defaults to None.
            tags (list, optional): List of tags to filter on.
        """
        cmd = ["restic", "--target", target_path, "restore"]
        if snapshot_id:
            cmd.append(snapshot_id)
            self._run_cmd(cmd)
        elif latest:
            args = ["latest"]
            if path:
                args.extend(["--path", path])
            if host:
                args.extend(["--host", host])
            if tags:
                for tag in tags:
                    cmd.extend(["--tag", tag])
            self._run_cmd(cmd + args)
        else:
            raise ValueError("Please specify either `snapshot_id` or `latest` flag")

    def list_snapshots(self, tags=None, last=False, path=None):
        """List all snapshots in the repo.

        Args:
            tags (list, optional): a list of tags to filter on. Defaults to None.
            last (bool): If True will get last snapshot only while respecting the other filters. Defaults to False.
            path (str): a path to filter on. Defaults to None.

        Returns
            list : all snapshots as dicts
        """
        tags = tags or []
        cmd = ["restic", "snapshots", "--json"]
        for tag in tags:
            cmd.extend(["--tag", tag])

        if path:
            cmd.extend(["--path", path])
        if last:
            cmd.append("--last")
        proc = self._run_cmd(cmd)
        return json.loads(proc.stdout)

    def forget(self, keep_last=10, prune=True, snapshots=None, tags=None):
        """Remove snapshots and Optionally remove the data that was referenced by those snapshots.
        During a prune operation, the repository is locked and backups cannot be completed.

        Args:
            keep_last (str, optional): How many items to keep. Defaults to 10.
            prune (bool, optional): Whether to actually remove the data or not. Defaults to True.
            snapshots (list, optional): a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None.
            tags (list, optional): if given, Only the snapshots which have specified tags are considered. Defaults to None.
        """
        cmd = ["restic", "forget"]
        if tags:
            for tag in tags:
                cmd.extend(["--tag", tag])
        if keep_last and not snapshots:  # will be ignored in case if passing snapshot id/s. this is a restic behaviour.
            cmd.extend(["--keep-last", str(keep_last)])
        if prune:
            cmd.append("--prune")
        if snapshots:
            cmd.extend(snapshots)
        self._run_cmd(cmd)

    def _get_script_path(self, path):
        return os.path.join(path, f"{self.instance_name}_restic_cron")

    def _get_crons_jobs(self):
        proc = subprocess.run(["crontab", "-l"], stderr=subprocess.DEVNULL, stdout=subprocess.PIPE)
        return proc.stdout.decode()

    def auto_backup(self, path, keep_last=20):
        """Runs a cron job that backups the repo and prunes the last specified backups.

        Args:
            path (str): Local path to backup.
            keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
        """
        self._check_install("crontab")
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        if not self.auto_backup_running(path):  # Check if cron job already running
            cron_script = CRON_SCRIPT.format(repo=self.repo, password=self.password, path=path, keep_last=keep_last)
            with open(script_path, "w") as rfd:
                rfd.write(cron_script)

            cron_cmd = cronjobs + f"0 0 * * * bash {script_path} \n"
            proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
            proc_res = proc.communicate(input=cron_cmd.encode())
            if proc.returncode > 0:
                raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")

    def auto_backup_running(self, path):
        """Checks if auto backup for the specified path is running or not

        Args:
            path (str): Local path to backup in the cron job.

        Returns:
            bool: Whether it is running or not.
        """
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        return cronjobs.find(script_path) >= 0

    def disable_auto_backup(self, path):
        """Removes cron jon based on the path being backed.

        Args:
            path (str): Local path to backup in the cron job.
        """
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        other_crons = []
        for cronjob in cronjobs.splitlines():
            if script_path not in cronjob:
                other_crons.append(cronjob)
        proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        cron_cmd = "\n".join(other_crons) + "\n"
        proc_res = proc.communicate(input=cron_cmd.encode())
        if proc.returncode > 0:
            raise Runtime(f"Couldn't remove cron job, failed with {proc_res[1]}")

    def backup_watchdog_running(self, script_path) -> bool:
        """Watches a cronjob to watch backups using last snapshot time.

        Args:
            script_path (str): a path to the script to run the cronjob.
        Returns:
            bool: True if the backup watchdog running otherwise False
        """
        script_path = self._get_script_path(script_path)
        cronjobs = self._get_crons_jobs()
        return cronjobs.find(script_path) >= 0

    def start_watch_backup(self, path):
        """Runs a cron job that backups the repo and prunes the last specified backups.

        Args:
            path (str): Local path to backup.
            keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
        """
        self._check_install("crontab")
        script_path = self._get_script_path(path)
        cronjobs = self._get_crons_jobs()
        if not self.backup_watchdog_running(path):  # Check if cron job already running
            cron_script = WATCHDOG_SCRIPT.format(
                repo=self.repo,
                password=self.password,
                path=path,
                AWS_SECRET_ACCESS_KEY=self.extra_env.get("AWS_SECRET_ACCESS_KEY"),
                AWS_ACCESS_KEY_ID=self.extra_env.get("AWS_ACCESS_KEY_ID"),
                THREEBOT_NAME=os.environ.get("THREEBOT_NAME"),
                ESCALATION_MAIL=os.environ.get("ESCALATION_MAIL"),
            )

            with open(script_path, "w") as rfd:
                rfd.write(cron_script)

            cron_cmd = cronjobs + f"0 0 */2 * * bash {script_path} \n"
            proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
            proc_res = proc.communicate(input=cron_cmd.encode())
            if proc.returncode > 0:
                raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")

Ancestors

  • Base
  • types.SimpleNamespace

Instance variables

var env
Expand source code
@property
def env(self):
    self.validate()
    self._env = os.environ.copy()
    self._env.update({"RESTIC_PASSWORD": self.password, "RESTIC_REPOSITORY": self.repo}, **self.extra_env)
    return self._env
var extra_env

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method this property

    will call `_get_value`, which would if the value is already defined
    and will get the default value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)
var password

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method this property

    will call `_get_value`, which would if the value is already defined
    and will get the default value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)
var repo

getter method this property

will call _get_value, which would if the value is already defined and will get the default value if not

Returns

any
the field value
Expand source code
def getter(self):
    """
    getter method this property

    will call `_get_value`, which would if the value is already defined
    and will get the default value if not

    Returns:
        any: the field value
    """
    return self._get_value(name, field)

Methods

def auto_backup(self, path, keep_last=20)

Runs a cron job that backups the repo and prunes the last specified backups.

Args

path : str
Local path to backup.
keep_last : int, optional
How many items to keep in every forgot operation. Defaults to 20.
Expand source code
def auto_backup(self, path, keep_last=20):
    """Runs a cron job that backups the repo and prunes the last specified backups.

    Args:
        path (str): Local path to backup.
        keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
    """
    self._check_install("crontab")
    script_path = self._get_script_path(path)
    cronjobs = self._get_crons_jobs()
    if not self.auto_backup_running(path):  # Check if cron job already running
        cron_script = CRON_SCRIPT.format(repo=self.repo, password=self.password, path=path, keep_last=keep_last)
        with open(script_path, "w") as rfd:
            rfd.write(cron_script)

        cron_cmd = cronjobs + f"0 0 * * * bash {script_path} \n"
        proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        proc_res = proc.communicate(input=cron_cmd.encode())
        if proc.returncode > 0:
            raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")
def auto_backup_running(self, path)

Checks if auto backup for the specified path is running or not

Args

path : str
Local path to backup in the cron job.

Returns

bool
Whether it is running or not.
Expand source code
def auto_backup_running(self, path):
    """Checks if auto backup for the specified path is running or not

    Args:
        path (str): Local path to backup in the cron job.

    Returns:
        bool: Whether it is running or not.
    """
    script_path = self._get_script_path(path)
    cronjobs = self._get_crons_jobs()
    return cronjobs.find(script_path) >= 0
def backup(self, path, tags=None, exclude=None)

Backup a path to the repo.

Args

path : str or list of str
Local path/s to backup.
tags : list
List of tags to set to the backup.
exclude : list of str
This instructs restic to exclude files matching a given pattern/s.
Expand source code
def backup(self, path, tags=None, exclude=None):
    """Backup a path to the repo.

    Args:
        path (str or list of str): Local path/s to backup.
        tags (list): List of tags to set to the backup.
        exclude (list of str): This instructs restic to exclude files matching a given pattern/s.
    """
    if not path:
        raise ValueError("Please specify path/s to backup")
    cmd = ["restic", "backup"]
    tags = tags or []
    for tag in tags:
        cmd.extend(["--tag", tag])
    if exclude:
        for pattern in exclude:
            cmd.extend([f"--exclude={pattern}"])
    if isinstance(path, list):
        cmd.extend(path)
    else:
        cmd.extend([path])
    self._run_cmd(cmd)
def backup_watchdog_running(self, script_path) ‑> bool

Watches a cronjob to watch backups using last snapshot time.

Args

script_path : str
a path to the script to run the cronjob.

Returns

bool
True if the backup watchdog running otherwise False
Expand source code
def backup_watchdog_running(self, script_path) -> bool:
    """Watches a cronjob to watch backups using last snapshot time.

    Args:
        script_path (str): a path to the script to run the cronjob.
    Returns:
        bool: True if the backup watchdog running otherwise False
    """
    script_path = self._get_script_path(script_path)
    cronjobs = self._get_crons_jobs()
    return cronjobs.find(script_path) >= 0
def disable_auto_backup(self, path)

Removes cron jon based on the path being backed.

Args

path : str
Local path to backup in the cron job.
Expand source code
def disable_auto_backup(self, path):
    """Removes cron jon based on the path being backed.

    Args:
        path (str): Local path to backup in the cron job.
    """
    script_path = self._get_script_path(path)
    cronjobs = self._get_crons_jobs()
    other_crons = []
    for cronjob in cronjobs.splitlines():
        if script_path not in cronjob:
            other_crons.append(cronjob)
    proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
    cron_cmd = "\n".join(other_crons) + "\n"
    proc_res = proc.communicate(input=cron_cmd.encode())
    if proc.returncode > 0:
        raise Runtime(f"Couldn't remove cron job, failed with {proc_res[1]}")
def forget(self, keep_last=10, prune=True, snapshots=None, tags=None)

Remove snapshots and Optionally remove the data that was referenced by those snapshots. During a prune operation, the repository is locked and backups cannot be completed.

Args

keep_last : str, optional
How many items to keep. Defaults to 10.
prune : bool, optional
Whether to actually remove the data or not. Defaults to True.
snapshots : list, optional
a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None.
tags : list, optional
if given, Only the snapshots which have specified tags are considered. Defaults to None.
Expand source code
def forget(self, keep_last=10, prune=True, snapshots=None, tags=None):
    """Remove snapshots and Optionally remove the data that was referenced by those snapshots.
    During a prune operation, the repository is locked and backups cannot be completed.

    Args:
        keep_last (str, optional): How many items to keep. Defaults to 10.
        prune (bool, optional): Whether to actually remove the data or not. Defaults to True.
        snapshots (list, optional): a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None.
        tags (list, optional): if given, Only the snapshots which have specified tags are considered. Defaults to None.
    """
    cmd = ["restic", "forget"]
    if tags:
        for tag in tags:
            cmd.extend(["--tag", tag])
    if keep_last and not snapshots:  # will be ignored in case if passing snapshot id/s. this is a restic behaviour.
        cmd.extend(["--keep-last", str(keep_last)])
    if prune:
        cmd.append("--prune")
    if snapshots:
        cmd.extend(snapshots)
    self._run_cmd(cmd)
def init_repo(self)

Init restic repo with data specified in the instances

Expand source code
def init_repo(self):
    """Init restic repo with data specified in the instances
    """
    proc = self._run_cmd(["restic", "cat", "config"], False)
    if proc.returncode > 0:
        self._run_cmd(["restic", "init"])
def list_snapshots(self, tags=None, last=False, path=None)

List all snapshots in the repo.

Args

tags : list, optional
a list of tags to filter on. Defaults to None.
last : bool
If True will get last snapshot only while respecting the other filters. Defaults to False.
path : str
a path to filter on. Defaults to None.

Returns list : all snapshots as dicts

Expand source code
def list_snapshots(self, tags=None, last=False, path=None):
    """List all snapshots in the repo.

    Args:
        tags (list, optional): a list of tags to filter on. Defaults to None.
        last (bool): If True will get last snapshot only while respecting the other filters. Defaults to False.
        path (str): a path to filter on. Defaults to None.

    Returns
        list : all snapshots as dicts
    """
    tags = tags or []
    cmd = ["restic", "snapshots", "--json"]
    for tag in tags:
        cmd.extend(["--tag", tag])

    if path:
        cmd.extend(["--path", path])
    if last:
        cmd.append("--last")
    proc = self._run_cmd(cmd)
    return json.loads(proc.stdout)
def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None)

Restores a snapshot.

Args

target_path : str
a path to restore to
snapshot_id : str, optional
Id of the snapshot. Defaults to None.
latest : bool, optional
if True will use latest snapshot. Defaults to True.
path : str, optional
Filter on the path when using latest. Defaults to None.
host : str, optional
Filter on the hostname when using latest. Defaults to None.
tags : list, optional
List of tags to filter on.
Expand source code
def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None):
    """Restores a snapshot.

    Args:
        target_path (str): a path to restore to
        snapshot_id (str, optional): Id of the snapshot. Defaults to None.
        latest (bool, optional): if True will use latest snapshot. Defaults to True.
        path (str, optional): Filter on the path when using latest. Defaults to None.
        host (str, optional): Filter on the hostname when using latest. Defaults to None.
        tags (list, optional): List of tags to filter on.
    """
    cmd = ["restic", "--target", target_path, "restore"]
    if snapshot_id:
        cmd.append(snapshot_id)
        self._run_cmd(cmd)
    elif latest:
        args = ["latest"]
        if path:
            args.extend(["--path", path])
        if host:
            args.extend(["--host", host])
        if tags:
            for tag in tags:
                cmd.extend(["--tag", tag])
        self._run_cmd(cmd + args)
    else:
        raise ValueError("Please specify either `snapshot_id` or `latest` flag")
def start_watch_backup(self, path)

Runs a cron job that backups the repo and prunes the last specified backups.

Args

path : str
Local path to backup.
keep_last : int, optional
How many items to keep in every forgot operation. Defaults to 20.
Expand source code
def start_watch_backup(self, path):
    """Runs a cron job that backups the repo and prunes the last specified backups.

    Args:
        path (str): Local path to backup.
        keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
    """
    self._check_install("crontab")
    script_path = self._get_script_path(path)
    cronjobs = self._get_crons_jobs()
    if not self.backup_watchdog_running(path):  # Check if cron job already running
        cron_script = WATCHDOG_SCRIPT.format(
            repo=self.repo,
            password=self.password,
            path=path,
            AWS_SECRET_ACCESS_KEY=self.extra_env.get("AWS_SECRET_ACCESS_KEY"),
            AWS_ACCESS_KEY_ID=self.extra_env.get("AWS_ACCESS_KEY_ID"),
            THREEBOT_NAME=os.environ.get("THREEBOT_NAME"),
            ESCALATION_MAIL=os.environ.get("ESCALATION_MAIL"),
        )

        with open(script_path, "w") as rfd:
            rfd.write(cron_script)

        cron_cmd = cronjobs + f"0 0 */2 * * bash {script_path} \n"
        proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        proc_res = proc.communicate(input=cron_cmd.encode())
        if proc.returncode > 0:
            raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")

Inherited members