Module jumpscale.clients.syncthing.syncthing
Expand source code
from jumpscale.clients.base import Client
from jumpscale.core.base import fields
from jumpscale.loader import j
class SyncthingClient(Client):
    host = fields.String()
    port = fields.Integer()
    apikey = fields.String()
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__url = None
        self.__session = None
        self.__config = None
    @property
    def url(self):
        if not self.__url:
            self.__url = f"http://{self.host}:{self.port}/rest"
        return self.__url
    @property
    def config(self):
        if not self.__config:
            self.__config = self._call("system/config")
        return self.__config
    @property
    def session(self):
        if not self.__session:
            self.__session = j.tools.http.Session()
            self.__session.headers = {
                "Content-Type": "application/json",
                "User-Agent": "Syncthing Python client",
                "X-API-Key": self.apikey,
            }
        return self.__session
    def _call(self, endpoint, method="get", data=None, return_json=True):
        method_call = getattr(self.session, method)
        response = method_call(f"{self.url}/{endpoint}", json=data)
        response.raise_for_status()
        if return_json:
            return response.json()
        else:
            return response.content
    def restart(self):
        self._call("system/restart", "post")
        if not j.sals.nettools.wait_connection_test(self.host, self.port, timeout=3):
            j.exceptions.Timeout("Server didn't start in 3 seconds")
    def get_status(self):
        return self._call("system/status")
    def get_id(self):
        return self.get_status()["myID"]
    def reload_config(self):
        self.__config = None
    def set_config(self, config=None):
        self._call("system/config", "post", config or self.config, False)
    def get_folders(self):
        return self.config["folders"]
    def get_devices(self):
        return self.config["devices"]
    def _get_folder(self, name):
        for idx, folder in enumerate(self.get_folders()):
            if folder["id"] == name:
                return idx
    def _get_device(self, name):
        for idx, device in enumerate(self.get_devices()):
            if device["name"] == name:
                return idx
    def check_folder(self, name):
        return self._get_folder(name) is not None
    def check_device(self, name):
        return self._get_device(name) is not None
    def delete_folder(self, name):
        folders = self.get_folders()
        idx = self._get_folder(name)
        if idx:
            folders.pop(idx)
            return self.set_config(self.config)
    def delete_device(self, name):
        devices = self.get_devices()
        idx = self._get_device(name)
        if idx:
            devices.pop(idx)
            return self.set_config(self.config)
    def add_folder(
        self, name, path, ignore_perms=False, read_only=False, rescan_intervals=10, devices=None, overwrite=False,
    ):
        folders = self.get_folders()
        idx = self._get_folder(name)
        if idx:
            if overwrite:
                folders.pop(idx)
            else:
                raise j.exceptions.Input(f"Folder with name: {name} already exists")
        devices = devices or []
        my_id = self.get_id()
        if my_id not in devices:
            devices.append(my_id)
        devices = [{"deviceID": device} for device in devices]
        folder = {
            "autoNormalize": False,
            "copiers": 0,
            "devices": devices,
            "hashers": 0,
            "id": name,
            "ignoreDelete": False,
            "ignorePerms": ignore_perms,
            "invalid": "",
            "minDiskFreePct": 5,
            "order": "random",
            "path": path,
            "pullers": 0,
            "readOnly": read_only,
            "rescanIntervalS": rescan_intervals,
            "versioning": {"params": {}, "type": ""},
        }
        folders.append(folder)
        return self.set_config(self.config)
    def add_device(self, name, device_id, introducer=False, compression="always", overwrite=False):
        devices = self.get_devices()
        idx = self._get_device(name)
        if idx:
            if overwrite:
                devices.pop(idx)
            else:
                raise j.exceptions.Input(f"Device with name: {name} already exists")
        device = {
            "addresses": ["dynamic"],
            "certName": "",
            "compression": compression,
            "deviceID": device_id,
            "introducer": introducer,
            "name": name,
        }
        devices.append(device)
        return self.set_config(self.config)
Classes
class SyncthingClient (*args, **kwargs)- 
A simple attribute-based namespace.
SimpleNamespace(**kwargs)
base class implementation for any class with fields which supports getting/setting raw data for any instance fields.
any instance can have an optional name and a parent.
class Person(Base): name = fields.String() age = fields.Float() p = Person(name="ahmed", age="19") print(p.name, p.age)Args
parent_:Base, optional- parent instance. Defaults to None.
 instance_name_:str, optional- instance name. Defaults to None.
 **values- any given field values to initiate the instance with
 
Expand source code
class SyncthingClient(Client): host = fields.String() port = fields.Integer() apikey = fields.String() def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__url = None self.__session = None self.__config = None @property def url(self): if not self.__url: self.__url = f"http://{self.host}:{self.port}/rest" return self.__url @property def config(self): if not self.__config: self.__config = self._call("system/config") return self.__config @property def session(self): if not self.__session: self.__session = j.tools.http.Session() self.__session.headers = { "Content-Type": "application/json", "User-Agent": "Syncthing Python client", "X-API-Key": self.apikey, } return self.__session def _call(self, endpoint, method="get", data=None, return_json=True): method_call = getattr(self.session, method) response = method_call(f"{self.url}/{endpoint}", json=data) response.raise_for_status() if return_json: return response.json() else: return response.content def restart(self): self._call("system/restart", "post") if not j.sals.nettools.wait_connection_test(self.host, self.port, timeout=3): j.exceptions.Timeout("Server didn't start in 3 seconds") def get_status(self): return self._call("system/status") def get_id(self): return self.get_status()["myID"] def reload_config(self): self.__config = None def set_config(self, config=None): self._call("system/config", "post", config or self.config, False) def get_folders(self): return self.config["folders"] def get_devices(self): return self.config["devices"] def _get_folder(self, name): for idx, folder in enumerate(self.get_folders()): if folder["id"] == name: return idx def _get_device(self, name): for idx, device in enumerate(self.get_devices()): if device["name"] == name: return idx def check_folder(self, name): return self._get_folder(name) is not None def check_device(self, name): return self._get_device(name) is not None def delete_folder(self, name): folders = self.get_folders() idx = self._get_folder(name) if idx: folders.pop(idx) return self.set_config(self.config) def delete_device(self, name): devices = self.get_devices() idx = self._get_device(name) if idx: devices.pop(idx) return self.set_config(self.config) def add_folder( self, name, path, ignore_perms=False, read_only=False, rescan_intervals=10, devices=None, overwrite=False, ): folders = self.get_folders() idx = self._get_folder(name) if idx: if overwrite: folders.pop(idx) else: raise j.exceptions.Input(f"Folder with name: {name} already exists") devices = devices or [] my_id = self.get_id() if my_id not in devices: devices.append(my_id) devices = [{"deviceID": device} for device in devices] folder = { "autoNormalize": False, "copiers": 0, "devices": devices, "hashers": 0, "id": name, "ignoreDelete": False, "ignorePerms": ignore_perms, "invalid": "", "minDiskFreePct": 5, "order": "random", "path": path, "pullers": 0, "readOnly": read_only, "rescanIntervalS": rescan_intervals, "versioning": {"params": {}, "type": ""}, } folders.append(folder) return self.set_config(self.config) def add_device(self, name, device_id, introducer=False, compression="always", overwrite=False): devices = self.get_devices() idx = self._get_device(name) if idx: if overwrite: devices.pop(idx) else: raise j.exceptions.Input(f"Device with name: {name} already exists") device = { "addresses": ["dynamic"], "certName": "", "compression": compression, "deviceID": device_id, "introducer": introducer, "name": name, } devices.append(device) return self.set_config(self.config)Ancestors
Instance variables
var apikey- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var config- 
Expand source code
@property def config(self): if not self.__config: self.__config = self._call("system/config") return self.__config var host- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var port- 
getter method this property
will call
_get_value, which would if the value is already defined and will get the default value if notReturns
any- the field value
 
Expand source code
def getter(self): """ getter method this property will call `_get_value`, which would if the value is already defined and will get the default value if not Returns: any: the field value """ return self._get_value(name, field) var session- 
Expand source code
@property def session(self): if not self.__session: self.__session = j.tools.http.Session() self.__session.headers = { "Content-Type": "application/json", "User-Agent": "Syncthing Python client", "X-API-Key": self.apikey, } return self.__session var url- 
Expand source code
@property def url(self): if not self.__url: self.__url = f"http://{self.host}:{self.port}/rest" return self.__url 
Methods
def add_device(self, name, device_id, introducer=False, compression='always', overwrite=False)- 
Expand source code
def add_device(self, name, device_id, introducer=False, compression="always", overwrite=False): devices = self.get_devices() idx = self._get_device(name) if idx: if overwrite: devices.pop(idx) else: raise j.exceptions.Input(f"Device with name: {name} already exists") device = { "addresses": ["dynamic"], "certName": "", "compression": compression, "deviceID": device_id, "introducer": introducer, "name": name, } devices.append(device) return self.set_config(self.config) def add_folder(self, name, path, ignore_perms=False, read_only=False, rescan_intervals=10, devices=None, overwrite=False)- 
Expand source code
def add_folder( self, name, path, ignore_perms=False, read_only=False, rescan_intervals=10, devices=None, overwrite=False, ): folders = self.get_folders() idx = self._get_folder(name) if idx: if overwrite: folders.pop(idx) else: raise j.exceptions.Input(f"Folder with name: {name} already exists") devices = devices or [] my_id = self.get_id() if my_id not in devices: devices.append(my_id) devices = [{"deviceID": device} for device in devices] folder = { "autoNormalize": False, "copiers": 0, "devices": devices, "hashers": 0, "id": name, "ignoreDelete": False, "ignorePerms": ignore_perms, "invalid": "", "minDiskFreePct": 5, "order": "random", "path": path, "pullers": 0, "readOnly": read_only, "rescanIntervalS": rescan_intervals, "versioning": {"params": {}, "type": ""}, } folders.append(folder) return self.set_config(self.config) def check_device(self, name)- 
Expand source code
def check_device(self, name): return self._get_device(name) is not None def check_folder(self, name)- 
Expand source code
def check_folder(self, name): return self._get_folder(name) is not None def delete_device(self, name)- 
Expand source code
def delete_device(self, name): devices = self.get_devices() idx = self._get_device(name) if idx: devices.pop(idx) return self.set_config(self.config) def delete_folder(self, name)- 
Expand source code
def delete_folder(self, name): folders = self.get_folders() idx = self._get_folder(name) if idx: folders.pop(idx) return self.set_config(self.config) def get_devices(self)- 
Expand source code
def get_devices(self): return self.config["devices"] def get_folders(self)- 
Expand source code
def get_folders(self): return self.config["folders"] def get_id(self)- 
Expand source code
def get_id(self): return self.get_status()["myID"] def get_status(self)- 
Expand source code
def get_status(self): return self._call("system/status") def reload_config(self)- 
Expand source code
def reload_config(self): self.__config = None def restart(self)- 
Expand source code
def restart(self): self._call("system/restart", "post") if not j.sals.nettools.wait_connection_test(self.host, self.port, timeout=3): j.exceptions.Timeout("Server didn't start in 3 seconds") def set_config(self, config=None)- 
Expand source code
def set_config(self, config=None): self._call("system/config", "post", config or self.config, False) 
Inherited members