Module jumpscale.tools.restic.restic
Restic client
Tool to create restic repo using different backends.
prerequisites
Make sure to isntall restic(https://restic.readthedocs.io/en/latest/020_installation.html) On linux it can be installed as follows:
apt-get install restic
Usage
Basic usage
instance = j.clients.restic.get("instance")
instance.repo = /path/to/repo # Where restic will store its data can be local or SFTP or rest check https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html
instance.password = pass # Password for the repo needed to access the data after init
instance.init_repo()
Other Backends
To use backend such as minio you will need to set extra environment for accessing the servers.
instance.extra_env = {'AWS_ACCESS_KEY_ID': 'KEY',
'AWS_SECRET_ACCESS_KEY': 'SECRET'}
Backing up
instance.backup("/my/path")
Listing snapshots
instance.list_snapshots()
[{'time': '2020-07-15T13:44:05.767265958Z',
'tree': '93254cf97720d264d362c9b8d91889643e45886fe0d6b80027d2cef3910bd43d',
'paths': ['/tmp/stc'],
'hostname': 'jsng',
'username': 'root',
'id': 'e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c',
'short_id': 'e3330cf2'}]
Restoring
instance.restore("/path/to/store", "e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c") # Will restore specified snapshot to path
instance.restore("/path/to/store", latest=True, path="/tmp/stc", host="jsng") # Will return latest snapshot with the specified filters
Pruning data
instance.forget(keep_last=10) # Will prune all snapshots and keep the last 10 taken only
Expand source code
"""
# Restic client
Tool to create restic repo using different backends.
## prerequisites
Make sure to isntall restic(https://restic.readthedocs.io/en/latest/020_installation.html)
On linux it can be installed as follows:
```bash
apt-get install restic
```
## Usage
### Basic usage
```python
instance = j.clients.restic.get("instance")
instance.repo = /path/to/repo # Where restic will store its data can be local or SFTP or rest check https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html
instance.password = pass # Password for the repo needed to access the data after init
instance.init_repo()
```
### Other Backends
To use backend such as minio you will need to set extra environment for accessing the servers.
```python
instance.extra_env = {'AWS_ACCESS_KEY_ID': 'KEY',
'AWS_SECRET_ACCESS_KEY': 'SECRET'}
```
### Backing up
```python
instance.backup("/my/path")
```
### Listing snapshots
```python
instance.list_snapshots()
[{'time': '2020-07-15T13:44:05.767265958Z',
'tree': '93254cf97720d264d362c9b8d91889643e45886fe0d6b80027d2cef3910bd43d',
'paths': ['/tmp/stc'],
'hostname': 'jsng',
'username': 'root',
'id': 'e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c',
'short_id': 'e3330cf2'}]
```
### Restoring
```python
instance.restore("/path/to/store", "e3330cf26ff4ada6c80868c08626f38fa2fc0185b0510c1bb7c5fa880bc67d0c") # Will restore specified snapshot to path
instance.restore("/path/to/store", latest=True, path="/tmp/stc", host="jsng") # Will return latest snapshot with the specified filters
```
### Pruning data
```python
instance.forget(keep_last=10) # Will prune all snapshots and keep the last 10 taken only
```
"""
from jumpscale.core.base import Base, fields
from jumpscale.core.exceptions import NotFound, Runtime
from io import BytesIO
import subprocess
import json
import os
CRON_SCRIPT = """
epoch=$(date +%s)
export RESTIC_REPOSITORY={repo}
export RESTIC_PASSWORD={password}
restic unlock &
wait $!
restic backup --one-file-system --tag $epoch {path} &
wait $!
restic forget --keep-last {keep_last} --prune &
wait $!
restic check &
wait $!
"""
WATCHDOG_SCRIPT = """
epoch=$(date +%s)
export RESTIC_REPOSITORY={repo}
export RESTIC_PASSWORD={password}
export AWS_ACCESS_KEY_ID={AWS_ACCESS_KEY_ID}
export AWS_SECRET_ACCESS_KEY={AWS_SECRET_ACCESS_KEY}
latest_backup=`restic snapshots --last -c -q | sed -n 3p |cut -d " " -f3`
latest_backup_epoch=`date "+%s" -d "$latest_backup"`
wait $!
current_date=`date +"%s"`
if [[ $(( (current_date - latest_backup_epoch) / 86400 )) > 2 ]]
then
echo "No backups is being taken since 2 days, sending escalation mail..."
jsng 'j.clients.mail.escalation_instance.send("{ESCALATION_MAIL}", subject="{THREEBOT_NAME} 3Bot backups warning", message="We noticed that there were no backups taken since 2 days for {THREEBOT_NAME} 3Bot. Please check the support")'
fi
"""
class ResticRepo(Base):
repo = fields.String(required=True)
password = fields.Secret(required=True)
extra_env = fields.Typed(dict, default={})
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._check_install("restic")
self._env = None
def _check_install(self, binary):
if subprocess.call(["which", binary], stdout=subprocess.DEVNULL):
raise NotFound(f"{binary} not installed")
@property
def env(self):
self.validate()
self._env = os.environ.copy()
self._env.update({"RESTIC_PASSWORD": self.password, "RESTIC_REPOSITORY": self.repo}, **self.extra_env)
return self._env
def _run_cmd(self, cmd, check=True):
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
if check and proc.returncode:
raise Runtime(f"Restic command failed with {proc.stderr.decode()}")
return proc
def init_repo(self):
"""Init restic repo with data specified in the instances
"""
proc = self._run_cmd(["restic", "cat", "config"], False)
if proc.returncode > 0:
self._run_cmd(["restic", "init"])
def backup(self, path, tags=None, exclude=None):
"""Backup a path to the repo.
Args:
path (str or list of str): Local path/s to backup.
tags (list): List of tags to set to the backup.
exclude (list of str): This instructs restic to exclude files matching a given pattern/s.
"""
if not path:
raise ValueError("Please specify path/s to backup")
cmd = ["restic", "backup"]
tags = tags or []
for tag in tags:
cmd.extend(["--tag", tag])
if exclude:
for pattern in exclude:
cmd.extend([f"--exclude={pattern}"])
if isinstance(path, list):
cmd.extend(path)
else:
cmd.extend([path])
self._run_cmd(cmd)
def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None):
"""Restores a snapshot.
Args:
target_path (str): a path to restore to
snapshot_id (str, optional): Id of the snapshot. Defaults to None.
latest (bool, optional): if True will use latest snapshot. Defaults to True.
path (str, optional): Filter on the path when using latest. Defaults to None.
host (str, optional): Filter on the hostname when using latest. Defaults to None.
tags (list, optional): List of tags to filter on.
"""
cmd = ["restic", "--target", target_path, "restore"]
if snapshot_id:
cmd.append(snapshot_id)
self._run_cmd(cmd)
elif latest:
args = ["latest"]
if path:
args.extend(["--path", path])
if host:
args.extend(["--host", host])
if tags:
for tag in tags:
cmd.extend(["--tag", tag])
self._run_cmd(cmd + args)
else:
raise ValueError("Please specify either `snapshot_id` or `latest` flag")
def list_snapshots(self, tags=None, last=False, path=None):
"""List all snapshots in the repo.
Args:
tags (list, optional): a list of tags to filter on. Defaults to None.
last (bool): If True will get last snapshot only while respecting the other filters. Defaults to False.
path (str): a path to filter on. Defaults to None.
Returns
list : all snapshots as dicts
"""
tags = tags or []
cmd = ["restic", "snapshots", "--json"]
for tag in tags:
cmd.extend(["--tag", tag])
if path:
cmd.extend(["--path", path])
if last:
cmd.append("--last")
proc = self._run_cmd(cmd)
return json.loads(proc.stdout)
def forget(self, keep_last=10, prune=True, snapshots=None, tags=None):
"""Remove snapshots and Optionally remove the data that was referenced by those snapshots.
During a prune operation, the repository is locked and backups cannot be completed.
Args:
keep_last (str, optional): How many items to keep. Defaults to 10.
prune (bool, optional): Whether to actually remove the data or not. Defaults to True.
snapshots (list, optional): a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None.
tags (list, optional): if given, Only the snapshots which have specified tags are considered. Defaults to None.
"""
cmd = ["restic", "forget"]
if tags:
for tag in tags:
cmd.extend(["--tag", tag])
if keep_last and not snapshots: # will be ignored in case if passing snapshot id/s. this is a restic behaviour.
cmd.extend(["--keep-last", str(keep_last)])
if prune:
cmd.append("--prune")
if snapshots:
cmd.extend(snapshots)
self._run_cmd(cmd)
def _get_script_path(self, path):
return os.path.join(path, f"{self.instance_name}_restic_cron")
def _get_crons_jobs(self):
proc = subprocess.run(["crontab", "-l"], stderr=subprocess.DEVNULL, stdout=subprocess.PIPE)
return proc.stdout.decode()
def auto_backup(self, path, keep_last=20):
"""Runs a cron job that backups the repo and prunes the last specified backups.
Args:
path (str): Local path to backup.
keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
"""
self._check_install("crontab")
script_path = self._get_script_path(path)
cronjobs = self._get_crons_jobs()
if not self.auto_backup_running(path): # Check if cron job already running
cron_script = CRON_SCRIPT.format(repo=self.repo, password=self.password, path=path, keep_last=keep_last)
with open(script_path, "w") as rfd:
rfd.write(cron_script)
cron_cmd = cronjobs + f"0 0 * * * bash {script_path} \n"
proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
proc_res = proc.communicate(input=cron_cmd.encode())
if proc.returncode > 0:
raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")
def auto_backup_running(self, path):
"""Checks if auto backup for the specified path is running or not
Args:
path (str): Local path to backup in the cron job.
Returns:
bool: Whether it is running or not.
"""
script_path = self._get_script_path(path)
cronjobs = self._get_crons_jobs()
return cronjobs.find(script_path) >= 0
def disable_auto_backup(self, path):
"""Removes cron jon based on the path being backed.
Args:
path (str): Local path to backup in the cron job.
"""
script_path = self._get_script_path(path)
cronjobs = self._get_crons_jobs()
other_crons = []
for cronjob in cronjobs.splitlines():
if script_path not in cronjob:
other_crons.append(cronjob)
proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
cron_cmd = "\n".join(other_crons) + "\n"
proc_res = proc.communicate(input=cron_cmd.encode())
if proc.returncode > 0:
raise Runtime(f"Couldn't remove cron job, failed with {proc_res[1]}")
def backup_watchdog_running(self, script_path) -> bool:
"""Watches a cronjob to watch backups using last snapshot time.
Args:
script_path (str): a path to the script to run the cronjob.
Returns:
bool: True if the backup watchdog running otherwise False
"""
script_path = self._get_script_path(script_path)
cronjobs = self._get_crons_jobs()
return cronjobs.find(script_path) >= 0
def start_watch_backup(self, path):
"""Runs a cron job that backups the repo and prunes the last specified backups.
Args:
path (str): Local path to backup.
keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20.
"""
self._check_install("crontab")
script_path = self._get_script_path(path)
cronjobs = self._get_crons_jobs()
if not self.backup_watchdog_running(path): # Check if cron job already running
cron_script = WATCHDOG_SCRIPT.format(
repo=self.repo,
password=self.password,
path=path,
AWS_SECRET_ACCESS_KEY=self.extra_env.get("AWS_SECRET_ACCESS_KEY"),
AWS_ACCESS_KEY_ID=self.extra_env.get("AWS_ACCESS_KEY_ID"),
THREEBOT_NAME=os.environ.get("THREEBOT_NAME"),
ESCALATION_MAIL=os.environ.get("ESCALATION_MAIL"),
)
with open(script_path, "w") as rfd:
rfd.write(cron_script)
cron_cmd = cronjobs + f"0 0 */2 * * bash {script_path} \n"
proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
proc_res = proc.communicate(input=cron_cmd.encode())
if proc.returncode > 0:
raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")
Classes
class ResticRepo (*args, **kwargs)
-
A simple attribute-based namespace.
SimpleNamespace(**kwargs)
base class implementation for any class with fields which supports getting/setting raw data for any instance fields.
any instance can have an optional name and a parent.
class Person(Base): name = fields.String() age = fields.Float() p = Person(name="ahmed", age="19") print(p.name, p.age)
Args
parent_
:Base
, optional- parent instance. Defaults to None.
instance_name_
:str
, optional- instance name. Defaults to None.
**values
- any given field values to initiate the instance with
Expand source code
class ResticRepo(Base): repo = fields.String(required=True) password = fields.Secret(required=True) extra_env = fields.Typed(dict, default={}) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._check_install("restic") self._env = None def _check_install(self, binary): if subprocess.call(["which", binary], stdout=subprocess.DEVNULL): raise NotFound(f"{binary} not installed") @property def env(self): self.validate() self._env = os.environ.copy() self._env.update({"RESTIC_PASSWORD": self.password, "RESTIC_REPOSITORY": self.repo}, **self.extra_env) return self._env def _run_cmd(self, cmd, check=True): proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) if check and proc.returncode: raise Runtime(f"Restic command failed with {proc.stderr.decode()}") return proc def init_repo(self): """Init restic repo with data specified in the instances """ proc = self._run_cmd(["restic", "cat", "config"], False) if proc.returncode > 0: self._run_cmd(["restic", "init"]) def backup(self, path, tags=None, exclude=None): """Backup a path to the repo. Args: path (str or list of str): Local path/s to backup. tags (list): List of tags to set to the backup. exclude (list of str): This instructs restic to exclude files matching a given pattern/s. """ if not path: raise ValueError("Please specify path/s to backup") cmd = ["restic", "backup"] tags = tags or [] for tag in tags: cmd.extend(["--tag", tag]) if exclude: for pattern in exclude: cmd.extend([f"--exclude={pattern}"]) if isinstance(path, list): cmd.extend(path) else: cmd.extend([path]) self._run_cmd(cmd) def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None): """Restores a snapshot. Args: target_path (str): a path to restore to snapshot_id (str, optional): Id of the snapshot. Defaults to None. latest (bool, optional): if True will use latest snapshot. Defaults to True. path (str, optional): Filter on the path when using latest. Defaults to None. host (str, optional): Filter on the hostname when using latest. Defaults to None. tags (list, optional): List of tags to filter on. """ cmd = ["restic", "--target", target_path, "restore"] if snapshot_id: cmd.append(snapshot_id) self._run_cmd(cmd) elif latest: args = ["latest"] if path: args.extend(["--path", path]) if host: args.extend(["--host", host]) if tags: for tag in tags: cmd.extend(["--tag", tag]) self._run_cmd(cmd + args) else: raise ValueError("Please specify either `snapshot_id` or `latest` flag") def list_snapshots(self, tags=None, last=False, path=None): """List all snapshots in the repo. Args: tags (list, optional): a list of tags to filter on. Defaults to None. last (bool): If True will get last snapshot only while respecting the other filters. Defaults to False. path (str): a path to filter on. Defaults to None. Returns list : all snapshots as dicts """ tags = tags or [] cmd = ["restic", "snapshots", "--json"] for tag in tags: cmd.extend(["--tag", tag]) if path: cmd.extend(["--path", path]) if last: cmd.append("--last") proc = self._run_cmd(cmd) return json.loads(proc.stdout) def forget(self, keep_last=10, prune=True, snapshots=None, tags=None): """Remove snapshots and Optionally remove the data that was referenced by those snapshots. During a prune operation, the repository is locked and backups cannot be completed. Args: keep_last (str, optional): How many items to keep. Defaults to 10. prune (bool, optional): Whether to actually remove the data or not. Defaults to True. snapshots (list, optional): a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None. tags (list, optional): if given, Only the snapshots which have specified tags are considered. Defaults to None. """ cmd = ["restic", "forget"] if tags: for tag in tags: cmd.extend(["--tag", tag]) if keep_last and not snapshots: # will be ignored in case if passing snapshot id/s. this is a restic behaviour. cmd.extend(["--keep-last", str(keep_last)]) if prune: cmd.append("--prune") if snapshots: cmd.extend(snapshots) self._run_cmd(cmd) def _get_script_path(self, path): return os.path.join(path, f"{self.instance_name}_restic_cron") def _get_crons_jobs(self): proc = subprocess.run(["crontab", "-l"], stderr=subprocess.DEVNULL, stdout=subprocess.PIPE) return proc.stdout.decode() def auto_backup(self, path, keep_last=20): """Runs a cron job that backups the repo and prunes the last specified backups. Args: path (str): Local path to backup. keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20. """ self._check_install("crontab") script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() if not self.auto_backup_running(path): # Check if cron job already running cron_script = CRON_SCRIPT.format(repo=self.repo, password=self.password, path=path, keep_last=keep_last) with open(script_path, "w") as rfd: rfd.write(cron_script) cron_cmd = cronjobs + f"0 0 * * * bash {script_path} \n" proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE) proc_res = proc.communicate(input=cron_cmd.encode()) if proc.returncode > 0: raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}") def auto_backup_running(self, path): """Checks if auto backup for the specified path is running or not Args: path (str): Local path to backup in the cron job. Returns: bool: Whether it is running or not. """ script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() return cronjobs.find(script_path) >= 0 def disable_auto_backup(self, path): """Removes cron jon based on the path being backed. Args: path (str): Local path to backup in the cron job. """ script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() other_crons = [] for cronjob in cronjobs.splitlines(): if script_path not in cronjob: other_crons.append(cronjob) proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE) cron_cmd = "\n".join(other_crons) + "\n" proc_res = proc.communicate(input=cron_cmd.encode()) if proc.returncode > 0: raise Runtime(f"Couldn't remove cron job, failed with {proc_res[1]}") def backup_watchdog_running(self, script_path) -> bool: """Watches a cronjob to watch backups using last snapshot time. Args: script_path (str): a path to the script to run the cronjob. Returns: bool: True if the backup watchdog running otherwise False """ script_path = self._get_script_path(script_path) cronjobs = self._get_crons_jobs() return cronjobs.find(script_path) >= 0 def start_watch_backup(self, path): """Runs a cron job that backups the repo and prunes the last specified backups. Args: path (str): Local path to backup. keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20. """ self._check_install("crontab") script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() if not self.backup_watchdog_running(path): # Check if cron job already running cron_script = WATCHDOG_SCRIPT.format( repo=self.repo, password=self.password, path=path, AWS_SECRET_ACCESS_KEY=self.extra_env.get("AWS_SECRET_ACCESS_KEY"), AWS_ACCESS_KEY_ID=self.extra_env.get("AWS_ACCESS_KEY_ID"), THREEBOT_NAME=os.environ.get("THREEBOT_NAME"), ESCALATION_MAIL=os.environ.get("ESCALATION_MAIL"), ) with open(script_path, "w") as rfd: rfd.write(cron_script) cron_cmd = cronjobs + f"0 0 */2 * * bash {script_path} \n" proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE) proc_res = proc.communicate(input=cron_cmd.encode()) if proc.returncode > 0: raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")
Ancestors
- Base
- types.SimpleNamespace
Instance variables
var env
-
Expand source code
@property def env(self): self.validate() self._env = os.environ.copy() self._env.update({"RESTIC_PASSWORD": self.password, "RESTIC_REPOSITORY": self.repo}, **self.extra_env) return self._env
var extra_env
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var password
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
var repo
-
getter method this property
will call
_get_value
, which would if the value is already defined and will get the default value if notReturns
any
- the field value
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field)
Methods
def auto_backup(self, path, keep_last=20)
-
Runs a cron job that backups the repo and prunes the last specified backups.
Args
path
:str
- Local path to backup.
keep_last
:int
, optional- How many items to keep in every forgot operation. Defaults to 20.
Expand source code
def auto_backup(self, path, keep_last=20): """Runs a cron job that backups the repo and prunes the last specified backups. Args: path (str): Local path to backup. keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20. """ self._check_install("crontab") script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() if not self.auto_backup_running(path): # Check if cron job already running cron_script = CRON_SCRIPT.format(repo=self.repo, password=self.password, path=path, keep_last=keep_last) with open(script_path, "w") as rfd: rfd.write(cron_script) cron_cmd = cronjobs + f"0 0 * * * bash {script_path} \n" proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE) proc_res = proc.communicate(input=cron_cmd.encode()) if proc.returncode > 0: raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")
def auto_backup_running(self, path)
-
Checks if auto backup for the specified path is running or not
Args
path
:str
- Local path to backup in the cron job.
Returns
bool
- Whether it is running or not.
Expand source code
def auto_backup_running(self, path): """Checks if auto backup for the specified path is running or not Args: path (str): Local path to backup in the cron job. Returns: bool: Whether it is running or not. """ script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() return cronjobs.find(script_path) >= 0
def backup(self, path, tags=None, exclude=None)
-
Backup a path to the repo.
Args
path
:str
orlist
ofstr
- Local path/s to backup.
tags
:list
- List of tags to set to the backup.
exclude
:list
ofstr
- This instructs restic to exclude files matching a given pattern/s.
Expand source code
def backup(self, path, tags=None, exclude=None): """Backup a path to the repo. Args: path (str or list of str): Local path/s to backup. tags (list): List of tags to set to the backup. exclude (list of str): This instructs restic to exclude files matching a given pattern/s. """ if not path: raise ValueError("Please specify path/s to backup") cmd = ["restic", "backup"] tags = tags or [] for tag in tags: cmd.extend(["--tag", tag]) if exclude: for pattern in exclude: cmd.extend([f"--exclude={pattern}"]) if isinstance(path, list): cmd.extend(path) else: cmd.extend([path]) self._run_cmd(cmd)
def backup_watchdog_running(self, script_path) ‑> bool
-
Watches a cronjob to watch backups using last snapshot time.
Args
script_path
:str
- a path to the script to run the cronjob.
Returns
bool
- True if the backup watchdog running otherwise False
Expand source code
def backup_watchdog_running(self, script_path) -> bool: """Watches a cronjob to watch backups using last snapshot time. Args: script_path (str): a path to the script to run the cronjob. Returns: bool: True if the backup watchdog running otherwise False """ script_path = self._get_script_path(script_path) cronjobs = self._get_crons_jobs() return cronjobs.find(script_path) >= 0
def disable_auto_backup(self, path)
-
Removes cron jon based on the path being backed.
Args
path
:str
- Local path to backup in the cron job.
Expand source code
def disable_auto_backup(self, path): """Removes cron jon based on the path being backed. Args: path (str): Local path to backup in the cron job. """ script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() other_crons = [] for cronjob in cronjobs.splitlines(): if script_path not in cronjob: other_crons.append(cronjob) proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE) cron_cmd = "\n".join(other_crons) + "\n" proc_res = proc.communicate(input=cron_cmd.encode()) if proc.returncode > 0: raise Runtime(f"Couldn't remove cron job, failed with {proc_res[1]}")
def forget(self, keep_last=10, prune=True, snapshots=None, tags=None)
-
Remove snapshots and Optionally remove the data that was referenced by those snapshots. During a prune operation, the repository is locked and backups cannot be completed.
Args
keep_last
:str
, optional- How many items to keep. Defaults to 10.
prune
:bool
, optional- Whether to actually remove the data or not. Defaults to True.
snapshots
:list
, optional- a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None.
tags
:list
, optional- if given, Only the snapshots which have specified tags are considered. Defaults to None.
Expand source code
def forget(self, keep_last=10, prune=True, snapshots=None, tags=None): """Remove snapshots and Optionally remove the data that was referenced by those snapshots. During a prune operation, the repository is locked and backups cannot be completed. Args: keep_last (str, optional): How many items to keep. Defaults to 10. prune (bool, optional): Whether to actually remove the data or not. Defaults to True. snapshots (list, optional): a list of specifics snapshot ids to forget. if given, the value of keep_last parm will be ignored. Defaults to None. tags (list, optional): if given, Only the snapshots which have specified tags are considered. Defaults to None. """ cmd = ["restic", "forget"] if tags: for tag in tags: cmd.extend(["--tag", tag]) if keep_last and not snapshots: # will be ignored in case if passing snapshot id/s. this is a restic behaviour. cmd.extend(["--keep-last", str(keep_last)]) if prune: cmd.append("--prune") if snapshots: cmd.extend(snapshots) self._run_cmd(cmd)
def init_repo(self)
-
Init restic repo with data specified in the instances
Expand source code
def init_repo(self): """Init restic repo with data specified in the instances """ proc = self._run_cmd(["restic", "cat", "config"], False) if proc.returncode > 0: self._run_cmd(["restic", "init"])
def list_snapshots(self, tags=None, last=False, path=None)
-
List all snapshots in the repo.
Args
tags
:list
, optional- a list of tags to filter on. Defaults to None.
last
:bool
- If True will get last snapshot only while respecting the other filters. Defaults to False.
path
:str
- a path to filter on. Defaults to None.
Returns list : all snapshots as dicts
Expand source code
def list_snapshots(self, tags=None, last=False, path=None): """List all snapshots in the repo. Args: tags (list, optional): a list of tags to filter on. Defaults to None. last (bool): If True will get last snapshot only while respecting the other filters. Defaults to False. path (str): a path to filter on. Defaults to None. Returns list : all snapshots as dicts """ tags = tags or [] cmd = ["restic", "snapshots", "--json"] for tag in tags: cmd.extend(["--tag", tag]) if path: cmd.extend(["--path", path]) if last: cmd.append("--last") proc = self._run_cmd(cmd) return json.loads(proc.stdout)
def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None)
-
Restores a snapshot.
Args
target_path
:str
- a path to restore to
snapshot_id
:str
, optional- Id of the snapshot. Defaults to None.
latest
:bool
, optional- if True will use latest snapshot. Defaults to True.
path
:str
, optional- Filter on the path when using latest. Defaults to None.
host
:str
, optional- Filter on the hostname when using latest. Defaults to None.
tags
:list
, optional- List of tags to filter on.
Expand source code
def restore(self, target_path, snapshot_id=None, latest=True, path=None, host=None, tags=None): """Restores a snapshot. Args: target_path (str): a path to restore to snapshot_id (str, optional): Id of the snapshot. Defaults to None. latest (bool, optional): if True will use latest snapshot. Defaults to True. path (str, optional): Filter on the path when using latest. Defaults to None. host (str, optional): Filter on the hostname when using latest. Defaults to None. tags (list, optional): List of tags to filter on. """ cmd = ["restic", "--target", target_path, "restore"] if snapshot_id: cmd.append(snapshot_id) self._run_cmd(cmd) elif latest: args = ["latest"] if path: args.extend(["--path", path]) if host: args.extend(["--host", host]) if tags: for tag in tags: cmd.extend(["--tag", tag]) self._run_cmd(cmd + args) else: raise ValueError("Please specify either `snapshot_id` or `latest` flag")
def start_watch_backup(self, path)
-
Runs a cron job that backups the repo and prunes the last specified backups.
Args
path
:str
- Local path to backup.
keep_last
:int
, optional- How many items to keep in every forgot operation. Defaults to 20.
Expand source code
def start_watch_backup(self, path): """Runs a cron job that backups the repo and prunes the last specified backups. Args: path (str): Local path to backup. keep_last (int, optional): How many items to keep in every forgot operation. Defaults to 20. """ self._check_install("crontab") script_path = self._get_script_path(path) cronjobs = self._get_crons_jobs() if not self.backup_watchdog_running(path): # Check if cron job already running cron_script = WATCHDOG_SCRIPT.format( repo=self.repo, password=self.password, path=path, AWS_SECRET_ACCESS_KEY=self.extra_env.get("AWS_SECRET_ACCESS_KEY"), AWS_ACCESS_KEY_ID=self.extra_env.get("AWS_ACCESS_KEY_ID"), THREEBOT_NAME=os.environ.get("THREEBOT_NAME"), ESCALATION_MAIL=os.environ.get("ESCALATION_MAIL"), ) with open(script_path, "w") as rfd: rfd.write(cron_script) cron_cmd = cronjobs + f"0 0 */2 * * bash {script_path} \n" proc = subprocess.Popen(["crontab", "-"], stdin=subprocess.PIPE, stderr=subprocess.PIPE) proc_res = proc.communicate(input=cron_cmd.encode()) if proc.returncode > 0: raise Runtime(f"Couldn't start cron job, failed with {proc_res[1]}")
Inherited members