Module jumpscale.clients.github.issue

Expand source code
import threading

from jumpscale.loader import j
from .base import base
from .base import replacelabels
from .milestone import RepoMilestone


class Issue(base):
    """Cached view of one GitHub issue belonging to ``repo``.

    The issue fields live in ``self._ddict``; they are either handed in by the
    caller or filled by :meth:`load` from the GitHub issue object. Comments are
    fetched lazily on first access of :attr:`comments`.
    """

    def __init__(self, repo, ddict=None, githubObj=None):
        """Create the wrapper.

        Args:
            repo: repository wrapper; this class uses its ``api``, ``client``,
                ``fullname``, ``labelnames`` and ``getLabel``.
            ddict: previously stored issue data, or None to start empty.
            githubObj: GitHub issue object; when given, ``load()`` runs at once.
        """
        base.__init__(self)
        self.repo = repo
        # A fresh dict per instance: a ``{}`` default in the signature would be
        # one object shared (and mutated) by every Issue built without data.
        self._ddict = {} if ddict is None else ddict
        self._githubObj = githubObj
        self._comments = self._ddict.get("comments", None)
        # Guards the lazy comment fetch; created before load() so the instance
        # is fully usable by the time any network call happens.
        self._lock = threading.RLock()
        if githubObj is not None:
            self.load()

    @property
    def api(self):
        """GitHub issue object, fetched by number through ``repo.api`` on first use."""
        if self._githubObj is None:
            self._githubObj = self.repo.api.get_issue(self.number)
        return self._githubObj

    @property
    def ddict(self):
        """Issue data dict including comments; loads from GitHub when still empty."""
        if self._ddict == {}:
            # no dict yet, fetch from github
            self.load()
        # comments are lazy loaded, so they are only fetched when accessed
        self._ddict["comments"] = self.comments
        return self._ddict

    def _fetch_comments(self):
        """Download all comments and return them as a list of plain dicts."""
        get_login = self.repo.client.getUserLogin
        return [
            {
                "user": get_login(githubObj=comment.user),
                "url": comment.url,
                "id": comment.id,
                "body": comment.body,
                "user_id": comment.user.id,
            }
            for comment in self.api.get_comments()
        ]

    @property
    def comments(self):
        """List of comment dicts (``user``, ``url``, ``id``, ``body``, ``user_id``)."""
        if self._comments is not None:
            return self._comments

        with self._lock:
            if self._comments is None:
                self._log_debug("Loading comments for issue: %s" % self.number)
                # Assign only the finished list: the check above runs without
                # the lock and must never observe a half-filled list.
                self._comments = self._fetch_comments()
        return self._comments

    def reload_comments(self):
        """Fetch the comments again, replace the cached list and return it."""
        with self._lock:
            self._comments = self._fetch_comments()
        return self._comments

    @property
    def guid(self):
        """``<repo fullname>_<issue number>``."""
        return self.repo.fullname + "_" + str(self._ddict["number"])

    @property
    def number(self):
        """Issue number as an int."""
        return int(self._ddict["number"])

    @property
    def title(self):
        return self._ddict["title"]

    @property
    def body(self):
        return self._ddict["body"]

    @body.setter
    def body(self, val):
        """Store the new body locally and push it to GitHub (best effort)."""
        self._ddict["body"] = val
        try:
            self.api.edit(body=self._ddict["body"])
        except Exception as e:
            # Deliberately non-fatal: the failure is logged, the local value stays.
            self._log_error("Failed to update the issue body: %s" % e)

    @property
    def time(self):
        # NOTE: load() does not fill "time" (that line is commented out there),
        # so this raises KeyError unless the supplied ddict carried the key.
        return self._ddict["time"]

    @property
    def url(self):
        return self._ddict["url"]

    @property
    def assignee(self):
        return self._ddict["assignee"]

    @property
    def id(self):
        return self._ddict["id"]

    @property
    def labels(self):
        """Copy of the label names, so callers cannot alter the cached list."""
        return self._ddict["labels"][:]

    @labels.setter
    def labels(self, val):
        """Replace the labels locally and on GitHub unless the set is unchanged."""
        new_labels = list(val)
        # Compare sorted copies; neither the caller's list nor the cache is reordered.
        if sorted(new_labels) == sorted(self._ddict["labels"]):
            return
        self._ddict["labels"] = new_labels
        toset = [self.repo.getLabel(item) for item in new_labels]
        self.api.set_labels(*toset)

    @property
    def milestone(self):
        return self._ddict["milestone"]

    def _labels_in_category(self, category):
        """Return the current labels whose name starts with ``category``."""
        return [label for label in self.labels if label.startswith(category)]

    @property
    def state(self):
        """``"closed"`` for closed issues, else the value of the single state label.

        Returns ``""`` when no state label is set. With several state labels
        the state is reset to ``"question"`` and that value is returned.
        """
        if not self.is_open:
            return "closed"

        states = self._labels_in_category("state")
        if len(states) == 1:
            return states[0][len("state") :].strip("_")
        if len(states) > 1:
            self.state = "question"
            return "question"
        return ""

    @state.setter
    def state(self, val):
        return self._setLabels(val, "state")

    @property
    def is_open(self):
        return self._ddict["open"]

    @property
    def type(self):
        """Value of the single ``type`` label, or ``""`` when there is not exactly one."""
        items = self._labels_in_category("type")
        if len(items) == 1:
            return items[0].partition("_")[-1]
        return ""

    @type.setter
    def type(self, val):
        return self._setLabels(val, "type")

    @property
    def priority(self):
        """Value of the single ``priority`` label; otherwise sets and returns ``"normal"``."""
        items = self._labels_in_category("priority")
        if len(items) == 1:
            return items[0].partition("_")[-1]
        self.priority = "normal"
        return self.priority

    @priority.setter
    def priority(self, val):
        return self._setLabels(val, "priority")

    @property
    def process(self):
        """Value of the single ``process`` label, or ``""`` when there is not exactly one."""
        items = self._labels_in_category("process")
        if len(items) == 1:
            return items[0][len("process") :].strip("_")
        return ""

    @process.setter
    def process(self, val):
        return self._setLabels(val, "process")

    def _setLabels(self, val, category):
        """Make ``<category>_<val>`` the only label of ``category`` on the issue.

        ``None`` or ``""`` is ignored. ``val`` may be given with or without the
        category prefix.

        Raises:
            Exception: when the resulting label is not in ``repo.labelnames``.
        """
        if val is None or val == "":
            return

        if val.startswith(category):
            _, _, val = val.partition("_")

        val = "%s_%s" % (category, val.strip("_").lower())

        if val not in self.repo.labelnames:
            # sorted() keeps the repository's own list order untouched
            llist = ",".join(sorted(self.repo.labelnames))
            raise Exception(
                "Label needs to be in list:%s (is understood labels in this repo on github), now is: '%s'"
                % (llist, val)
            )

        # make sure there is only 1
        current = self._labels_in_category(category)
        if current == [val]:
            return
        labels2set = [label for label in self.labels if label not in current]
        labels2set.append(val)
        self.labels = labels2set

    def load(self):
        """Rebuild ``self._ddict`` from the GitHub issue object.

        Labels unknown to the repository are replaced through ``replacelabels``
        or dropped; when that changes the list, the cleaned list is written
        back to GitHub.
        """
        # Resolve the API object first: without a cached GitHub object the
        # lookup needs the issue number from the current dict.
        api = self.api
        repo = self.repo

        # check labels
        labels = [item.name for item in api.labels]  # are the names
        newlabels = []
        for label in labels:
            if label not in repo.labelnames:
                if label in replacelabels:
                    if replacelabels[label] not in newlabels:
                        newlabels.append(replacelabels[label])
            else:
                if label not in newlabels:
                    newlabels.append(label)

        if labels != newlabels:
            self._log_info("change label:%s for %s" % (labels, api.title))
            labels2set = [repo.getLabel(item) for item in newlabels]
            api.set_labels(*labels2set)
            labels = newlabels

        if api.milestone is None:
            milestone = ""
        else:
            ms = RepoMilestone(repo=repo, githubObj=api.milestone)
            milestone = "%s:%s" % (ms.number, ms.title)

        # Swap the dict in one step so a failure above leaves the old data intact.
        self._ddict = {
            "labels": labels,
            "id": api.id,
            "url": api.html_url,
            "number": api.number,
            "open": api.state == "open",
            "assignee": repo.client.getUserLogin(githubObj=api.assignee),
            "state": api.state,
            "title": api.title,
            "body": api.body,
            "milestone": milestone,
        }
        self._log_debug("LOAD:%s %s" % (repo.fullname, self._ddict["title"]))

    @property
    def todo(self):
        """Texts of all lines starting with ``"!! "`` in the body and comments (cached)."""
        if "_todo" not in self.__dict__:
            marker = "!! "
            texts = [self.body]
            texts.extend(comment["body"] for comment in self.comments)
            todo = []
            for text in texts:
                if not text:
                    continue
                for line in text.split("\n"):
                    if line.startswith(marker):
                        # Drop exactly the marker; str.strip("!! ") would also
                        # eat "!" characters at the end of the item.
                        todo.append(line[len(marker) :].strip())
            self._todo = todo
        return self._todo

    @property
    def istask(self):
        """True when the type is ``task`` or the title ends with ``task``."""
        return self.type == "task" or self.title.lower().endswith("task")

    def __str__(self):
        return "issue:%s" % self.title

    __repr__ = __str__

Classes

class Issue (repo, ddict={}, githubObj=None)

A simple attribute-based namespace.

SimpleNamespace(**kwargs)

base class implementation for any class with fields which supports getting/setting raw data for any instance fields.

any instance can have an optional name and a parent.

class Person(Base):
    name = fields.String()
    age = fields.Float()

p = Person(name="ahmed", age="19")
print(p.name, p.age)

Args

parent_ : Base, optional
parent instance. Defaults to None.
instance_name_ : str, optional
instance name. Defaults to None.
**values
any given field values to initiate the instance with
Expand source code
class Issue(base):
    """Cached view of one GitHub issue belonging to ``repo``.

    The issue fields live in ``self._ddict``; they are either handed in by the
    caller or filled by :meth:`load` from the GitHub issue object. Comments are
    fetched lazily on first access of :attr:`comments`.
    """

    def __init__(self, repo, ddict=None, githubObj=None):
        """Create the wrapper.

        Args:
            repo: repository wrapper; this class uses its ``api``, ``client``,
                ``fullname``, ``labelnames`` and ``getLabel``.
            ddict: previously stored issue data, or None to start empty.
            githubObj: GitHub issue object; when given, ``load()`` runs at once.
        """
        base.__init__(self)
        self.repo = repo
        # A fresh dict per instance: a ``{}`` default in the signature would be
        # one object shared (and mutated) by every Issue built without data.
        self._ddict = {} if ddict is None else ddict
        self._githubObj = githubObj
        self._comments = self._ddict.get("comments", None)
        # Guards the lazy comment fetch; created before load() so the instance
        # is fully usable by the time any network call happens.
        self._lock = threading.RLock()
        if githubObj is not None:
            self.load()

    @property
    def api(self):
        """GitHub issue object, fetched by number through ``repo.api`` on first use."""
        if self._githubObj is None:
            self._githubObj = self.repo.api.get_issue(self.number)
        return self._githubObj

    @property
    def ddict(self):
        """Issue data dict including comments; loads from GitHub when still empty."""
        if self._ddict == {}:
            # no dict yet, fetch from github
            self.load()
        # comments are lazy loaded, so they are only fetched when accessed
        self._ddict["comments"] = self.comments
        return self._ddict

    def _fetch_comments(self):
        """Download all comments and return them as a list of plain dicts."""
        get_login = self.repo.client.getUserLogin
        return [
            {
                "user": get_login(githubObj=comment.user),
                "url": comment.url,
                "id": comment.id,
                "body": comment.body,
                "user_id": comment.user.id,
            }
            for comment in self.api.get_comments()
        ]

    @property
    def comments(self):
        """List of comment dicts (``user``, ``url``, ``id``, ``body``, ``user_id``)."""
        if self._comments is not None:
            return self._comments

        with self._lock:
            if self._comments is None:
                self._log_debug("Loading comments for issue: %s" % self.number)
                # Assign only the finished list: the check above runs without
                # the lock and must never observe a half-filled list.
                self._comments = self._fetch_comments()
        return self._comments

    def reload_comments(self):
        """Fetch the comments again, replace the cached list and return it."""
        with self._lock:
            self._comments = self._fetch_comments()
        return self._comments

    @property
    def guid(self):
        """``<repo fullname>_<issue number>``."""
        return self.repo.fullname + "_" + str(self._ddict["number"])

    @property
    def number(self):
        """Issue number as an int."""
        return int(self._ddict["number"])

    @property
    def title(self):
        return self._ddict["title"]

    @property
    def body(self):
        return self._ddict["body"]

    @body.setter
    def body(self, val):
        """Store the new body locally and push it to GitHub (best effort)."""
        self._ddict["body"] = val
        try:
            self.api.edit(body=self._ddict["body"])
        except Exception as e:
            # Deliberately non-fatal: the failure is logged, the local value stays.
            self._log_error("Failed to update the issue body: %s" % e)

    @property
    def time(self):
        # NOTE: load() does not fill "time" (that line is commented out there),
        # so this raises KeyError unless the supplied ddict carried the key.
        return self._ddict["time"]

    @property
    def url(self):
        return self._ddict["url"]

    @property
    def assignee(self):
        return self._ddict["assignee"]

    @property
    def id(self):
        return self._ddict["id"]

    @property
    def labels(self):
        """Copy of the label names, so callers cannot alter the cached list."""
        return self._ddict["labels"][:]

    @labels.setter
    def labels(self, val):
        """Replace the labels locally and on GitHub unless the set is unchanged."""
        new_labels = list(val)
        # Compare sorted copies; neither the caller's list nor the cache is reordered.
        if sorted(new_labels) == sorted(self._ddict["labels"]):
            return
        self._ddict["labels"] = new_labels
        toset = [self.repo.getLabel(item) for item in new_labels]
        self.api.set_labels(*toset)

    @property
    def milestone(self):
        return self._ddict["milestone"]

    def _labels_in_category(self, category):
        """Return the current labels whose name starts with ``category``."""
        return [label for label in self.labels if label.startswith(category)]

    @property
    def state(self):
        """``"closed"`` for closed issues, else the value of the single state label.

        Returns ``""`` when no state label is set. With several state labels
        the state is reset to ``"question"`` and that value is returned.
        """
        if not self.is_open:
            return "closed"

        states = self._labels_in_category("state")
        if len(states) == 1:
            return states[0][len("state") :].strip("_")
        if len(states) > 1:
            self.state = "question"
            return "question"
        return ""

    @state.setter
    def state(self, val):
        return self._setLabels(val, "state")

    @property
    def is_open(self):
        return self._ddict["open"]

    @property
    def type(self):
        """Value of the single ``type`` label, or ``""`` when there is not exactly one."""
        items = self._labels_in_category("type")
        if len(items) == 1:
            return items[0].partition("_")[-1]
        return ""

    @type.setter
    def type(self, val):
        return self._setLabels(val, "type")

    @property
    def priority(self):
        """Value of the single ``priority`` label; otherwise sets and returns ``"normal"``."""
        items = self._labels_in_category("priority")
        if len(items) == 1:
            return items[0].partition("_")[-1]
        self.priority = "normal"
        return self.priority

    @priority.setter
    def priority(self, val):
        return self._setLabels(val, "priority")

    @property
    def process(self):
        """Value of the single ``process`` label, or ``""`` when there is not exactly one."""
        items = self._labels_in_category("process")
        if len(items) == 1:
            return items[0][len("process") :].strip("_")
        return ""

    @process.setter
    def process(self, val):
        return self._setLabels(val, "process")

    def _setLabels(self, val, category):
        """Make ``<category>_<val>`` the only label of ``category`` on the issue.

        ``None`` or ``""`` is ignored. ``val`` may be given with or without the
        category prefix.

        Raises:
            Exception: when the resulting label is not in ``repo.labelnames``.
        """
        if val is None or val == "":
            return

        if val.startswith(category):
            _, _, val = val.partition("_")

        val = "%s_%s" % (category, val.strip("_").lower())

        if val not in self.repo.labelnames:
            # sorted() keeps the repository's own list order untouched
            llist = ",".join(sorted(self.repo.labelnames))
            raise Exception(
                "Label needs to be in list:%s (is understood labels in this repo on github), now is: '%s'"
                % (llist, val)
            )

        # make sure there is only 1
        current = self._labels_in_category(category)
        if current == [val]:
            return
        labels2set = [label for label in self.labels if label not in current]
        labels2set.append(val)
        self.labels = labels2set

    def load(self):
        """Rebuild ``self._ddict`` from the GitHub issue object.

        Labels unknown to the repository are replaced through ``replacelabels``
        or dropped; when that changes the list, the cleaned list is written
        back to GitHub.
        """
        # Resolve the API object first: without a cached GitHub object the
        # lookup needs the issue number from the current dict.
        api = self.api
        repo = self.repo

        # check labels
        labels = [item.name for item in api.labels]  # are the names
        newlabels = []
        for label in labels:
            if label not in repo.labelnames:
                if label in replacelabels:
                    if replacelabels[label] not in newlabels:
                        newlabels.append(replacelabels[label])
            else:
                if label not in newlabels:
                    newlabels.append(label)

        if labels != newlabels:
            self._log_info("change label:%s for %s" % (labels, api.title))
            labels2set = [repo.getLabel(item) for item in newlabels]
            api.set_labels(*labels2set)
            labels = newlabels

        if api.milestone is None:
            milestone = ""
        else:
            ms = RepoMilestone(repo=repo, githubObj=api.milestone)
            milestone = "%s:%s" % (ms.number, ms.title)

        # Swap the dict in one step so a failure above leaves the old data intact.
        self._ddict = {
            "labels": labels,
            "id": api.id,
            "url": api.html_url,
            "number": api.number,
            "open": api.state == "open",
            "assignee": repo.client.getUserLogin(githubObj=api.assignee),
            "state": api.state,
            "title": api.title,
            "body": api.body,
            "milestone": milestone,
        }
        self._log_debug("LOAD:%s %s" % (repo.fullname, self._ddict["title"]))

    @property
    def todo(self):
        """Texts of all lines starting with ``"!! "`` in the body and comments (cached)."""
        if "_todo" not in self.__dict__:
            marker = "!! "
            texts = [self.body]
            texts.extend(comment["body"] for comment in self.comments)
            todo = []
            for text in texts:
                if not text:
                    continue
                for line in text.split("\n"):
                    if line.startswith(marker):
                        # Drop exactly the marker; str.strip("!! ") would also
                        # eat "!" characters at the end of the item.
                        todo.append(line[len(marker) :].strip())
            self._todo = todo
        return self._todo

    @property
    def istask(self):
        """True when the type is ``task`` or the title ends with ``task``."""
        return self.type == "task" or self.title.lower().endswith("task")

    def __str__(self):
        return "issue:%s" % self.title

    __repr__ = __str__

Ancestors

Instance variables

var api
Expand source code
@property
def api(self):
    """Return the GitHub issue object, fetching it by number on first use."""
    cached = self._githubObj
    if cached is not None:
        return cached
    # Not fetched yet: ask the repository API and remember the result.
    self._githubObj = self.repo.api.get_issue(self.number)
    return self._githubObj
var assignee
Expand source code
@property
def assignee(self):
    """Return the ``assignee`` entry of the cached issue data."""
    data = self._ddict
    return data["assignee"]
var body
Expand source code
@property
def body(self):
    """Return the ``body`` entry of the cached issue data."""
    data = self._ddict
    return data["body"]
var comments
Expand source code
@property
def comments(self):
    """Return the issue comments as a list of dicts, fetching them on first use.

    Each dict has the keys ``user``, ``url``, ``id``, ``body`` and ``user_id``.
    The fetch runs under ``self._lock``; later calls return the cached list.
    """
    if self._comments is not None:
        return self._comments

    with self._lock:
        if self._comments is None:
            self._log_debug("Loading comments for issue: %s" % self.number)
            get_login = self.repo.client.getUserLogin
            # Build the list completely before assigning it: the check above
            # runs without the lock, so an early ``self._comments = []`` would
            # let another caller return a partially filled list.
            self._comments = [
                {
                    "user": get_login(githubObj=comment.user),
                    "url": comment.url,
                    "id": comment.id,
                    "body": comment.body,
                    "user_id": comment.user.id,
                    # "time" is not stored; the original conversion is disabled
                }
                for comment in self.api.get_comments()
            ]
    return self._comments
var ddict
Expand source code
@property
def ddict(self):
    """Return the issue data dict with its ``comments`` entry filled in."""
    nothing_cached = self._ddict == {}
    if nothing_cached:
        # no dict yet, fetch from github
        self.load()
    # comments are lazy: touching the property here is what triggers the fetch
    loaded_comments = self.comments
    self._ddict["comments"] = loaded_comments
    return self._ddict
var guid
Expand source code
@property
def guid(self):
    """Return ``<repo fullname>_<issue number>``."""
    repo_name = self.repo.fullname
    issue_number = str(self._ddict["number"])
    return "_".join((repo_name, issue_number))
var id
Expand source code
@property
def id(self):
    """Return the ``id`` entry of the cached issue data."""
    data = self._ddict
    return data["id"]
var is_open
Expand source code
@property
def is_open(self):
    """Return the ``open`` entry of the cached issue data."""
    data = self._ddict
    return data["open"]
var istask
Expand source code
@property
def istask(self):
    """Return True when the type is ``task`` or the title ends with ``task``."""
    if self.type == "task":
        return True
    # Title check ignores letter case.
    lowered_title = self.title.lower()
    return lowered_title.endswith("task")
var labels
Expand source code
@property
def labels(self):
    """Return a copy of the stored labels.

    A copy is handed out so that changing the returned list does not change
    the value kept in ``_ddict``.
    """
    stored = self._ddict["labels"]
    return stored[:]
var milestone
Expand source code
@property
def milestone(self):
    """Return the ``milestone`` entry of the cached issue data."""
    data = self._ddict
    return data["milestone"]
var number
Expand source code
@property
def number(self):
    """Return the stored issue number converted to ``int``."""
    raw_number = self._ddict["number"]
    return int(raw_number)
var priority
Expand source code
@property
def priority(self):
    """Return the value of the single ``priority`` label.

    When there is not exactly one such label, ``"normal"`` is assigned to
    ``self.priority`` and the property is read again.
    """
    matches = [name for name in self.labels if name.startswith("priority")]
    if len(matches) != 1:
        self.priority = "normal"
        return self.priority
    # Everything after the first underscore is the priority value.
    _, _, value = matches[0].partition("_")
    return value
var process
Expand source code
@property
def process(self):
    """Return the value of the single ``process`` label, or ``""`` otherwise."""
    prefix = "process"
    matches = [name for name in self.labels if name.startswith(prefix)]
    if len(matches) != 1:
        return ""
    # Cut the prefix, then drop the separating underscore(s).
    remainder = matches[0][len(prefix) :]
    return remainder.strip("_")
var state
Expand source code
@property
def state(self):
    """Return the issue state derived from ``is_open`` and the ``state`` labels.

    Returns:
        ``"closed"`` when the issue is not open; the value of the single
        ``state`` label when exactly one exists; ``""`` when none exists.
        With several state labels, ``"question"`` is assigned to
        ``self.state`` and ``"question"`` is returned.
    """
    if not self.is_open:
        return "closed"

    states = [label for label in self.labels if label.startswith("state")]
    if len(states) == 1:
        return states[0][len("state") :].strip("_")
    if len(states) > 1:
        self.state = "question"
        # This branch used to fall through and hand callers None.
        return "question"
    return ""
var time
Expand source code
@property
def time(self):
    """Return the ``time`` entry of the cached issue data.

    NOTE(review): the line in ``load()`` that would fill ``"time"`` is
    commented out in this file, so the lookup raises ``KeyError`` unless the
    dict was supplied with that key.
    """
    data = self._ddict
    return data["time"]
var title
Expand source code
@property
def title(self):
    """Return the ``title`` entry of the cached issue data."""
    data = self._ddict
    return data["title"]
var todo
Expand source code
@property
def todo(self):
    """Return the todo items found in the issue body and its comments.

    A todo item is a line that starts with ``"!! "``; the marker is removed
    and the rest is stripped of surrounding whitespace. The list is computed
    once and cached in ``self._todo``.
    """
    if "_todo" not in self.__dict__:
        marker = "!! "
        texts = [self.body]
        texts.extend(comment["body"] for comment in self.comments)
        todo = []
        for text in texts:
            if not text:
                # a missing (None) or empty body has nothing to scan
                continue
            for line in text.split("\n"):
                if line.startswith(marker):
                    # Remove exactly the marker: str.strip("!! ") treats its
                    # argument as a character set and would also eat "!"
                    # characters at the end of the item.
                    todo.append(line[len(marker) :].strip())
        self._todo = todo
    return self._todo
var type
Expand source code
@property
def type(self):
    """Return the value of the single ``type`` label, or ``""`` otherwise."""
    matches = [name for name in self.labels if name.startswith("type")]
    if len(matches) != 1:
        return ""
    # Everything after the first underscore is the type value.
    _, _, value = matches[0].partition("_")
    return value
var url
Expand source code
@property
def url(self):
    """Return the ``url`` entry of the cached issue data."""
    data = self._ddict
    return data["url"]

Methods

def load(self)
Expand source code
def load(self):
    """Rebuild ``self._ddict`` from the GitHub issue object (``self.api``).

    Labels that are not in ``repo.labelnames`` are replaced through
    ``replacelabels`` or dropped; when that changes the list, the cleaned
    list is written back to GitHub with ``set_labels``.
    """
    # Resolve the API object before touching the dict: without a cached GitHub
    # object the lookup goes through ``self.number``, which reads the current
    # ``_ddict["number"]``.
    api = self.api
    repo = self.repo

    # check labels
    labels = [item.name for item in api.labels]  # are the names
    newlabels = []
    for label in labels:
        if label not in repo.labelnames:
            if label in replacelabels:
                if replacelabels[label] not in newlabels:
                    newlabels.append(replacelabels[label])
        else:
            if label not in newlabels:
                newlabels.append(label)

    if labels != newlabels:
        self._log_info("change label:%s for %s" % (labels, api.title))
        labels2set = [repo.getLabel(item) for item in newlabels]
        api.set_labels(*labels2set)
        labels = newlabels

    if api.milestone is None:
        milestone = ""
    else:
        ms = RepoMilestone(repo=repo, githubObj=api.milestone)
        milestone = "%s:%s" % (ms.number, ms.title)

    # Swap in the finished dict in one step, so a failure above leaves the
    # previous data intact instead of a half-filled dict.
    # "time" is not stored; the original conversion is disabled.
    self._ddict = {
        "labels": labels,
        "id": api.id,
        "url": api.html_url,
        "number": api.number,
        "open": api.state == "open",
        "assignee": repo.client.getUserLogin(githubObj=api.assignee),
        "state": api.state,
        "title": api.title,
        "body": api.body,
        "milestone": milestone,
    }

    self._log_debug("LOAD:%s %s" % (repo.fullname, self._ddict["title"]))
def reload_comments(self)
Expand source code
def reload_comments(self):
    """Fetch the issue comments again, replace the cached list and return it.

    Each entry is a dict with the keys ``user``, ``url``, ``id``, ``body`` and
    ``user_id`` -- the same shape the ``comments`` property in this file stores.
    """
    with self._lock:
        get_login = self.repo.client.getUserLogin
        # Build the new list first and assign it once complete, so readers of
        # ``self._comments`` never see an emptied or half-filled list.
        self._comments = [
            {
                "user": get_login(githubObj=comment.user),
                "url": comment.url,
                "id": comment.id,
                "body": comment.body,
                "user_id": comment.user.id,
            }
            for comment in self.api.get_comments()
        ]
    return self._comments

Inherited members