Module jumpscale.tools.syncer

Module that helps sync specific local directories to multiple remote machines. Used in the jsync tool.

JS-NG> xmonader = j.clients.sshkey.new("xmonader")
JS-NG> xmonader.private_key_path = "/home/xmonader/.ssh/id_rsa"
JS-NG> xmonader.load_from_file_system()
JS-NG> xmonader.save()
JS-NG> xmonader = j.clients.sshclient.new("xmonader")
JS-NG> xmonader.sshkey = "xmonader"
JS-NG> s = j.tools.syncer.Syncer(["xmonader"], {"/home/xmonader/wspace/tfchain-py":"/tmp/tfchain-py"})
JS-NG> s.start()
2019-09-03T11:38:47.183394+0200 - paths: {'/home/xmonader/wspace/tfchain-py': '/tmp/tfchain-py'}
Expand source code
"""Module to help syncing multiple machines with specific directories you have.
used in the jsync tool.
```
JS-NG> xmonader = j.clients.sshkey.new("xmonader")
JS-NG> xmonader.private_key_path = "/home/xmonader/.ssh/id_rsa"
JS-NG> xmonader.load_from_file_system()
JS-NG> xmonader.save()
JS-NG> xmonader = j.clients.sshclient.new("xmonader")
JS-NG> xmonader.sshkey = "xmonader"
JS-NG> s = j.tools.syncer.Syncer(["xmonader"], {"/home/xmonader/wspace/tfchain-py":"/tmp/tfchain-py"})
JS-NG> s.start()
2019-09-03T11:38:47.183394+0200 - paths: {'/home/xmonader/wspace/tfchain-py': '/tmp/tfchain-py'}
```
"""

import shlex
from typing import List, Dict, Optional

from jumpscale.loader import j
from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer
import gevent

# Patterns handed to PatternMatchingEventHandler as `ignore_patterns` when the caller supplies none.
# NOTE(review): watchdog treats these as glob patterns matched against event paths; bare
# entries such as ".pyc" or ".swp" may not match files like "foo.pyc" -- confirm whether
# "*.pyc"-style globs were intended.
DEFAULT_IGNORED_PATTERNS = [".git", ".pyc", "__pycache__", ".swp", ".swx"]


class Syncer(PatternMatchingEventHandler):
    """Filesystem event handler that mirrors watched local paths to remote machines.

    Every key of `paths` is a local directory that gets scheduled on a watchdog
    observer; the matching value is the directory on the remote machines that
    receives the changes through the configured ssh clients.
    """

    def __init__(
        self,
        sshclients_names: List[str],
        paths: Dict[str, str],
        patterns: Optional[List[str]] = None,
        ignore_patterns: Optional[List[str]] = None,
        ignore_directories: bool = False,
        case_sensitive: bool = True,
    ):
        """Creates new syncer tool

        Arguments:
            sshclients_names {List[str]} -- list of sshclient names
            paths {Dict[str, str]} -- paths to watch src/dest form of dict {'/tmp/myproj':'/root/proj'}

        Keyword Arguments:
            patterns {Optional[List[str]]} -- optional list of patterns to watch (default: {None})
            ignore_patterns {Optional[List[str]]} -- patterns to ignore, e.g .git, __pycache__
                                                     (default: {None}, meaning DEFAULT_IGNORED_PATTERNS)
            ignore_directories {bool} -- forwarded to PatternMatchingEventHandler (default: {False})
            case_sensitive {bool} -- case sensitive watching  (default: {True})

        Returns:
            Syncer -- Syncer object
        """
        ignore_patterns = ignore_patterns or DEFAULT_IGNORED_PATTERNS
        # Keyword form keeps working on watchdog releases that made these
        # parameters keyword-only, and is equally valid on older releases.
        super().__init__(
            patterns=patterns,
            ignore_patterns=ignore_patterns,
            ignore_directories=ignore_directories,
            case_sensitive=case_sensitive,
        )
        self.observer = Observer()
        self.sshclients_names = sshclients_names
        self.paths = paths or {}  # src:dst

    def _match_root(self, src_path: str) -> Optional[str]:
        """Returns the longest watched source root that is a prefix of src_path.

        Arguments:
            src_path {str} -- path in source machine

        Returns:
            Optional[str] -- key of self.paths, or None when no root matches
        """
        candidates = [root for root in self.paths if src_path.startswith(root)]
        # Longest match wins so that nested/overlapping roots resolve to the most specific one.
        return max(candidates, key=len) if candidates else None

    def _get_dest_path(self, src_path: str) -> Optional[str]:
        """returns destination root in remote machine for the watched root containing src_path

        Arguments:
            src_path {str} -- path in source machine

        Returns:
            Optional[str] -- remote root, or None when src_path is not under a watched root
        """
        j.logger.debug(f"paths: {self.paths} and path: {src_path}")

        root = self._match_root(str(src_path))
        return None if root is None else self.paths[root]

    def _rewrite_path_for_dest(self, src_path: str) -> Optional[str]:
        """rewrite src_path to remote_path
        e.g
            local: /tmp/myproj/file.py
            remote: /root/myproj/file.py

        Only the leading watched root is swapped; later occurrences of the same
        text inside the path are left untouched.

        Arguments:
            src_path {str} -- source machine path

        Returns:
            Optional[str] -- rewritten path for remote, or None when src_path is not
                             under any watched root
        """
        src_path = str(src_path)
        root = self._match_root(src_path)
        if root is None:
            return None
        return self.paths[root] + src_path[len(root):]

    def _get_sshclients(self):
        """Returns list of sshclient objects.

        Returns:
            List[SSHClient] -- list of ssh clients
        """
        return [j.clients.sshclient.get(name) for name in self.sshclients_names]

    def sync(self):
        """Sync directory structure and files"""
        j.logger.debug(f"paths: {self.paths}")
        clients = self._get_sshclients()

        def ensure_dirs():
            """For every directory in watched paths we make sure its full path exists on remote."""
            for path in self.paths:
                for src_dir in j.sals.fs.walk_dirs(path):
                    dest_dir = str(self._rewrite_path_for_dest(src_dir))
                    j.logger.debug(f"making dir {dest_dir}")
                    for cl in clients:
                        cl.sshclient.run(f"mkdir -p {shlex.quote(dest_dir)}")
                    # One watch per directory is enough, regardless of the number of clients.
                    self.observer.schedule(self, src_dir)

        def sync_file(e):
            """Sync single file to all registered sshclients

            Arguments:
                e {str} -- file path
            """
            dest_path = self._rewrite_path_for_dest(e)
            j.logger.debug(f"syncing {e} to machines into {dest_path}")
            parent = shlex.quote(str(j.sals.fs.parent(dest_path)))

            for cl in clients:
                cl.sshclient.run(f"mkdir -p {parent}")
                cl.sshclient.sftp.put(e, dest_path)

        ensure_dirs()

        for path in self.paths:
            # NOTE(review): the second positional argument of walk_files is kept as in the
            # original call; confirm against j.sals.fs.walk_files what it is interpreted as.
            for f in j.sals.fs.walk_files(path, sync_file):
                sync_file(f)

    def start(self, sync=True):
        """Start syncing/watching paths to remote machines

        Blocks until interrupted. The observer is always stopped and joined on the
        way out; KeyboardInterrupt ends the call normally, any other exception
        propagates after the cleanup.

        Keyword Arguments:
            sync {bool} -- sync dirs/files first (default: {True})
        """
        if sync:
            self.sync()

        for path in self.paths:
            self.observer.schedule(self, path)

        self.observer.start()
        try:
            while True:
                gevent.sleep(0.1)
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop watching.
            pass
        finally:
            self.observer.unschedule_all()
            self.observer.stop()
            self.observer.join()

    def on_moved(self, event):
        """Replays a local move/rename on every remote machine.

        Arguments:
            event -- watchdog moved event (uses src_path, dest_path and is_directory)
        """
        super().on_moved(event)

        what = "directory" if event.is_directory else "file"
        j.logger.info(f"Moved {what}: from {event.src_path} to {event.dest_path}")
        old_remote = self._rewrite_path_for_dest(event.src_path)
        dest_path = self._rewrite_path_for_dest(event.dest_path)
        if old_remote is None or dest_path is None:
            j.logger.debug(f"No watched path matches move {event.src_path} -> {event.dest_path}; skipping")
            return
        j.logger.debug(f"will move to {dest_path}")
        j.logger.debug(f"will delete original in {old_remote}")
        for cl in self._get_sshclients():
            if event.is_directory:
                # directories are renamed in place on the remote side
                cl.sshclient.sftp.posix_rename(old_remote, dest_path)
                continue
            try:
                # files are re-uploaded under the new name, then the old copy is removed
                cl.sshclient.sftp.put(event.dest_path, dest_path)
                cl.sshclient.run(f"rm {shlex.quote(old_remote)}")
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during move event ({exc})")

    def on_created(self, event):
        """Creates the new file/directory on every remote machine.

        New directories are also scheduled on the observer so that changes inside
        them are picked up.

        Arguments:
            event -- watchdog created event (uses src_path and is_directory)
        """
        super().on_created(event)
        what = "directory" if event.is_directory else "file"
        j.logger.debug(f"Created {what}: {event.src_path}")

        dest_path = self._rewrite_path_for_dest(event.src_path)
        if dest_path is None:
            j.logger.debug(f"No watched path matches {event.src_path}; skipping")
            return
        j.logger.debug(f"will create in {dest_path}")

        clients = self._get_sshclients()
        target = shlex.quote(dest_path)
        if event.is_directory:
            for cl in clients:
                cl.sshclient.run(f"mkdir -p {target}")
            # Watch the new directory once, not once per client.
            self.observer.schedule(self, event.src_path)
            return

        parent = shlex.quote(str(j.sals.fs.parent(dest_path)))
        for cl in clients:
            try:
                cl.sshclient.run(f"mkdir -p {parent}")
                cl.sshclient.run(f"touch {target}")
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during create event ({exc})")

    def on_deleted(self, event):
        """Removes the deleted file/directory from every remote machine.

        Arguments:
            event -- watchdog deleted event (uses src_path and is_directory)
        """
        super().on_deleted(event)

        what = "directory" if event.is_directory else "file"
        j.logger.debug(f"Deleted {what}: {event.src_path}")

        dest_path = self._rewrite_path_for_dest(event.src_path)
        if dest_path is None:
            # Never build an `rm` command from an unmapped path.
            j.logger.debug(f"No watched path matches {event.src_path}; skipping")
            return
        j.logger.debug(f"will delete in {dest_path}")
        target = shlex.quote(dest_path)
        for cl in self._get_sshclients():
            if event.is_directory:
                cl.sshclient.run(f"rm -rf {target}")
                continue
            try:
                cl.sshclient.run(f"rm {target}")
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during delete event ({exc})")

    def on_modified(self, event):
        """Uploads a modified file to every remote machine; directory events are only logged.

        Arguments:
            event -- watchdog modified event (uses src_path and is_directory)
        """
        super().on_modified(event)
        what = "directory" if event.is_directory else "file"
        j.logger.debug(f"Modified {what}: {event.src_path}")

        dest_path = self._rewrite_path_for_dest(event.src_path)
        if dest_path is None:
            j.logger.debug(f"No watched path matches {event.src_path}; skipping")
            return
        j.logger.debug(f"will modify in {dest_path}")

        if event.is_directory:
            # Nothing to transfer for a directory modification.
            j.logger.debug(f"Folder {dest_path} was modified")
            return

        for cl in self._get_sshclients():
            try:
                cl.sshclient.sftp.put(event.src_path, dest_path)
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during modify event ({exc})")

Classes

class Syncer (sshclients_names: List[str], paths: Dict[str, str], patterns: Optional[List[str]] = None, ignore_patterns: Optional[List[str]] = None, ignore_directories: Optional[List[str]] = False, case_sensitive: bool = True)

Matches given patterns with file paths associated with occurring events.

Creates new syncer tool

Arguments

sshclients_names {List[str]} – list of sshclient names

paths {Dict[str, str]} – paths to watch, as a src/dest dict, e.g. {'/tmp/myproj':'/root/proj'}

Keyword Arguments:

patterns {Optional[List[str]]} – optional list of patterns to watch (default: {None})

ignore_patterns {Optional[List[str]]} – patterns to ignore, e.g. .git, __pycache__ (default: {None})

ignore_directories {Optional[List[str]]} – directories to ignore (default: {False})

case_sensitive {bool} – case sensitive watching (default: {True})

Returns

Syncer – Syncer object

Expand source code
class Syncer(PatternMatchingEventHandler):
    """Filesystem event handler that mirrors watched local paths to remote machines.

    Every key of `paths` is a local directory that gets scheduled on a watchdog
    observer; the matching value is the directory on the remote machines that
    receives the changes through the configured ssh clients.
    """

    def __init__(
        self,
        sshclients_names: List[str],
        paths: Dict[str, str],
        patterns: Optional[List[str]] = None,
        ignore_patterns: Optional[List[str]] = None,
        ignore_directories: bool = False,
        case_sensitive: bool = True,
    ):
        """Creates new syncer tool

        Arguments:
            sshclients_names {List[str]} -- list of sshclient names
            paths {Dict[str, str]} -- paths to watch src/dest form of dict {'/tmp/myproj':'/root/proj'}

        Keyword Arguments:
            patterns {Optional[List[str]]} -- optional list of patterns to watch (default: {None})
            ignore_patterns {Optional[List[str]]} -- patterns to ignore, e.g .git, __pycache__
                                                     (default: {None}, meaning DEFAULT_IGNORED_PATTERNS)
            ignore_directories {bool} -- forwarded to PatternMatchingEventHandler (default: {False})
            case_sensitive {bool} -- case sensitive watching  (default: {True})

        Returns:
            Syncer -- Syncer object
        """
        ignore_patterns = ignore_patterns or DEFAULT_IGNORED_PATTERNS
        # Keyword form keeps working on watchdog releases that made these
        # parameters keyword-only, and is equally valid on older releases.
        super().__init__(
            patterns=patterns,
            ignore_patterns=ignore_patterns,
            ignore_directories=ignore_directories,
            case_sensitive=case_sensitive,
        )
        self.observer = Observer()
        self.sshclients_names = sshclients_names
        self.paths = paths or {}  # src:dst

    def _match_root(self, src_path: str) -> Optional[str]:
        """Returns the longest watched source root that is a prefix of src_path.

        Arguments:
            src_path {str} -- path in source machine

        Returns:
            Optional[str] -- key of self.paths, or None when no root matches
        """
        candidates = [root for root in self.paths if src_path.startswith(root)]
        # Longest match wins so that nested/overlapping roots resolve to the most specific one.
        return max(candidates, key=len) if candidates else None

    def _get_dest_path(self, src_path: str) -> Optional[str]:
        """returns destination root in remote machine for the watched root containing src_path

        Arguments:
            src_path {str} -- path in source machine

        Returns:
            Optional[str] -- remote root, or None when src_path is not under a watched root
        """
        j.logger.debug(f"paths: {self.paths} and path: {src_path}")

        root = self._match_root(str(src_path))
        return None if root is None else self.paths[root]

    def _rewrite_path_for_dest(self, src_path: str) -> Optional[str]:
        """rewrite src_path to remote_path
        e.g
            local: /tmp/myproj/file.py
            remote: /root/myproj/file.py

        Only the leading watched root is swapped; later occurrences of the same
        text inside the path are left untouched.

        Arguments:
            src_path {str} -- source machine path

        Returns:
            Optional[str] -- rewritten path for remote, or None when src_path is not
                             under any watched root
        """
        src_path = str(src_path)
        root = self._match_root(src_path)
        if root is None:
            return None
        return self.paths[root] + src_path[len(root):]

    def _get_sshclients(self):
        """Returns list of sshclient objects.

        Returns:
            List[SSHClient] -- list of ssh clients
        """
        return [j.clients.sshclient.get(name) for name in self.sshclients_names]

    def sync(self):
        """Sync directory structure and files"""
        j.logger.debug(f"paths: {self.paths}")
        clients = self._get_sshclients()

        def ensure_dirs():
            """For every directory in watched paths we make sure its full path exists on remote."""
            for path in self.paths:
                for src_dir in j.sals.fs.walk_dirs(path):
                    dest_dir = str(self._rewrite_path_for_dest(src_dir))
                    j.logger.debug(f"making dir {dest_dir}")
                    for cl in clients:
                        cl.sshclient.run(f"mkdir -p {shlex.quote(dest_dir)}")
                    # One watch per directory is enough, regardless of the number of clients.
                    self.observer.schedule(self, src_dir)

        def sync_file(e):
            """Sync single file to all registered sshclients

            Arguments:
                e {str} -- file path
            """
            dest_path = self._rewrite_path_for_dest(e)
            j.logger.debug(f"syncing {e} to machines into {dest_path}")
            parent = shlex.quote(str(j.sals.fs.parent(dest_path)))

            for cl in clients:
                cl.sshclient.run(f"mkdir -p {parent}")
                cl.sshclient.sftp.put(e, dest_path)

        ensure_dirs()

        for path in self.paths:
            # NOTE(review): the second positional argument of walk_files is kept as in the
            # original call; confirm against j.sals.fs.walk_files what it is interpreted as.
            for f in j.sals.fs.walk_files(path, sync_file):
                sync_file(f)

    def start(self, sync=True):
        """Start syncing/watching paths to remote machines

        Blocks until interrupted. The observer is always stopped and joined on the
        way out; KeyboardInterrupt ends the call normally, any other exception
        propagates after the cleanup.

        Keyword Arguments:
            sync {bool} -- sync dirs/files first (default: {True})
        """
        if sync:
            self.sync()

        for path in self.paths:
            self.observer.schedule(self, path)

        self.observer.start()
        try:
            while True:
                gevent.sleep(0.1)
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop watching.
            pass
        finally:
            self.observer.unschedule_all()
            self.observer.stop()
            self.observer.join()

    def on_moved(self, event):
        """Replays a local move/rename on every remote machine.

        Arguments:
            event -- watchdog moved event (uses src_path, dest_path and is_directory)
        """
        super().on_moved(event)

        what = "directory" if event.is_directory else "file"
        j.logger.info(f"Moved {what}: from {event.src_path} to {event.dest_path}")
        old_remote = self._rewrite_path_for_dest(event.src_path)
        dest_path = self._rewrite_path_for_dest(event.dest_path)
        if old_remote is None or dest_path is None:
            j.logger.debug(f"No watched path matches move {event.src_path} -> {event.dest_path}; skipping")
            return
        j.logger.debug(f"will move to {dest_path}")
        j.logger.debug(f"will delete original in {old_remote}")
        for cl in self._get_sshclients():
            if event.is_directory:
                # directories are renamed in place on the remote side
                cl.sshclient.sftp.posix_rename(old_remote, dest_path)
                continue
            try:
                # files are re-uploaded under the new name, then the old copy is removed
                cl.sshclient.sftp.put(event.dest_path, dest_path)
                cl.sshclient.run(f"rm {shlex.quote(old_remote)}")
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during move event ({exc})")

    def on_created(self, event):
        """Creates the new file/directory on every remote machine.

        New directories are also scheduled on the observer so that changes inside
        them are picked up.

        Arguments:
            event -- watchdog created event (uses src_path and is_directory)
        """
        super().on_created(event)
        what = "directory" if event.is_directory else "file"
        j.logger.debug(f"Created {what}: {event.src_path}")

        dest_path = self._rewrite_path_for_dest(event.src_path)
        if dest_path is None:
            j.logger.debug(f"No watched path matches {event.src_path}; skipping")
            return
        j.logger.debug(f"will create in {dest_path}")

        clients = self._get_sshclients()
        target = shlex.quote(dest_path)
        if event.is_directory:
            for cl in clients:
                cl.sshclient.run(f"mkdir -p {target}")
            # Watch the new directory once, not once per client.
            self.observer.schedule(self, event.src_path)
            return

        parent = shlex.quote(str(j.sals.fs.parent(dest_path)))
        for cl in clients:
            try:
                cl.sshclient.run(f"mkdir -p {parent}")
                cl.sshclient.run(f"touch {target}")
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during create event ({exc})")

    def on_deleted(self, event):
        """Removes the deleted file/directory from every remote machine.

        Arguments:
            event -- watchdog deleted event (uses src_path and is_directory)
        """
        super().on_deleted(event)

        what = "directory" if event.is_directory else "file"
        j.logger.debug(f"Deleted {what}: {event.src_path}")

        dest_path = self._rewrite_path_for_dest(event.src_path)
        if dest_path is None:
            # Never build an `rm` command from an unmapped path.
            j.logger.debug(f"No watched path matches {event.src_path}; skipping")
            return
        j.logger.debug(f"will delete in {dest_path}")
        target = shlex.quote(dest_path)
        for cl in self._get_sshclients():
            if event.is_directory:
                cl.sshclient.run(f"rm -rf {target}")
                continue
            try:
                cl.sshclient.run(f"rm {target}")
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during delete event ({exc})")

    def on_modified(self, event):
        """Uploads a modified file to every remote machine; directory events are only logged.

        Arguments:
            event -- watchdog modified event (uses src_path and is_directory)
        """
        super().on_modified(event)
        what = "directory" if event.is_directory else "file"
        j.logger.debug(f"Modified {what}: {event.src_path}")

        dest_path = self._rewrite_path_for_dest(event.src_path)
        if dest_path is None:
            j.logger.debug(f"No watched path matches {event.src_path}; skipping")
            return
        j.logger.debug(f"will modify in {dest_path}")

        if event.is_directory:
            # Nothing to transfer for a directory modification.
            j.logger.debug(f"Folder {dest_path} was modified")
            return

        for cl in self._get_sshclients():
            try:
                cl.sshclient.sftp.put(event.src_path, dest_path)
            except Exception as exc:
                j.logger.debug(f"Ignoring {dest_path}. Path was not found during modify event ({exc})")

Ancestors

  • watchdog.events.PatternMatchingEventHandler
  • watchdog.events.FileSystemEventHandler

Methods

def on_created(self, event)

Called when a file or directory is created.

:param event: Event representing file/directory creation. :type event: :class:DirCreatedEvent or :class:FileCreatedEvent

Expand source code
def on_created(self, event):
    """Creates the new file/directory on every remote machine.

    New directories are also scheduled on the observer so that changes inside
    them are picked up.

    Arguments:
        event -- watchdog created event (uses src_path and is_directory)
    """
    super().on_created(event)
    what = "directory" if event.is_directory else "file"
    j.logger.debug(f"Created {what}: {event.src_path}")

    dest_path = self._rewrite_path_for_dest(event.src_path)
    if dest_path is None:
        # No watched root maps this path; do not build shell commands from "None".
        j.logger.debug(f"No watched path matches {event.src_path}; skipping")
        return
    j.logger.debug(f"will create in {dest_path}")

    clients = self._get_sshclients()
    # Quote remote paths so spaces/shell metacharacters in file names are safe.
    target = shlex.quote(dest_path)
    if event.is_directory:
        for cl in clients:
            cl.sshclient.run(f"mkdir -p {target}")
        # Watch the new directory once, not once per client.
        self.observer.schedule(self, event.src_path)
        return

    parent = shlex.quote(str(j.sals.fs.parent(dest_path)))
    for cl in clients:
        try:
            cl.sshclient.run(f"mkdir -p {parent}")
            cl.sshclient.run(f"touch {target}")
        except Exception as exc:
            # Best effort per client; Exception (not bare except) lets Ctrl-C through.
            j.logger.debug(f"Ignoring {dest_path}. Path was not found during create event ({exc})")
def on_deleted(self, event)

Called when a file or directory is deleted.

:param event: Event representing file/directory deletion. :type event: :class:DirDeletedEvent or :class:FileDeletedEvent

Expand source code
def on_deleted(self, event):
    """Removes the deleted file/directory from every remote machine.

    Arguments:
        event -- watchdog deleted event (uses src_path and is_directory)
    """
    super().on_deleted(event)

    what = "directory" if event.is_directory else "file"
    j.logger.debug(f"Deleted {what}: {event.src_path}")

    dest_path = self._rewrite_path_for_dest(event.src_path)
    if dest_path is None:
        # Never build an `rm` command from an unmapped path.
        j.logger.debug(f"No watched path matches {event.src_path}; skipping")
        return
    j.logger.debug(f"will delete in {dest_path}")
    # Quote the remote path so spaces/shell metacharacters cannot alter the command.
    target = shlex.quote(dest_path)
    for cl in self._get_sshclients():
        if event.is_directory:
            cl.sshclient.run(f"rm -rf {target}")
            continue
        try:
            cl.sshclient.run(f"rm {target}")
        except Exception as exc:
            # Best effort per client; Exception (not bare except) lets Ctrl-C through.
            j.logger.debug(f"Ignoring {dest_path}. Path was not found during delete event ({exc})")
def on_modified(self, event)

Called when a file or directory is modified.

:param event: Event representing file/directory modification. :type event: :class:DirModifiedEvent or :class:FileModifiedEvent

Expand source code
def on_modified(self, event):
    """Uploads a modified file to every remote machine; directory events are only logged.

    Arguments:
        event -- watchdog modified event (uses src_path and is_directory)
    """
    super().on_modified(event)
    what = "directory" if event.is_directory else "file"
    j.logger.debug(f"Modified {what}: {event.src_path}")

    dest_path = self._rewrite_path_for_dest(event.src_path)
    if dest_path is None:
        j.logger.debug(f"No watched path matches {event.src_path}; skipping")
        return
    j.logger.debug(f"will modify in {dest_path}")

    if event.is_directory:
        # Nothing to transfer for a directory modification; log once instead of per client.
        j.logger.debug(f"Folder {dest_path} was modified")
        return

    for cl in self._get_sshclients():
        try:
            cl.sshclient.sftp.put(event.src_path, dest_path)
        except Exception as exc:
            # Best effort per client; Exception (not bare except) lets Ctrl-C through.
            j.logger.debug(f"Ignoring {dest_path}. Path was not found during modify event ({exc})")
def on_moved(self, event)

Called when a file or a directory is moved or renamed.

:param event: Event representing file/directory movement. :type event: :class:DirMovedEvent or :class:FileMovedEvent

Expand source code
def on_moved(self, event):
    """Replays a local move/rename on every remote machine.

    Arguments:
        event -- watchdog moved event (uses src_path, dest_path and is_directory)
    """
    super().on_moved(event)

    what = "directory" if event.is_directory else "file"
    j.logger.info(f"Moved {what}: from {event.src_path} to {event.dest_path}")
    # Compute both remote paths once instead of on every use.
    old_remote = self._rewrite_path_for_dest(event.src_path)
    dest_path = self._rewrite_path_for_dest(event.dest_path)
    if old_remote is None or dest_path is None:
        j.logger.debug(f"No watched path matches move {event.src_path} -> {event.dest_path}; skipping")
        return
    j.logger.debug(f"will move to {dest_path}")
    j.logger.debug(f"will delete original in {old_remote}")
    for cl in self._get_sshclients():
        if event.is_directory:
            # directories are renamed in place on the remote side
            cl.sshclient.sftp.posix_rename(old_remote, dest_path)
            continue
        try:
            # files are re-uploaded under the new name, then the old copy is removed
            cl.sshclient.sftp.put(event.dest_path, dest_path)
            cl.sshclient.run(f"rm {shlex.quote(old_remote)}")
        except Exception as exc:
            # Best effort per client; Exception (not bare except) lets Ctrl-C through.
            j.logger.debug(f"Ignoring {dest_path}. Path was not found during move event ({exc})")
def start(self, sync=True)

Start syncing/watching paths to remote machines

Keyword Arguments: sync {bool} – sync dirs/files first (default: {True})

Expand source code
def start(self, sync=True):
    """Start syncing/watching paths to remote machines

    Blocks until interrupted. The observer is always stopped and joined on the
    way out; KeyboardInterrupt ends the call normally, any other exception
    propagates after the cleanup.

    Keyword Arguments:
        sync {bool} -- sync dirs/files first (default: {True})
    """
    if sync:
        self.sync()

    for path in self.paths:
        self.observer.schedule(self, path)

    self.observer.start()
    try:
        while True:
            gevent.sleep(0.1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop watching.
        pass
    finally:
        # Runs for every exit path so the observer thread is never left behind.
        self.observer.unschedule_all()
        self.observer.stop()
        self.observer.join()
def sync(self)

Sync directory structure and files

Expand source code
def sync(self):
    """Sync directory structure and files

    First recreates every sub-directory of the watched paths on all remote
    machines (and schedules it on the observer), then uploads every file.
    """
    j.logger.debug(f"paths: {self.paths}")
    # Resolve the clients once instead of for every directory and file.
    clients = self._get_sshclients()

    def ensure_dirs():
        """For every directory in watched paths we make sure its full path exists on remote."""
        for path in self.paths:
            for src_dir in j.sals.fs.walk_dirs(path):
                dest_dir = str(self._rewrite_path_for_dest(src_dir))
                j.logger.debug(f"making dir {dest_dir}")
                for cl in clients:
                    # Quote so spaces/shell metacharacters in directory names are safe.
                    cl.sshclient.run(f"mkdir -p {shlex.quote(dest_dir)}")
                # One watch per directory is enough, regardless of the number of clients.
                self.observer.schedule(self, src_dir)

    def sync_file(e):
        """Sync single file to all registered sshclients

        Arguments:
            e {str} -- file path
        """
        dest_path = self._rewrite_path_for_dest(e)
        j.logger.debug(f"syncing {e} to machines into {dest_path}")
        parent = shlex.quote(str(j.sals.fs.parent(dest_path)))

        for cl in clients:
            cl.sshclient.run(f"mkdir -p {parent}")
            cl.sshclient.sftp.put(e, dest_path)

    ensure_dirs()

    for path in self.paths:
        # NOTE(review): the second positional argument of walk_files is kept as in the
        # original call; confirm against j.sals.fs.walk_files what it is interpreted as.
        for f in j.sals.fs.walk_files(path, sync_file):
            sync_file(f)