Module jumpscale.tools.git
Git module helps with everything around git management like pulling, cloning .. etc
TODO: examples
Expand source code
"""Git module helps with everything around git management like pulling, cloning .. etc
# TODO: examples
"""
import re
import shlex
import urllib
import urllib.parse
from jumpscale.loader import j
SSH_URL_MATCH = "^(git@)(?P<netloc>.*?)(:|/)(?P<path>.*?)/?$"
HTTP_REPO_URL = r"^(https://)?(?P<provider>.+)(?P<suffix>\..+)\/(?P<account>.+)\/(?P<repo>.+)/?$"
def rewrite_git_https_url_to_ssh(url):
    """
    Rewrite an https git url as the equivalent ssh url.

    A trailing slash or an already present ``.git`` suffix on the repo path is
    dropped first, so the result always ends with exactly one ``.git``.

    :param url: (str) https url to rewrite as ssh
    :return (str) rewritten ssh url, e.g. ``git@github.com:account/repo.git``
    :raises j.exceptions.Input: when ``url`` does not use the https scheme
    """
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.scheme != "https":
        raise j.exceptions.Input("not a https url: {}".format(url))
    # normalise the path so "a/b", "a/b/" and "a/b.git" all map to "a/b"
    repo_path = parsed_url.path.strip("/")
    if repo_path.endswith(".git"):
        repo_path = repo_path[: -len(".git")]
    return "git@{}:{}.git".format(parsed_url.netloc, repo_path)
def rewrite_git_ssh_url_to_https(url, login=None, passwd=None):
    """
    Rewrite an ssh git url (``git@host:path``) as an https url.

    When both ``login`` and ``passwd`` are given they are embedded verbatim in
    the resulting url as ``https://login:passwd@host/path``.

    :param url: (str) ssh url to rewrite as https
    :param login: (str) authentication login name
    :param passwd: (str) authentication login password
    :return (str) rewritten https url
    :raises j.exceptions.Input: when ``url`` is not an ssh url, or when only
        one of ``login`` / ``passwd`` is specified
    """
    match = re.match(SSH_URL_MATCH, url)
    # validate before touching the match object: re.match returns None on failure
    if not match:
        raise j.exceptions.Input("not a ssh url: {}".format(url))
    if bool(login) != bool(passwd):
        raise j.exceptions.Input("Need to specify both login and passwd if either one is specified")

    url_data = match.groupdict()
    if login and passwd:
        return "https://{login}:{passwd}@{netloc}/{path}".format(login=login, passwd=passwd, **url_data)
    return "https://{netloc}/{path}".format(**url_data)
def ensure_repo(url: str, dest="", branch_or_tag="", commit_id="", discard_changes=False, depth=0):
    """Makes sure that repo exists in specified dest with correct branch

    If ``dest`` already exists the repo is pulled and switched to
    ``branch_or_tag`` (or ``commit_id`` when no branch/tag is given).
    Otherwise the repo is cloned into the parent directory of ``dest`` and,
    when given, ``commit_id`` is checked out after the clone.

    Args:
        url (str): url of the repo
        dest (str, optional): the repo path, if it doesn't exist will clone it. Will get path from url if not specified.
        branch_or_tag (str, optional): clone from a specific branch or tag. Defaults to "".
        commit_id (str, optional): commit to checkout to. Defaults to "".
        discard_changes (bool, optional): Will remove changes in repo if True during pull. Defaults to False.
        depth (int, optional): specify the depth of the clone. Defaults to 0.

    Returns:
        str: the repo path (``dest``, or the path derived from ``url``)
    """
    dest = dest or get_path_from_url(url)
    if j.sals.fs.exists(dest):
        pull_repo(dest, discard_changes, branch_or_tag or commit_id)
    else:
        # clone_repo runs `git clone` inside the directory it is given, so it
        # gets the parent of dest; the checkout is named by git after the url
        parent_dir = str(j.sals.fs.parent(dest))
        # forward commit_id too, otherwise a requested commit is silently ignored
        clone_repo(url, parent_dir, branch_or_tag, depth, commit_id)
    return dest
def clone_repo(url: str, dest: str, branch_or_tag="", depth=0, commit_id=""):
    """Clones repo with specified url at specified dest

    ``dest`` is created if needed and ``git clone`` is run inside it, so the
    checkout ends up in ``dest/<repo name>``.

    Args:
        url (str): url of the repo
        dest (str): destination to clone in
        branch_or_tag (str, optional): clone from a specific branch or tag. Defaults to "".
        depth (int, optional): specify the depth of the clone. Defaults to 0.
        commit_id (str, optional): commit to checkout to. Defaults to "".

    Returns:
        str: the repo name derived from the url (last path component without a trailing ``.git``)

    Raises:
        j.exceptions.Runtime: if the clone or the checkout command fails
    """
    j.sals.fs.mkdirs(dest)
    # every interpolated value is shell-quoted: the command is passed as a single string
    cmd = f"cd {shlex.quote(str(dest))} && git clone --single-branch {shlex.quote(url)}"
    if branch_or_tag:
        cmd += f" -b {shlex.quote(branch_or_tag)}"
    if depth != 0:
        cmd += f" --depth={shlex.quote(str(depth))}"
    rc, _, err = j.core.executors.run_local(cmd, warn=True)
    if rc != 0:
        raise j.exceptions.Runtime(f"Error in execute {cmd}\n{err}")

    # only strip a trailing ".git": splitting on ".git" would truncate names
    # such as "site.github.io.git" to "site"
    repo_name = j.sals.fs.basename(url.rstrip("/"))
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]

    if commit_id:
        repo_dir = shlex.quote(f"{dest}/{repo_name}")
        checkout_cmd = f"cd {repo_dir} && git checkout {shlex.quote(commit_id)}"
        rc, _, err = j.core.executors.run_local(checkout_cmd, warn=True)
        if rc != 0:
            raise j.exceptions.Runtime(f"Error in execute {checkout_cmd}\n{err}")
    return repo_name
def pull_repo(path: str, discard_changes=False, revision=""):
    """Pull repo at the given path

    Runs, in order: an optional ``git checkout .`` to drop local changes,
    ``git pull``, and an optional ``git checkout <revision>``.

    Args:
        path (str): path of the git repo to pull.
        discard_changes (bool, optional): Will remove changes in repo if True. Defaults to False.
        revision (str, optional): change to specified branch, tag, commit. Defaults to "".
    """
    # path and revision are shell-quoted because each command is passed as a single string
    prefix = f"cd {shlex.quote(str(path))} && "
    if discard_changes:
        j.core.executors.run_local(prefix + "git checkout .")
    j.core.executors.run_local(prefix + "git pull")
    if revision:
        j.core.executors.run_local(prefix + f"git checkout {shlex.quote(revision)}")
def giturl_parse(url):
    """
    Split a repo browsing url into repo url, path inside the repo and branch.

    The url is cut at the first ``/tree`` or ``/blob`` path segment; the first
    component after it is the branch and the remainder is the path. A url
    without such a segment is returned unchanged with empty path and branch.
    A trailing ``.git`` is removed from the path (or from the branch when
    there is no path).

    :param url: (str) url to parse
    :return (tuple)
    - repository_url: the url up to (not including) the ``tree``/``blob`` segment
    - relpath: path inside the git repo
    - branch: branch specified in the url

    example Input
    - https://github.com/threefoldtech/jumpscale_/NOS/blob/master/specs/NOS_1.0.0.md
    - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/blob/8.1.2/lib/Jumpscale/tools/docsite/macros/dot.py
    - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/tree/8.2.0/lib/Jumpscale/tools/docsite/macros
    - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/tree/master/lib/Jumpscale/tools/docsite/macros
    """
    branch = path = ""
    # match "tree"/"blob" only as a whole path segment, and only the first one,
    # so names like "treehouse" or a later "tree" directory are left alone
    marker = re.search(r"/(tree|blob)(?=/|$)", url)
    if marker is None:
        return url, path, branch

    url_end = url[marker.end(1):].strip("/")
    url = url[: marker.start(1)]

    branch, _, path = url_end.partition("/")
    if path:
        if path.endswith(".git"):
            path = path[:-4]
    elif branch.endswith(".git"):
        branch = branch[:-4]
    return url, path, branch
def get_path_from_url(url: str):
    """returns repo path in the filesystem based on the url

    The path is ``<CODEDIR>/<first label of the host>/<repo path>``, where the
    repo path has surrounding slashes and a trailing ``.git`` removed.

    Args:
        url (str): repo url (ssh ``git@host:path`` form or http(s) form)

    Returns:
        str: path of the repo under j.core.dirs.CODEDIR
    """

    def _get_path(netloc, path):
        provider = netloc.split(".")[0]
        # strip slashes so an http path ("/account/repo") does not yield "//",
        # and drop only a trailing ".git" so "site.github.io" stays intact
        repo_path = path.strip("/")
        if repo_path.endswith(".git"):
            repo_path = repo_path[: -len(".git")]
        return f"{j.core.dirs.CODEDIR}/{provider}/{repo_path}"

    match = re.match(SSH_URL_MATCH, url)
    if match:
        # the ssh pattern only has the named groups "netloc" and "path"
        return _get_path(**match.groupdict())

    # http(s) form: cut any tree/blob browsing suffix before parsing
    repo_url = giturl_parse(url)[0]
    parsed_url = urllib.parse.urlparse(repo_url)
    return _get_path(parsed_url.netloc, parsed_url.path)
def find(account="", name=""):
    """Collect the repos found under j.core.dirs.CODEDIR

    Args:
        account (str, optional): keep only repos under this account. Defaults to "".
        name (str, optional): keep only repos with this name. Defaults to "".

    Returns:
        list: parent directory of every matching ``*.git`` entry
    """
    # a repo name is matched through its ".git" entry
    suffix = f"{name}/.git" if name else name
    needle = f"{account}/{suffix}"

    def _is_wanted(candidate):
        return needle in str(candidate)

    git_entries = j.sals.fs.walk(j.core.dirs.CODEDIR, pat="*.git", filter_fun=_is_wanted)
    return [j.sals.fs.parent(entry) for entry in git_entries]
def find_git_path(path, die=True):
    """
    given a path, check if this path or any of its parents is a git repo, return the first git repo

    :param path: (String) path from where to start search
    :param die: (bool) raise when no git repo is found; when False, None is returned instead
    :returns (String) the first path which is a git repo
    :raises j.exceptions.Input: when no git path can be found and die is True
    """
    start_path = path
    while path != "":
        if j.sals.fs.exists(path=j.sals.fs.join_paths(path, ".git")):
            return path
        parent = j.sals.fs.parent(path)
        # stop once walking up no longer changes the path (e.g. at the
        # filesystem root), otherwise the loop would never terminate
        if str(parent) == str(path):
            break
        path = parent
    if die:
        # report the path the search started from, not where it ended
        raise j.exceptions.Input("Cannot find git path in:%s" % start_path)
    return None
def get_latest_remote_tag(repo_path):
    """
    retrieve the latest tag of a remote upstream repository

    Lists the remote tags with ``git ls-remote``, version-sorts them on the
    tag name and keeps the last one.

    Args:
        repo_path (str): path to the local git repository

    Returns:
        str: the latest tag of the remote repository

    Raises:
        j.exceptions.Runtime: if the command exits with a non-zero code
    """
    # raw string: "\/" must reach sed unchanged and is not a valid Python escape
    rc, out, err = j.sals.process.execute(
        r"git ls-remote --tags --refs | sort -t '/' -k 3 -V | tail -n1 | sed 's/.*\///'", cwd=repo_path
    )
    if rc != 0:
        raise j.exceptions.Runtime(f"Failed to fetch latest remote release. {err}")
    return out.rstrip("\n")
Functions
def clone_repo(url: str, dest: str, branch_or_tag='', depth=0, commit_id='')
-
Clones repo with specified url at specified dest
Args
url
:str
- url of the repo
dest
:str
- destination to clone in
branch_or_tag
:str
, optional- clone from a specific branch or tag. Defaults to "".
depth
:int
, optional- specify the depth of the clone. Defaults to 0.
commit_id
:str
, optional- commit to checkout to. Defaults to "".
Expand source code
def clone_repo(url: str, dest: str, branch_or_tag="", depth=0, commit_id=""):
    """Clones repo with specified url at specified dest

    ``dest`` is created if needed and ``git clone`` is run inside it, so the
    checkout ends up in ``dest/<repo name>``.

    Args:
        url (str): url of the repo
        dest (str): destination to clone in
        branch_or_tag (str, optional): clone from a specific branch or tag. Defaults to "".
        depth (int, optional): specify the depth of the clone. Defaults to 0.
        commit_id (str, optional): commit to checkout to. Defaults to "".

    Returns:
        str: the repo name derived from the url (last path component without a trailing ``.git``)

    Raises:
        j.exceptions.Runtime: if the clone or the checkout command fails
    """
    j.sals.fs.mkdirs(dest)
    # every interpolated value is shell-quoted: the command is passed as a single string
    cmd = f"cd {shlex.quote(str(dest))} && git clone --single-branch {shlex.quote(url)}"
    if branch_or_tag:
        cmd += f" -b {shlex.quote(branch_or_tag)}"
    if depth != 0:
        cmd += f" --depth={shlex.quote(str(depth))}"
    rc, _, err = j.core.executors.run_local(cmd, warn=True)
    if rc != 0:
        raise j.exceptions.Runtime(f"Error in execute {cmd}\n{err}")

    # only strip a trailing ".git": splitting on ".git" would truncate names
    # such as "site.github.io.git" to "site"
    repo_name = j.sals.fs.basename(url.rstrip("/"))
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]

    if commit_id:
        repo_dir = shlex.quote(f"{dest}/{repo_name}")
        checkout_cmd = f"cd {repo_dir} && git checkout {shlex.quote(commit_id)}"
        rc, _, err = j.core.executors.run_local(checkout_cmd, warn=True)
        if rc != 0:
            raise j.exceptions.Runtime(f"Error in execute {checkout_cmd}\n{err}")
    return repo_name
def ensure_repo(url: str, dest='', branch_or_tag='', commit_id='', discard_changes=False, depth=0)
-
Makes sure that repo exists in specified dest with correct branch
Args
url
:str
- url of the repo
dest
:str
, optional- the repo path, if it doesn't exist will clone it. Will get path from url if not specified.
branch_or_tag
:str
, optional- clone from a specific branch or tag. Defaults to "".
commit_id
:str
, optional- commit to checkout to. Defaults to "".
discard_changes
:bool
, optional- Will remove changes in repo if True during pull. Defaults to False.
depth
:int
, optional- specify the depth of the clone. Defaults to 0.
Expand source code
def ensure_repo(url: str, dest="", branch_or_tag="", commit_id="", discard_changes=False, depth=0):
    """Makes sure that repo exists in specified dest with correct branch

    If ``dest`` already exists the repo is pulled and switched to
    ``branch_or_tag`` (or ``commit_id`` when no branch/tag is given).
    Otherwise the repo is cloned into the parent directory of ``dest`` and,
    when given, ``commit_id`` is checked out after the clone.

    Args:
        url (str): url of the repo
        dest (str, optional): the repo path, if it doesn't exist will clone it. Will get path from url if not specified.
        branch_or_tag (str, optional): clone from a specific branch or tag. Defaults to "".
        commit_id (str, optional): commit to checkout to. Defaults to "".
        discard_changes (bool, optional): Will remove changes in repo if True during pull. Defaults to False.
        depth (int, optional): specify the depth of the clone. Defaults to 0.

    Returns:
        str: the repo path (``dest``, or the path derived from ``url``)
    """
    dest = dest or get_path_from_url(url)
    if j.sals.fs.exists(dest):
        pull_repo(dest, discard_changes, branch_or_tag or commit_id)
    else:
        # clone_repo runs `git clone` inside the directory it is given, so it
        # gets the parent of dest; the checkout is named by git after the url
        parent_dir = str(j.sals.fs.parent(dest))
        # forward commit_id too, otherwise a requested commit is silently ignored
        clone_repo(url, parent_dir, branch_or_tag, depth, commit_id)
    return dest
def find(account='', name='')
-
find all repo paths in j.core.dirs.CODEDIR
Args
account
:str
, optional- returns repos under this account only. Defaults to "".
name
:str
, optional- returns repos with this name only. Defaults to "".
Returns
list
- list of repos path
Expand source code
def find(account="", name=""):
    """Collect the repos found under j.core.dirs.CODEDIR

    Args:
        account (str, optional): keep only repos under this account. Defaults to "".
        name (str, optional): keep only repos with this name. Defaults to "".

    Returns:
        list: parent directory of every matching ``*.git`` entry
    """
    # a repo name is matched through its ".git" entry
    suffix = f"{name}/.git" if name else name
    needle = f"{account}/{suffix}"

    def _is_wanted(candidate):
        return needle in str(candidate)

    git_entries = j.sals.fs.walk(j.core.dirs.CODEDIR, pat="*.git", filter_fun=_is_wanted)
    return [j.sals.fs.parent(entry) for entry in git_entries]
def find_git_path(path, die=True)
-
given a path, check if this path or any of its parents is a git repo, return the first git repo :param path: (String) path from where to start search :returns (String) the first path which is a git repo :raises Exception when no git path can be found
Expand source code
def find_git_path(path, die=True):
    """
    given a path, check if this path or any of its parents is a git repo, return the first git repo

    :param path: (String) path from where to start search
    :param die: (bool) raise when no git repo is found; when False, None is returned instead
    :returns (String) the first path which is a git repo
    :raises j.exceptions.Input: when no git path can be found and die is True
    """
    start_path = path
    while path != "":
        if j.sals.fs.exists(path=j.sals.fs.join_paths(path, ".git")):
            return path
        parent = j.sals.fs.parent(path)
        # stop once walking up no longer changes the path (e.g. at the
        # filesystem root), otherwise the loop would never terminate
        if str(parent) == str(path):
            break
        path = parent
    if die:
        # report the path the search started from, not where it ended
        raise j.exceptions.Input("Cannot find git path in:%s" % start_path)
    return None
def get_latest_remote_tag(repo_path)
-
retrieve the latest tag of a remote upstream repository
Args
repo_path
:str
- path to the local git repository
Returns
str
- the latest tag of the remote repository
Expand source code
def get_latest_remote_tag(repo_path):
    """
    retrieve the latest tag of a remote upstream repository

    Lists the remote tags with ``git ls-remote``, version-sorts them on the
    tag name and keeps the last one.

    Args:
        repo_path (str): path to the local git repository

    Returns:
        str: the latest tag of the remote repository

    Raises:
        j.exceptions.Runtime: if the command exits with a non-zero code
    """
    # raw string: "\/" must reach sed unchanged and is not a valid Python escape
    rc, out, err = j.sals.process.execute(
        r"git ls-remote --tags --refs | sort -t '/' -k 3 -V | tail -n1 | sed 's/.*\///'", cwd=repo_path
    )
    if rc != 0:
        raise j.exceptions.Runtime(f"Failed to fetch latest remote release. {err}")
    return out.rstrip("\n")
def get_path_from_url(url: str)
-
returns repo path in the filesystem based on the url
Args
url
:str
- repo url
Expand source code
def get_path_from_url(url: str):
    """returns repo path in the filesystem based on the url

    The path is ``<CODEDIR>/<first label of the host>/<repo path>``, where the
    repo path has surrounding slashes and a trailing ``.git`` removed.

    Args:
        url (str): repo url (ssh ``git@host:path`` form or http(s) form)

    Returns:
        str: path of the repo under j.core.dirs.CODEDIR
    """

    def _get_path(netloc, path):
        provider = netloc.split(".")[0]
        # strip slashes so an http path ("/account/repo") does not yield "//",
        # and drop only a trailing ".git" so "site.github.io" stays intact
        repo_path = path.strip("/")
        if repo_path.endswith(".git"):
            repo_path = repo_path[: -len(".git")]
        return f"{j.core.dirs.CODEDIR}/{provider}/{repo_path}"

    match = re.match(SSH_URL_MATCH, url)
    if match:
        # the ssh pattern only has the named groups "netloc" and "path"
        return _get_path(**match.groupdict())

    # http(s) form: cut any tree/blob browsing suffix before parsing
    repo_url = giturl_parse(url)[0]
    parsed_url = urllib.parse.urlparse(repo_url)
    return _get_path(parsed_url.netloc, parsed_url.path)
def giturl_parse(url)
-
:return - repository_url the full url to the repo but rewritten - relpath: path inside the git repo - branch: branch specified in the url
example Input - https://github.com/threefoldtech/jumpscale_/NOS/blob/master/specs/NOS_1.0.0.md - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/blob/8.1.2/lib/Jumpscale/tools/docsite/macros/dot.py - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/tree/8.2.0/lib/Jumpscale/tools/docsite/macros - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/tree/master/lib/Jumpscale/tools/docsite/macros
Expand source code
def giturl_parse(url):
    """
    Split a repo browsing url into repo url, path inside the repo and branch.

    The url is cut at the first ``/tree`` or ``/blob`` path segment; the first
    component after it is the branch and the remainder is the path. A url
    without such a segment is returned unchanged with empty path and branch.
    A trailing ``.git`` is removed from the path (or from the branch when
    there is no path).

    :param url: (str) url to parse
    :return (tuple)
    - repository_url: the url up to (not including) the ``tree``/``blob`` segment
    - relpath: path inside the git repo
    - branch: branch specified in the url

    example Input
    - https://github.com/threefoldtech/jumpscale_/NOS/blob/master/specs/NOS_1.0.0.md
    - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/blob/8.1.2/lib/Jumpscale/tools/docsite/macros/dot.py
    - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/tree/8.2.0/lib/Jumpscale/tools/docsite/macros
    - https://github.com/threefoldtech/jumpscale_/jumpscaleX_core/tree/master/lib/Jumpscale/tools/docsite/macros
    """
    branch = path = ""
    # match "tree"/"blob" only as a whole path segment, and only the first one,
    # so names like "treehouse" or a later "tree" directory are left alone
    marker = re.search(r"/(tree|blob)(?=/|$)", url)
    if marker is None:
        return url, path, branch

    url_end = url[marker.end(1):].strip("/")
    url = url[: marker.start(1)]

    branch, _, path = url_end.partition("/")
    if path:
        if path.endswith(".git"):
            path = path[:-4]
    elif branch.endswith(".git"):
        branch = branch[:-4]
    return url, path, branch
def pull_repo(path: str, discard_changes=False, revision='')
-
Pull repo at the given path
Args
path
:str
- path of the git repo to pull.
discard_changes
:bool
, optional- Will remove changes in repo if True. Defaults to False.
revision
:str
, optional- change to specified branch, tag, commit. Defaults to "".
Expand source code
def pull_repo(path: str, discard_changes=False, revision=""):
    """Pull repo at the given path

    Runs, in order: an optional ``git checkout .`` to drop local changes,
    ``git pull``, and an optional ``git checkout <revision>``.

    Args:
        path (str): path of the git repo to pull.
        discard_changes (bool, optional): Will remove changes in repo if True. Defaults to False.
        revision (str, optional): change to specified branch, tag, commit. Defaults to "".
    """
    # path and revision are shell-quoted because each command is passed as a single string
    prefix = f"cd {shlex.quote(str(path))} && "
    if discard_changes:
        j.core.executors.run_local(prefix + "git checkout .")
    j.core.executors.run_local(prefix + "git pull")
    if revision:
        j.core.executors.run_local(prefix + f"git checkout {shlex.quote(revision)}")
def rewrite_git_https_url_to_ssh(url)
-
:param url: (str) https url to rewrite as ssh :return (str) rewritten ssh url
Expand source code
def rewrite_git_https_url_to_ssh(url):
    """
    Rewrite an https git url as the equivalent ssh url.

    A trailing slash or an already present ``.git`` suffix on the repo path is
    dropped first, so the result always ends with exactly one ``.git``.

    :param url: (str) https url to rewrite as ssh
    :return (str) rewritten ssh url, e.g. ``git@github.com:account/repo.git``
    :raises j.exceptions.Input: when ``url`` does not use the https scheme
    """
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.scheme != "https":
        raise j.exceptions.Input("not a https url: {}".format(url))
    # normalise the path so "a/b", "a/b/" and "a/b.git" all map to "a/b"
    repo_path = parsed_url.path.strip("/")
    if repo_path.endswith(".git"):
        repo_path = repo_path[: -len(".git")]
    return "git@{}:{}.git".format(parsed_url.netloc, repo_path)
def rewrite_git_ssh_url_to_https(url, login=None, passwd=None)
-
:param url: (str) ssh url to rewrite as https :param login: (str) authentication login name :param passwd: (str) authentication login password :return (str) rewritten https url
Expand source code
def rewrite_git_ssh_url_to_https(url, login=None, passwd=None):
    """
    Rewrite an ssh git url (``git@host:path``) as an https url.

    When both ``login`` and ``passwd`` are given they are embedded verbatim in
    the resulting url as ``https://login:passwd@host/path``.

    :param url: (str) ssh url to rewrite as https
    :param login: (str) authentication login name
    :param passwd: (str) authentication login password
    :return (str) rewritten https url
    :raises j.exceptions.Input: when ``url`` is not an ssh url, or when only
        one of ``login`` / ``passwd`` is specified
    """
    match = re.match(SSH_URL_MATCH, url)
    # validate before touching the match object: re.match returns None on failure
    if not match:
        raise j.exceptions.Input("not a ssh url: {}".format(url))
    if bool(login) != bool(passwd):
        raise j.exceptions.Input("Need to specify both login and passwd if either one is specified")

    url_data = match.groupdict()
    if login and passwd:
        return "https://{login}:{passwd}@{netloc}/{path}".format(login=login, passwd=passwd, **url_data)
    return "https://{netloc}/{path}".format(**url_data)