Module jumpscale.clients.github.repo
Expand source code
import base64
import copy
import threading
# import collections
import urllib
from gevent import sleep
from jumpscale.clients.base import Client
from jumpscale.core.base import Base, fields
from jumpscale.loader import j
from github.GithubException import UnknownObjectException
from .base import replacelabels
from .helper import retry
from .issue import Issue
from .milestone import RepoMilestone
class GithubRepo:
TYPES = ["story", "ticket", "task", "bug", "feature", "question", "monitor", "unknown"]
PRIORITIES = ["critical", "urgent", "normal", "minor"]
STATES = ["new", "accepted", "question", "inprogress", "verification", "closed"]
def __init__(self, client, fullname):
self.client = client
self.fullname = fullname
self._repoclient = None
self._labels = None
self._issues = None
self._lock = threading.RLock()
self._milestones = None
def _log_info(self, s):
pass
@property
def api(self):
if self._repoclient is None:
self._repoclient = self.client.get_repo(self.fullname)
return self._repoclient
@property
def name(self):
return self.fullname.split("/", 1)[-1]
@property
def type(self):
if self.name in ["home"]:
return "home"
elif self.name.startswith("proj"):
return "proj"
elif self.name.startswith("org_"):
return "org"
elif self.name.startswith("www"):
return "www"
elif self.name.startswith("doc"):
return "doc"
elif self.name.startswith("cockpit"):
return "cockpit"
else:
return "code"
@property
def labelnames(self):
return [item.name for item in self.labels]
@property
def labels(self):
with self._lock:
if self._labels is None:
self._labels = [item for item in self.api.get_labels()]
return self._labels
@property
def branches(self):
"""list of `Branch` objects"""
return list(self.api.get_branches())
@property
def stories(self):
# walk overall issues find the stories (based on type)
# only for home type repo, otherwise return []
return self.issues_by_type("story")
@property
def tasks(self):
# walk overall issues find the stories (based on type)
# only for home type repo, otherwise return []
return self.issues_by_type("task")
def labelsSet(self, labels2set, ignoreDelete=["p_"], delete=True):
"""
@param ignore all labels starting with ignore will not be deleted
"""
for item in labels2set:
if not isinstance(item, str):
raise Exception("Labels to set need to be in string format, found:%s" % labels2set)
# walk over github existing labels
labelstowalk = copy.copy(self.labels)
for item in labelstowalk:
name = item.name.lower()
if name not in labels2set:
# label in repo does not correspond to label we need
if name in replacelabels:
nameNew = replacelabels[item.name.lower()]
if nameNew not in self.labelnames:
color = self.get_color(name)
self._log_info(
"change label in repo: %s oldlabel:'%s' to:'%s' color:%s"
% (self.fullname, item.name, nameNew, color)
)
item.edit(nameNew, color)
self._labels = None
else:
# no replacement
name = "type_unknown"
color = self.get_color(name)
try:
item.edit(name, color)
except BaseException:
item.delete()
self._labels = None
# walk over new labels we need to set
for name in labels2set:
if name not in self.labelnames:
# does not exist yet in repo
color = self.get_color(name)
self._log_info("create label: %s %s %s" % (self.fullname, name, color))
self.api.create_label(name, color)
self._labels = None
name = ""
if delete:
labelstowalk = copy.copy(self.labels)
for item in labelstowalk:
if item.name not in labels2set:
self._log_info("delete label: %s %s" % (self.fullname, item.name))
ignoreDeleteDo = False
for filteritem in ignoreDelete:
if item.name.startswith(filteritem):
ignoreDeleteDo = True
if ignoreDeleteDo is False:
item.delete()
self._labels = None
# check the colors
labelstowalk = copy.copy(self.labels)
for item in labelstowalk:
# we recognise the label
self._log_info("check color of repo:%s labelname:'%s'" % (self.fullname, item.name))
color = self.get_color(item.name)
if item.color != color:
self._log_info("change label color for repo %s %s" % (item.name, color))
item.edit(item.name, color)
self._labels = None
def getlabel(self, name):
for item in self.labels:
self._log_info("%s:look for name:'%s'" % (item.name, name))
if item.name == name:
return item
raise Exception("Dit not find label: '%s'" % name)
def get_issue_from_markdown(self, issueNumber, markdown):
i = self.get_issue(issueNumber, False)
i._loadMD(markdown)
self.issues.append(i)
return i
def get_issue(self, issueNumber, die=True):
for issue in self.issues:
if issue.number == issueNumber:
return issue
# not found in cache, try to load from github
github_issue = self.api.get_issue(issueNumber)
if github_issue:
issue = Issue(repo=self, githubObj=github_issue)
self._issues.append(issue)
return issue
if die:
raise Exception("cannot find issue:%s in repo:%s" % (issueNumber, self))
else:
i = Issue(self)
i._ddict["number"] = issueNumber
return i
def issues_by_type(self, *types):
"""
filter is method which takes issue as argument and returns True or False to include
"""
issues = []
for issue in self.issues:
if issue.type in types:
issues.append(issue)
return issues
def issues_by_state(self, filter=None):
"""
filter is method which takes issue as argument and returns True or False to include
"""
res = {}
for item in self.states:
res[item] = []
for issue in self.issues:
if issue.state == item:
if filter is None or filter(issue):
res[item].append(issue)
return res
def issues_by_priority(self, filter=None):
"""
filter is method which takes issue as argument and returns True or False to include
"""
res = {}
for item in self.priorities:
res[item] = []
for issue in self.issues:
if issue.priority == item:
if filter is None or filter(issue):
res[item].append(issue)
return res
def issues_by_type_state(self, filter=None, collapsepriority=True):
"""
filter is method which takes issue as argument and returns True or False to include
returns dict of dict keys: type, state and then issues sorted following priority
"""
res = {}
for type in self.types:
res[type] = {}
for state in self.states:
res[type][state] = {}
for priority in self.priorities:
res[type][state][priority] = []
for issue in self.issues:
if issue.type == type and issue.state == state:
if filter is None or filter(issue):
res[type][state][priority].append(issue)
if collapsepriority:
# sort the issues following priority
temp = res[type][state]
res[type][state] = []
for priority in self.priorities:
for subitem in temp[priority]:
res[type][state].append(subitem)
return res
@property
def types(self):
return GithubRepo.TYPES
@property
def priorities(self):
return GithubRepo.PRIORITIES
@property
def states(self):
return GithubRepo.STATES
@property
def milestones(self):
if self._milestones is None:
self._milestones = [RepoMilestone(self, x) for x in self.api.get_milestones()]
return self._milestones
@property
def milestone_titles(self):
return [item.title for item in self.milestones]
@property
def milestone_names(self):
return [item.name for item in self.milestones]
def get_milestone(self, name, die=True):
name = name.strip()
if name == "":
raise Exception("Name cannot be empty.")
for item in self.milestones:
if name == item.name.strip() or name == item.title.strip():
return item
if die:
raise Exception("Could not find milestone with name:%s" % name)
else:
return None
@retry
def create_milestone(self, name, title, description="", deadline="", owner=""):
self._log_info('Attempt to create milestone "%s" [%s] deadline %s' % (name, title, deadline))
def getBody(descr, name, owner):
out = "%s\n\n" % descr
out += "## name:%s\n" % name
out += "## owner:%s\n" % owner
return out
ms = None
for s in [name, title]:
ms = self.get_milestone(s, die=False)
if ms is not None:
break
if ms is not None:
if ms.title != title:
ms.title = title
# if ms.deadline != deadline:
# ms.deadline = deadline
tocheck = getBody(description.strip(), name, owner)
if ms.body.strip() != tocheck.strip():
ms.body = tocheck
else:
# due = j.data.time.epoch2pythonDateTime(int(j.data.time.getEpochFuture(deadline)))
self._log_info("Create milestone on %s: %s" % (self, title))
body = getBody(description.strip(), name, owner)
# workaround for https://github.com/PyGithub/PyGithub/issues/396
milestone = self.api.create_milestone(title=title, description=body)
milestone.edit(title=title)
self._milestones.append(RepoMilestone(self, milestone))
def delete_milestone(self, name):
if name.strip() == "":
raise Exception("Name cannot be empty.")
self._log_info("Delete milestone on %s: '%s'" % (self, name))
try:
ms = self.get_milestone(name)
ms.api.delete()
self._milestones = []
except Exception:
self._log_info("Milestone '%s' doesn't exist. no need to delete" % name)
def _labelsubset(self, cat):
res = []
for item in self.labels:
if item.startswith(cat):
item = item[len(cat) :].strip("_")
res.append(item)
res.sort()
return res
def get_color(self, name):
# colors={'state_question':'fbca04',
# 'priority_urgent':'d93f0b',
# 'state_verification':'006b75',
# 'priority_minor':'',
# 'type_task':'',
# 'type_feature':'',
# 'process_wontfix':"ffffff",
# 'priority_critical':"b60205",
# 'state_inprogress':"e6e6e6",
# 'priority_normal':"e6e6e6",
# 'type_story':"ee9a00",
# 'process_duplicate':"",
# 'state_closed':"5319e7",
# 'type_bug':"fc2929",
# 'state_accepted':"0e8a16",
# 'type_question':"fbca04",
# 'state_new':"1d76db"}
if name.startswith("state"):
return "c2e0c6" # light green
if name.startswith("process"):
return "d4c5f9" # light purple
if name.startswith("type"):
return "fef2c0" # light yellow
if name in ("priority_critical", "task_no_estimation"):
return "b60205" # red
if name.startswith("priority_urgent"):
return "d93f0b"
if name.startswith("priority"):
return "f9d0c4" # roze
return "ffffff"
@retry
def set_file(self, path, content, message="update file"):
"""
Creates or updates the file content at path with given content
:param path: file path `README.md`
:param content: Plain content of file
:return:
"""
bytes = content.encode()
encoded = base64.encodebytes(bytes)
params = {"message": message, "content": encoded.decode()}
path = urllib.parse.quote(path)
try:
obj = self.api.get_contents(path)
params["sha"] = obj.sha
if base64.decodebytes(obj.content.encode()) == bytes:
return
except UnknownObjectException:
pass
self._log_info('Updating file "%s"' % path)
self.api._requester.requestJsonAndCheck("PUT", self.api.url + "/contents/" + path, input=params)
@property
def issues(self):
with self._lock:
if self._issues is None:
issues = []
for item in self.api.get_issues(state="all"):
issues.append(Issue(self, githubObj=item))
self._issues = issues
return self._issues
def download_directory(self, src, download_dir, branch=None):
dest = j.sals.fs.join_paths(download_dir, self.api.full_name)
j.sals.fs.mkdirs(dest)
branch = branch or self.api.default_branch
contents = self.api.get_dir_contents(src, ref=branch)
for content in contents:
if content.type == "dir":
dir_path = j.sals.fs.join_paths(dest, content.path)
j.sals.fs.mkdirs(dir_path)
self.download_directory(content.path, download_dir, branch)
else:
file_path = j.sals.fs.join_paths(dest, content.path)
j.sals.fs.mkdirs(j.sals.fs.dirname(file_path))
file_content = self.api.get_contents(content.path, ref=branch)
with open(file_path, "+w") as f:
f.write(base64.b64decode(file_content.content).decode())
return j.sals.fs.join_paths(dest, src)
def get_git_tree(self, sha_or_branch):
"""return a list of `GitTreeElement` for every element in source tree"""
return self.api.get_git_tree(sha_or_branch).tree
def __str__(self):
return "gitrepo:%s" % self.fullname
__repr__ = __str__
Classes
class GithubRepo (client, fullname)
-
Expand source code
class GithubRepo: TYPES = ["story", "ticket", "task", "bug", "feature", "question", "monitor", "unknown"] PRIORITIES = ["critical", "urgent", "normal", "minor"] STATES = ["new", "accepted", "question", "inprogress", "verification", "closed"] def __init__(self, client, fullname): self.client = client self.fullname = fullname self._repoclient = None self._labels = None self._issues = None self._lock = threading.RLock() self._milestones = None def _log_info(self, s): pass @property def api(self): if self._repoclient is None: self._repoclient = self.client.get_repo(self.fullname) return self._repoclient @property def name(self): return self.fullname.split("/", 1)[-1] @property def type(self): if self.name in ["home"]: return "home" elif self.name.startswith("proj"): return "proj" elif self.name.startswith("org_"): return "org" elif self.name.startswith("www"): return "www" elif self.name.startswith("doc"): return "doc" elif self.name.startswith("cockpit"): return "cockpit" else: return "code" @property def labelnames(self): return [item.name for item in self.labels] @property def labels(self): with self._lock: if self._labels is None: self._labels = [item for item in self.api.get_labels()] return self._labels @property def branches(self): """list of `Branch` objects""" return list(self.api.get_branches()) @property def stories(self): # walk overall issues find the stories (based on type) # only for home type repo, otherwise return [] return self.issues_by_type("story") @property def tasks(self): # walk overall issues find the stories (based on type) # only for home type repo, otherwise return [] return self.issues_by_type("task") def labelsSet(self, labels2set, ignoreDelete=["p_"], delete=True): """ @param ignore all labels starting with ignore will not be deleted """ for item in labels2set: if not isinstance(item, str): raise Exception("Labels to set need to be in string format, found:%s" % labels2set) # walk over github existing labels labelstowalk = copy.copy(self.labels) for item in labelstowalk: name = item.name.lower() if name not in labels2set: # label in repo does not correspond to label we need if name in replacelabels: nameNew = replacelabels[item.name.lower()] if nameNew not in self.labelnames: color = self.get_color(name) self._log_info( "change label in repo: %s oldlabel:'%s' to:'%s' color:%s" % (self.fullname, item.name, nameNew, color) ) item.edit(nameNew, color) self._labels = None else: # no replacement name = "type_unknown" color = self.get_color(name) try: item.edit(name, color) except BaseException: item.delete() self._labels = None # walk over new labels we need to set for name in labels2set: if name not in self.labelnames: # does not exist yet in repo color = self.get_color(name) self._log_info("create label: %s %s %s" % (self.fullname, name, color)) self.api.create_label(name, color) self._labels = None name = "" if delete: labelstowalk = copy.copy(self.labels) for item in labelstowalk: if item.name not in labels2set: self._log_info("delete label: %s %s" % (self.fullname, item.name)) ignoreDeleteDo = False for filteritem in ignoreDelete: if item.name.startswith(filteritem): ignoreDeleteDo = True if ignoreDeleteDo is False: item.delete() self._labels = None # check the colors labelstowalk = copy.copy(self.labels) for item in labelstowalk: # we recognise the label self._log_info("check color of repo:%s labelname:'%s'" % (self.fullname, item.name)) color = self.get_color(item.name) if item.color != color: self._log_info("change label color for repo %s %s" % (item.name, color)) item.edit(item.name, color) self._labels = None def getlabel(self, name): for item in self.labels: self._log_info("%s:look for name:'%s'" % (item.name, name)) if item.name == name: return item raise Exception("Dit not find label: '%s'" % name) def get_issue_from_markdown(self, issueNumber, markdown): i = self.get_issue(issueNumber, False) i._loadMD(markdown) self.issues.append(i) return i def get_issue(self, issueNumber, die=True): for issue in self.issues: if issue.number == issueNumber: return issue # not found in cache, try to load from github github_issue = self.api.get_issue(issueNumber) if github_issue: issue = Issue(repo=self, githubObj=github_issue) self._issues.append(issue) return issue if die: raise Exception("cannot find issue:%s in repo:%s" % (issueNumber, self)) else: i = Issue(self) i._ddict["number"] = issueNumber return i def issues_by_type(self, *types): """ filter is method which takes issue as argument and returns True or False to include """ issues = [] for issue in self.issues: if issue.type in types: issues.append(issue) return issues def issues_by_state(self, filter=None): """ filter is method which takes issue as argument and returns True or False to include """ res = {} for item in self.states: res[item] = [] for issue in self.issues: if issue.state == item: if filter is None or filter(issue): res[item].append(issue) return res def issues_by_priority(self, filter=None): """ filter is method which takes issue as argument and returns True or False to include """ res = {} for item in self.priorities: res[item] = [] for issue in self.issues: if issue.priority == item: if filter is None or filter(issue): res[item].append(issue) return res def issues_by_type_state(self, filter=None, collapsepriority=True): """ filter is method which takes issue as argument and returns True or False to include returns dict of dict keys: type, state and then issues sorted following priority """ res = {} for type in self.types: res[type] = {} for state in self.states: res[type][state] = {} for priority in self.priorities: res[type][state][priority] = [] for issue in self.issues: if issue.type == type and issue.state == state: if filter is None or filter(issue): res[type][state][priority].append(issue) if collapsepriority: # sort the issues following priority temp = res[type][state] res[type][state] = [] for priority in self.priorities: for subitem in temp[priority]: res[type][state].append(subitem) return res @property def types(self): return GithubRepo.TYPES @property def priorities(self): return GithubRepo.PRIORITIES @property def states(self): return GithubRepo.STATES @property def milestones(self): if self._milestones is None: self._milestones = [RepoMilestone(self, x) for x in self.api.get_milestones()] return self._milestones @property def milestone_titles(self): return [item.title for item in self.milestones] @property def milestone_names(self): return [item.name for item in self.milestones] def get_milestone(self, name, die=True): name = name.strip() if name == "": raise Exception("Name cannot be empty.") for item in self.milestones: if name == item.name.strip() or name == item.title.strip(): return item if die: raise Exception("Could not find milestone with name:%s" % name) else: return None @retry def create_milestone(self, name, title, description="", deadline="", owner=""): self._log_info('Attempt to create milestone "%s" [%s] deadline %s' % (name, title, deadline)) def getBody(descr, name, owner): out = "%s\n\n" % descr out += "## name:%s\n" % name out += "## owner:%s\n" % owner return out ms = None for s in [name, title]: ms = self.get_milestone(s, die=False) if ms is not None: break if ms is not None: if ms.title != title: ms.title = title # if ms.deadline != deadline: # ms.deadline = deadline tocheck = getBody(description.strip(), name, owner) if ms.body.strip() != tocheck.strip(): ms.body = tocheck else: # due = j.data.time.epoch2pythonDateTime(int(j.data.time.getEpochFuture(deadline))) self._log_info("Create milestone on %s: %s" % (self, title)) body = getBody(description.strip(), name, owner) # workaround for https://github.com/PyGithub/PyGithub/issues/396 milestone = self.api.create_milestone(title=title, description=body) milestone.edit(title=title) self._milestones.append(RepoMilestone(self, milestone)) def delete_milestone(self, name): if name.strip() == "": raise Exception("Name cannot be empty.") self._log_info("Delete milestone on %s: '%s'" % (self, name)) try: ms = self.get_milestone(name) ms.api.delete() self._milestones = [] except Exception: self._log_info("Milestone '%s' doesn't exist. no need to delete" % name) def _labelsubset(self, cat): res = [] for item in self.labels: if item.startswith(cat): item = item[len(cat) :].strip("_") res.append(item) res.sort() return res def get_color(self, name): # colors={'state_question':'fbca04', # 'priority_urgent':'d93f0b', # 'state_verification':'006b75', # 'priority_minor':'', # 'type_task':'', # 'type_feature':'', # 'process_wontfix':"ffffff", # 'priority_critical':"b60205", # 'state_inprogress':"e6e6e6", # 'priority_normal':"e6e6e6", # 'type_story':"ee9a00", # 'process_duplicate':"", # 'state_closed':"5319e7", # 'type_bug':"fc2929", # 'state_accepted':"0e8a16", # 'type_question':"fbca04", # 'state_new':"1d76db"} if name.startswith("state"): return "c2e0c6" # light green if name.startswith("process"): return "d4c5f9" # light purple if name.startswith("type"): return "fef2c0" # light yellow if name in ("priority_critical", "task_no_estimation"): return "b60205" # red if name.startswith("priority_urgent"): return "d93f0b" if name.startswith("priority"): return "f9d0c4" # roze return "ffffff" @retry def set_file(self, path, content, message="update file"): """ Creates or updates the file content at path with given content :param path: file path `README.md` :param content: Plain content of file :return: """ bytes = content.encode() encoded = base64.encodebytes(bytes) params = {"message": message, "content": encoded.decode()} path = urllib.parse.quote(path) try: obj = self.api.get_contents(path) params["sha"] = obj.sha if base64.decodebytes(obj.content.encode()) == bytes: return except UnknownObjectException: pass self._log_info('Updating file "%s"' % path) self.api._requester.requestJsonAndCheck("PUT", self.api.url + "/contents/" + path, input=params) @property def issues(self): with self._lock: if self._issues is None: issues = [] for item in self.api.get_issues(state="all"): issues.append(Issue(self, githubObj=item)) self._issues = issues return self._issues def download_directory(self, src, download_dir, branch=None): dest = j.sals.fs.join_paths(download_dir, self.api.full_name) j.sals.fs.mkdirs(dest) branch = branch or self.api.default_branch contents = self.api.get_dir_contents(src, ref=branch) for content in contents: if content.type == "dir": dir_path = j.sals.fs.join_paths(dest, content.path) j.sals.fs.mkdirs(dir_path) self.download_directory(content.path, download_dir, branch) else: file_path = j.sals.fs.join_paths(dest, content.path) j.sals.fs.mkdirs(j.sals.fs.dirname(file_path)) file_content = self.api.get_contents(content.path, ref=branch) with open(file_path, "+w") as f: f.write(base64.b64decode(file_content.content).decode()) return j.sals.fs.join_paths(dest, src) def get_git_tree(self, sha_or_branch): """return a list of `GitTreeElement` for every element in source tree""" return self.api.get_git_tree(sha_or_branch).tree def __str__(self): return "gitrepo:%s" % self.fullname __repr__ = __str__
Class variables
var PRIORITIES
var STATES
var TYPES
Instance variables
var api
-
Expand source code
@property def api(self): if self._repoclient is None: self._repoclient = self.client.get_repo(self.fullname) return self._repoclient
var branches
-
list of
Branch
objectsExpand source code
@property def branches(self): """list of `Branch` objects""" return list(self.api.get_branches())
var issues
-
Expand source code
@property def issues(self): with self._lock: if self._issues is None: issues = [] for item in self.api.get_issues(state="all"): issues.append(Issue(self, githubObj=item)) self._issues = issues return self._issues
var labelnames
-
Expand source code
@property def labelnames(self): return [item.name for item in self.labels]
var labels
-
Expand source code
@property def labels(self): with self._lock: if self._labels is None: self._labels = [item for item in self.api.get_labels()] return self._labels
var milestone_names
-
Expand source code
@property def milestone_names(self): return [item.name for item in self.milestones]
var milestone_titles
-
Expand source code
@property def milestone_titles(self): return [item.title for item in self.milestones]
var milestones
-
Expand source code
@property def milestones(self): if self._milestones is None: self._milestones = [RepoMilestone(self, x) for x in self.api.get_milestones()] return self._milestones
var name
-
Expand source code
@property def name(self): return self.fullname.split("/", 1)[-1]
var priorities
-
Expand source code
@property def priorities(self): return GithubRepo.PRIORITIES
var states
-
Expand source code
@property def states(self): return GithubRepo.STATES
var stories
-
Expand source code
@property def stories(self): # walk overall issues find the stories (based on type) # only for home type repo, otherwise return [] return self.issues_by_type("story")
var tasks
-
Expand source code
@property def tasks(self): # walk overall issues find the stories (based on type) # only for home type repo, otherwise return [] return self.issues_by_type("task")
var type
-
Expand source code
@property def type(self): if self.name in ["home"]: return "home" elif self.name.startswith("proj"): return "proj" elif self.name.startswith("org_"): return "org" elif self.name.startswith("www"): return "www" elif self.name.startswith("doc"): return "doc" elif self.name.startswith("cockpit"): return "cockpit" else: return "code"
var types
-
Expand source code
@property def types(self): return GithubRepo.TYPES
Methods
def create_milestone(self, *args, **kwargs)
-
Expand source code
def wrapper(self, *args, **kwargs): for _ in range(6): try: result = function(self, *args, **kwargs) break except Exception as e: j.logger.warning(f"Failed to execute {function.__name__} due to error: {str(e)}") sleep(1) else: raise j.exceptions.Runtime(f"Failed to execute {function.__name__} after multiple retries") return result
def delete_milestone(self, name)
-
Expand source code
def delete_milestone(self, name): if name.strip() == "": raise Exception("Name cannot be empty.") self._log_info("Delete milestone on %s: '%s'" % (self, name)) try: ms = self.get_milestone(name) ms.api.delete() self._milestones = [] except Exception: self._log_info("Milestone '%s' doesn't exist. no need to delete" % name)
def download_directory(self, src, download_dir, branch=None)
-
Expand source code
def download_directory(self, src, download_dir, branch=None): dest = j.sals.fs.join_paths(download_dir, self.api.full_name) j.sals.fs.mkdirs(dest) branch = branch or self.api.default_branch contents = self.api.get_dir_contents(src, ref=branch) for content in contents: if content.type == "dir": dir_path = j.sals.fs.join_paths(dest, content.path) j.sals.fs.mkdirs(dir_path) self.download_directory(content.path, download_dir, branch) else: file_path = j.sals.fs.join_paths(dest, content.path) j.sals.fs.mkdirs(j.sals.fs.dirname(file_path)) file_content = self.api.get_contents(content.path, ref=branch) with open(file_path, "+w") as f: f.write(base64.b64decode(file_content.content).decode()) return j.sals.fs.join_paths(dest, src)
def get_color(self, name)
-
Expand source code
def get_color(self, name): # colors={'state_question':'fbca04', # 'priority_urgent':'d93f0b', # 'state_verification':'006b75', # 'priority_minor':'', # 'type_task':'', # 'type_feature':'', # 'process_wontfix':"ffffff", # 'priority_critical':"b60205", # 'state_inprogress':"e6e6e6", # 'priority_normal':"e6e6e6", # 'type_story':"ee9a00", # 'process_duplicate':"", # 'state_closed':"5319e7", # 'type_bug':"fc2929", # 'state_accepted':"0e8a16", # 'type_question':"fbca04", # 'state_new':"1d76db"} if name.startswith("state"): return "c2e0c6" # light green if name.startswith("process"): return "d4c5f9" # light purple if name.startswith("type"): return "fef2c0" # light yellow if name in ("priority_critical", "task_no_estimation"): return "b60205" # red if name.startswith("priority_urgent"): return "d93f0b" if name.startswith("priority"): return "f9d0c4" # roze return "ffffff"
def get_git_tree(self, sha_or_branch)
-
return a list of
GitTreeElement
for every element in source treeExpand source code
def get_git_tree(self, sha_or_branch): """return a list of `GitTreeElement` for every element in source tree""" return self.api.get_git_tree(sha_or_branch).tree
def get_issue(self, issueNumber, die=True)
-
Expand source code
def get_issue(self, issueNumber, die=True): for issue in self.issues: if issue.number == issueNumber: return issue # not found in cache, try to load from github github_issue = self.api.get_issue(issueNumber) if github_issue: issue = Issue(repo=self, githubObj=github_issue) self._issues.append(issue) return issue if die: raise Exception("cannot find issue:%s in repo:%s" % (issueNumber, self)) else: i = Issue(self) i._ddict["number"] = issueNumber return i
def get_issue_from_markdown(self, issueNumber, markdown)
-
Expand source code
def get_issue_from_markdown(self, issueNumber, markdown): i = self.get_issue(issueNumber, False) i._loadMD(markdown) self.issues.append(i) return i
def get_milestone(self, name, die=True)
-
Expand source code
def get_milestone(self, name, die=True): name = name.strip() if name == "": raise Exception("Name cannot be empty.") for item in self.milestones: if name == item.name.strip() or name == item.title.strip(): return item if die: raise Exception("Could not find milestone with name:%s" % name) else: return None
def getlabel(self, name)
-
Expand source code
def getlabel(self, name): for item in self.labels: self._log_info("%s:look for name:'%s'" % (item.name, name)) if item.name == name: return item raise Exception("Dit not find label: '%s'" % name)
def issues_by_priority(self, filter=None)
-
filter is method which takes issue as argument and returns True or False to include
Expand source code
def issues_by_priority(self, filter=None): """ filter is method which takes issue as argument and returns True or False to include """ res = {} for item in self.priorities: res[item] = [] for issue in self.issues: if issue.priority == item: if filter is None or filter(issue): res[item].append(issue) return res
def issues_by_state(self, filter=None)
-
filter is method which takes issue as argument and returns True or False to include
Expand source code
def issues_by_state(self, filter=None): """ filter is method which takes issue as argument and returns True or False to include """ res = {} for item in self.states: res[item] = [] for issue in self.issues: if issue.state == item: if filter is None or filter(issue): res[item].append(issue) return res
def issues_by_type(self, *types)
-
filter is method which takes issue as argument and returns True or False to include
Expand source code
def issues_by_type(self, *types): """ filter is method which takes issue as argument and returns True or False to include """ issues = [] for issue in self.issues: if issue.type in types: issues.append(issue) return issues
def issues_by_type_state(self, filter=None, collapsepriority=True)
-
filter is method which takes issue as argument and returns True or False to include returns dict of dict keys: type, state and then issues sorted following priority
Expand source code
def issues_by_type_state(self, filter=None, collapsepriority=True): """ filter is method which takes issue as argument and returns True or False to include returns dict of dict keys: type, state and then issues sorted following priority """ res = {} for type in self.types: res[type] = {} for state in self.states: res[type][state] = {} for priority in self.priorities: res[type][state][priority] = [] for issue in self.issues: if issue.type == type and issue.state == state: if filter is None or filter(issue): res[type][state][priority].append(issue) if collapsepriority: # sort the issues following priority temp = res[type][state] res[type][state] = [] for priority in self.priorities: for subitem in temp[priority]: res[type][state].append(subitem) return res
def labelsSet(self, labels2set, ignoreDelete=['p_'], delete=True)
-
@param ignore all labels starting with ignore will not be deleted
Expand source code
def labelsSet(self, labels2set, ignoreDelete=["p_"], delete=True): """ @param ignore all labels starting with ignore will not be deleted """ for item in labels2set: if not isinstance(item, str): raise Exception("Labels to set need to be in string format, found:%s" % labels2set) # walk over github existing labels labelstowalk = copy.copy(self.labels) for item in labelstowalk: name = item.name.lower() if name not in labels2set: # label in repo does not correspond to label we need if name in replacelabels: nameNew = replacelabels[item.name.lower()] if nameNew not in self.labelnames: color = self.get_color(name) self._log_info( "change label in repo: %s oldlabel:'%s' to:'%s' color:%s" % (self.fullname, item.name, nameNew, color) ) item.edit(nameNew, color) self._labels = None else: # no replacement name = "type_unknown" color = self.get_color(name) try: item.edit(name, color) except BaseException: item.delete() self._labels = None # walk over new labels we need to set for name in labels2set: if name not in self.labelnames: # does not exist yet in repo color = self.get_color(name) self._log_info("create label: %s %s %s" % (self.fullname, name, color)) self.api.create_label(name, color) self._labels = None name = "" if delete: labelstowalk = copy.copy(self.labels) for item in labelstowalk: if item.name not in labels2set: self._log_info("delete label: %s %s" % (self.fullname, item.name)) ignoreDeleteDo = False for filteritem in ignoreDelete: if item.name.startswith(filteritem): ignoreDeleteDo = True if ignoreDeleteDo is False: item.delete() self._labels = None # check the colors labelstowalk = copy.copy(self.labels) for item in labelstowalk: # we recognise the label self._log_info("check color of repo:%s labelname:'%s'" % (self.fullname, item.name)) color = self.get_color(item.name) if item.color != color: self._log_info("change label color for repo %s %s" % (item.name, color)) item.edit(item.name, color) self._labels = None
def set_file(self, *args, **kwargs)
-
Expand source code
def wrapper(self, *args, **kwargs): for _ in range(6): try: result = function(self, *args, **kwargs) break except Exception as e: j.logger.warning(f"Failed to execute {function.__name__} due to error: {str(e)}") sleep(1) else: raise j.exceptions.Runtime(f"Failed to execute {function.__name__} after multiple retries") return result