Module jumpscale.sals.chatflows.polls

Expand source code
from textwrap import dedent

from jumpscale.core.base import StoredFactory
from jumpscale.loader import j
from jumpscale.sals.chatflows.chatflows import GedisChatBot, chatflow_step
from jumpscale.sals.chatflows.models.voter_model import User

WALLET_NAME = "polls_receive"
MANIFESTO_VERSION = "2.0.1"

all_users = StoredFactory(User)
all_users.always_reload = True


class Poll(GedisChatBot):
    """Polls chatflow base
    just inherit from this class and override poll_name and QUESTIONS in your chatflow

    Args:
        GedisChatBot (Parent): contains the chatflows sals main functions

    Raises:
        j.core.exceptions.Runtime: if wrong inheritance happens
        StopChatFlow: if payment is failed
    """

    poll_name = None  # Required

    steps = ["initialize", "welcome", "payment", "custom_votes", "result"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.QUESTIONS = {}
        self.extra_data = {}
        self.metadata = {}
        self.custom_answers = {}

        if not j.clients.stellar.find(WALLET_NAME):
            raise j.core.exceptions.Runtime(f"Wallet {WALLET_NAME} is not configured, please create it.")

        self.wallet = j.clients.stellar.get(WALLET_NAME)

    def _get_wallets_as_md(self, wallets):
        result = "\n"
        for item in wallets:
            result += f"- `{item}` has {self._get_voter_balance(item)} (TFT+TFTA)\n"
        return result

    @chatflow_step()
    def initialize(self):
        user_info = self.user_info()

        username = user_info["username"].split(".")[0]
        welcome_message = f"# Welcome `{username}` to {self.poll_name.capitalize()} Poll\n<br/>The detailed poll results are only visible to the tfgrid council members"
        self.user = all_users.get(name=f"{self.poll_name}_{username}")
        self.user.poll_name = self.poll_name
        if self.user.has_voted:
            welcome_message += "\n<br/><br/>`Note: You have already voted.`"

        if self.user.has_voted:
            actions = ["Edit My Vote", "See Results"]
            action = self.single_choice(welcome_message, options=actions, required=True, md=True)
            if action == actions[1]:
                self.result()
                self.end()
        else:
            self.md_show(welcome_message, md=True)

    @chatflow_step()
    def welcome(self):
        pass

    @chatflow_step(title="Loading Wallets")
    def payment(self):
        def _pay(msg=""):
            amount = 0.1
            currency = self.single_choice(
                "We need to know how many tokens you have to allow weighted vote results, "
                "in order to do this we need to know all of your wallets addresses you want us to consider in this poll. "
                "The idea is you send us a small transaction that costs 0.1 tokens. "
                "Then we will be able to calculate the sum of the TFTs and TFTAs you have in all of the wallets you added. Now you can start adding your wallets "
                "Which token would you like to continue the transaction with?",
                ["TFT", "TFTA"],
                required=True,
            )

            qr_code_content = j.sals.zos.get()._escrow_to_qrcode(
                escrow_address=self.wallet.address,
                escrow_asset=currency,
                total_amount=amount,
                message=self.user.user_code,
            )

            message_text = f"""\
            <h3>Make a Payment</h3>
            Scan the QR code with your wallet (do not change the message) or enter the information below manually and proceed with the payment.
            Make sure to add the message (user code) as memo_text
            Please make the transaction and press Next
            <h4> Wallet address: </h4>  {self.wallet.address}
            <h4> Currency: </h4>  {currency}
            <h4> Amount: </h4>  {amount}
            <h4> Message (User code): </h4>  {self.user.user_code}
            """
            self.qrcode_show(data=qr_code_content, msg=dedent(message_text), scale=4, update=True, html=True, md=True)
            if self._check_payment(timeout=360):
                return True
            else:
                return False

        def _pay_again(msg=""):
            while True:
                pay_again = self.single_choice(
                    msg
                    or f"Wallets added: {self._get_wallets_as_md(self.user.wallets_addresses)}\nDo you like to add another wallet?",
                    ["YES", "NO"],
                    md=True,
                )
                if pay_again == "NO":
                    break
                if not _pay():
                    _pay_again(
                        "Error adding the wallet, Please make sure you transaction is completed.\n do you want to try again ?"
                    )

        if not self.user.user_code:
            self.user.user_code = j.data.idgenerator.chars(10)
        # Payment
        if self.user.has_voted and len(self.user.wallets_addresses) > 0:
            self.md_show(
                f"You have already added wallets: {self._get_wallets_as_md(self.user.wallets_addresses)}\n, Press Next to add another wallet and modify your vote",
                md=True,
            )
            _pay_again()

        elif len(self.user.wallets_addresses) > 0:
            self.md_show(
                f"You have already added wallets: {self._get_wallets_as_md(self.user.wallets_addresses)}\n, Press Next to add another wallet and submit your vote",
                md=True,
            )
            _pay_again()
        else:
            if _pay():
                _pay_again()
            else:
                self.stop("Error adding the wallet, Please make sure you transaction is completed.\n Please try again")

    def _check_payment(self, timeout):
        """Returns True if user has paid already, False if not"""
        now = j.data.time.get().timestamp
        remaning_time = j.data.time.get(now + timeout).timestamp
        while remaning_time > now:
            remaning_time_msg = j.data.time.get(remaning_time).humanize(granularity=["minute", "second"])
            payment_message = (
                "# Payment being processed...\n"
                f"Process will be cancelled if payment is not successful {remaning_time_msg}"
            )
            self.md_show_update(payment_message, md=True)
            user_wallets_count = len(self.user.wallets_addresses)
            transactions = self.wallet.list_transactions()
            for transaction in transactions:
                if transaction.memo_text == self.user.user_code:
                    if transaction.hash not in self.user.transaction_hashes:
                        self.user.transaction_hashes.append(transaction.hash)
                    user_wallet = self.wallet.get_sender_wallet_address(transaction.hash)
                    if not user_wallet in self.user.wallets_addresses:
                        self.user.wallets_addresses.append(user_wallet)
                        self.user.tokens += float(self._get_voter_balance(user_wallet))
                    self.user.save()
            if len(self.user.wallets_addresses) > user_wallets_count:
                return True
        return False

    def get_vote_answer(self, vote_title):
        answer_array = self.user.vote_data.get(vote_title)
        if answer_array:
            options = self.QUESTIONS.get(vote_title)
            try:
                return options[answer_array.index(1)]
            except ValueError:
                pass

    def get_question_answer(self, question_title):
        return self.user.extra_data.get(question_title)

    def vote(self):
        answers = {}
        answers.update(self.custom_answers)
        vote_data = self._map_vote_results(answers.copy())
        vote_data_weighted = self._map_vote_results(answers.copy(), weighted=True)
        self.user.vote_data = vote_data
        self.user.vote_data_weighted = vote_data_weighted
        self.user.has_voted = True
        self.user.extra_data = self.extra_data
        self.user.manifesto_version = MANIFESTO_VERSION
        self.user.save()

    @chatflow_step(title="Please fill in the following form", disable_previous=True)
    def custom_votes(self):
        """allow child classes to have its custom slides

        Returns:
            Dict, Dict: Has all questions and answer, extra saved data outside the poll
        """
        pass

    def _map_vote_results(self, form_answers, weighted=False):
        """takes form answers and returns a sparse array of what user chose
        to be easy in calcualting votes

        example: ["Blue", "Red", "Green", "Orange"]
        if user chose "Red" will [0, 1, 0, 0]
        if user chose "Red" and weighted results will [0, <user_token_sum>, 0, 0]
        Args:
            form_answers (dict): form result dictionary
        """
        for question, answer in form_answers.items():
            all_answers_init = len(self.QUESTIONS[question]) * [0.0]
            answer_index = self.QUESTIONS[question].index(answer)
            if weighted:
                all_answers_init[answer_index] = self.user.tokens
            else:
                all_answers_init[answer_index] = 1
            form_answers[question] = all_answers_init
        return form_answers

    @chatflow_step(title="Poll Results %", final_step=True)
    def result(self):
        usersnames = all_users.list_all()
        total_votes = 0
        total_answers = {}
        total_answers_weighted = {}
        for username in usersnames:
            user = all_users.get(username)
            if user.poll_name == self.poll_name and user.has_voted:
                total_votes += 1
                user_votes = all_users.get(username).vote_data
                user_votes_weighted = all_users.get(username).vote_data_weighted
                for question, answer in user_votes.items():
                    if total_answers.get(question):
                        total_answers[question] = list(map(sum, zip(total_answers[question], answer)))
                    else:
                        total_answers[question] = answer

                for question, answer in user_votes_weighted.items():
                    if total_answers_weighted.get(question):
                        total_answers_weighted[question] = list(map(sum, zip(total_answers_weighted[question], answer)))
                    else:
                        total_answers_weighted[question] = answer

        total_answers_with_percent = {k: self._calculate_percent(v) for k, v in total_answers.items()}
        total_answers_weighted_with_percent = {k: self._calculate_percent(v) for k, v in total_answers_weighted.items()}

        result_msg = ""
        for question, answers in total_answers_with_percent.items():
            question_current_title = question
            question_new_title = self.metadata["new_title_keys"][question_current_title]
            result_msg += f"### {question_new_title}\n"
            for i in range(len(answers)):
                answer_name = self.QUESTIONS[question][i]
                result_msg += f"- {answer_name}: {answers[i]}%\n"
            result_msg += "\n\n"

        # result_msg += "\n<br />\n\n"
        # result_msg += "## Weighted results %\n\n<br />\n\n"
        # for question, answers in total_answers_weighted_with_percent.items():
        #     question_current_title = question
        #     question_new_title = self.metadata["new_title_keys"][question_current_title]
        #     result_msg += f"### {question_new_title}\n"
        #     for i in range(len(answers)):
        #         answer_name = self.QUESTIONS[question][i]
        #         result_msg += f"- {answer_name}: {answers[i]}%\n"
        #     result_msg += "\n"

        result_msg += f"\n<br />\n\n#### Total number of votes: {total_votes}\n"
        self.md_show(result_msg, md=True)

    def _calculate_percent(self, answers):
        """Takes the answers list which is a sparse array and map it
        to percentages

        Args:
            answers (list)

        Returns:
            list: answers_list mapped to percentages
        """
        answers_list = answers[:]
        total_votes = float(sum(answers_list))
        for i in range(len(answers_list)):
            res = (answers_list[i] / total_votes) * 100
            answers_list[i] = round(res, 2)
        return answers_list

    def _get_voter_balance(self, wallet_address):
        """Get sum of user TFT and TFTA

        Args:
            wallet_address (String): Wallet address
        """
        assets = self.wallet.get_balance(wallet_address)
        total_balance = 0.0
        # get free balances
        for asset in assets.balances:
            if asset.asset_code == "TFT" or asset.asset_code == "TFTA":
                total_balance += float(asset.balance)

        # add locked funds too
        for locked_account in assets.escrow_accounts:
            for locked_asset in locked_account.balances:
                if locked_asset.asset_code == "TFT" or locked_asset.asset_code == "TFTA":
                    total_balance += float(locked_asset.balance)

        return total_balance

Classes

class Poll (*args, **kwargs)

Polls chatflow base just inherit from this class and override poll_name and QUESTIONS in your chatflow

Args

GedisChatBot : Parent
contains the chatflows sals main functions

Raises

j.core.exceptions.Runtime
if wrong inheritance happens
StopChatFlow
if payment is failed

Keyword Args any extra kwargs that is passed while creating the session (i.e. can be used for passing any query parameters)

Expand source code
class Poll(GedisChatBot):
    """Polls chatflow base
    just inherit from this class and override poll_name and QUESTIONS in your chatflow

    Args:
        GedisChatBot (Parent): contains the chatflows sals main functions

    Raises:
        j.core.exceptions.Runtime: if wrong inheritance happens
        StopChatFlow: if payment is failed
    """

    poll_name = None  # Required

    steps = ["initialize", "welcome", "payment", "custom_votes", "result"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.QUESTIONS = {}
        self.extra_data = {}
        self.metadata = {}
        self.custom_answers = {}

        if not j.clients.stellar.find(WALLET_NAME):
            raise j.core.exceptions.Runtime(f"Wallet {WALLET_NAME} is not configured, please create it.")

        self.wallet = j.clients.stellar.get(WALLET_NAME)

    def _get_wallets_as_md(self, wallets):
        result = "\n"
        for item in wallets:
            result += f"- `{item}` has {self._get_voter_balance(item)} (TFT+TFTA)\n"
        return result

    @chatflow_step()
    def initialize(self):
        user_info = self.user_info()

        username = user_info["username"].split(".")[0]
        welcome_message = f"# Welcome `{username}` to {self.poll_name.capitalize()} Poll\n<br/>The detailed poll results are only visible to the tfgrid council members"
        self.user = all_users.get(name=f"{self.poll_name}_{username}")
        self.user.poll_name = self.poll_name
        if self.user.has_voted:
            welcome_message += "\n<br/><br/>`Note: You have already voted.`"

        if self.user.has_voted:
            actions = ["Edit My Vote", "See Results"]
            action = self.single_choice(welcome_message, options=actions, required=True, md=True)
            if action == actions[1]:
                self.result()
                self.end()
        else:
            self.md_show(welcome_message, md=True)

    @chatflow_step()
    def welcome(self):
        pass

    @chatflow_step(title="Loading Wallets")
    def payment(self):
        def _pay(msg=""):
            amount = 0.1
            currency = self.single_choice(
                "We need to know how many tokens you have to allow weighted vote results, "
                "in order to do this we need to know all of your wallets addresses you want us to consider in this poll. "
                "The idea is you send us a small transaction that costs 0.1 tokens. "
                "Then we will be able to calculate the sum of the TFTs and TFTAs you have in all of the wallets you added. Now you can start adding your wallets "
                "Which token would you like to continue the transaction with?",
                ["TFT", "TFTA"],
                required=True,
            )

            qr_code_content = j.sals.zos.get()._escrow_to_qrcode(
                escrow_address=self.wallet.address,
                escrow_asset=currency,
                total_amount=amount,
                message=self.user.user_code,
            )

            message_text = f"""\
            <h3>Make a Payment</h3>
            Scan the QR code with your wallet (do not change the message) or enter the information below manually and proceed with the payment.
            Make sure to add the message (user code) as memo_text
            Please make the transaction and press Next
            <h4> Wallet address: </h4>  {self.wallet.address}
            <h4> Currency: </h4>  {currency}
            <h4> Amount: </h4>  {amount}
            <h4> Message (User code): </h4>  {self.user.user_code}
            """
            self.qrcode_show(data=qr_code_content, msg=dedent(message_text), scale=4, update=True, html=True, md=True)
            if self._check_payment(timeout=360):
                return True
            else:
                return False

        def _pay_again(msg=""):
            while True:
                pay_again = self.single_choice(
                    msg
                    or f"Wallets added: {self._get_wallets_as_md(self.user.wallets_addresses)}\nDo you like to add another wallet?",
                    ["YES", "NO"],
                    md=True,
                )
                if pay_again == "NO":
                    break
                if not _pay():
                    _pay_again(
                        "Error adding the wallet, Please make sure you transaction is completed.\n do you want to try again ?"
                    )

        if not self.user.user_code:
            self.user.user_code = j.data.idgenerator.chars(10)
        # Payment
        if self.user.has_voted and len(self.user.wallets_addresses) > 0:
            self.md_show(
                f"You have already added wallets: {self._get_wallets_as_md(self.user.wallets_addresses)}\n, Press Next to add another wallet and modify your vote",
                md=True,
            )
            _pay_again()

        elif len(self.user.wallets_addresses) > 0:
            self.md_show(
                f"You have already added wallets: {self._get_wallets_as_md(self.user.wallets_addresses)}\n, Press Next to add another wallet and submit your vote",
                md=True,
            )
            _pay_again()
        else:
            if _pay():
                _pay_again()
            else:
                self.stop("Error adding the wallet, Please make sure you transaction is completed.\n Please try again")

    def _check_payment(self, timeout):
        """Returns True if user has paid already, False if not"""
        now = j.data.time.get().timestamp
        remaning_time = j.data.time.get(now + timeout).timestamp
        while remaning_time > now:
            remaning_time_msg = j.data.time.get(remaning_time).humanize(granularity=["minute", "second"])
            payment_message = (
                "# Payment being processed...\n"
                f"Process will be cancelled if payment is not successful {remaning_time_msg}"
            )
            self.md_show_update(payment_message, md=True)
            user_wallets_count = len(self.user.wallets_addresses)
            transactions = self.wallet.list_transactions()
            for transaction in transactions:
                if transaction.memo_text == self.user.user_code:
                    if transaction.hash not in self.user.transaction_hashes:
                        self.user.transaction_hashes.append(transaction.hash)
                    user_wallet = self.wallet.get_sender_wallet_address(transaction.hash)
                    if not user_wallet in self.user.wallets_addresses:
                        self.user.wallets_addresses.append(user_wallet)
                        self.user.tokens += float(self._get_voter_balance(user_wallet))
                    self.user.save()
            if len(self.user.wallets_addresses) > user_wallets_count:
                return True
        return False

    def get_vote_answer(self, vote_title):
        answer_array = self.user.vote_data.get(vote_title)
        if answer_array:
            options = self.QUESTIONS.get(vote_title)
            try:
                return options[answer_array.index(1)]
            except ValueError:
                pass

    def get_question_answer(self, question_title):
        return self.user.extra_data.get(question_title)

    def vote(self):
        answers = {}
        answers.update(self.custom_answers)
        vote_data = self._map_vote_results(answers.copy())
        vote_data_weighted = self._map_vote_results(answers.copy(), weighted=True)
        self.user.vote_data = vote_data
        self.user.vote_data_weighted = vote_data_weighted
        self.user.has_voted = True
        self.user.extra_data = self.extra_data
        self.user.manifesto_version = MANIFESTO_VERSION
        self.user.save()

    @chatflow_step(title="Please fill in the following form", disable_previous=True)
    def custom_votes(self):
        """allow child classes to have its custom slides

        Returns:
            Dict, Dict: Has all questions and answer, extra saved data outside the poll
        """
        pass

    def _map_vote_results(self, form_answers, weighted=False):
        """takes form answers and returns a sparse array of what user chose
        to be easy in calcualting votes

        example: ["Blue", "Red", "Green", "Orange"]
        if user chose "Red" will [0, 1, 0, 0]
        if user chose "Red" and weighted results will [0, <user_token_sum>, 0, 0]
        Args:
            form_answers (dict): form result dictionary
        """
        for question, answer in form_answers.items():
            all_answers_init = len(self.QUESTIONS[question]) * [0.0]
            answer_index = self.QUESTIONS[question].index(answer)
            if weighted:
                all_answers_init[answer_index] = self.user.tokens
            else:
                all_answers_init[answer_index] = 1
            form_answers[question] = all_answers_init
        return form_answers

    @chatflow_step(title="Poll Results %", final_step=True)
    def result(self):
        usersnames = all_users.list_all()
        total_votes = 0
        total_answers = {}
        total_answers_weighted = {}
        for username in usersnames:
            user = all_users.get(username)
            if user.poll_name == self.poll_name and user.has_voted:
                total_votes += 1
                user_votes = all_users.get(username).vote_data
                user_votes_weighted = all_users.get(username).vote_data_weighted
                for question, answer in user_votes.items():
                    if total_answers.get(question):
                        total_answers[question] = list(map(sum, zip(total_answers[question], answer)))
                    else:
                        total_answers[question] = answer

                for question, answer in user_votes_weighted.items():
                    if total_answers_weighted.get(question):
                        total_answers_weighted[question] = list(map(sum, zip(total_answers_weighted[question], answer)))
                    else:
                        total_answers_weighted[question] = answer

        total_answers_with_percent = {k: self._calculate_percent(v) for k, v in total_answers.items()}
        total_answers_weighted_with_percent = {k: self._calculate_percent(v) for k, v in total_answers_weighted.items()}

        result_msg = ""
        for question, answers in total_answers_with_percent.items():
            question_current_title = question
            question_new_title = self.metadata["new_title_keys"][question_current_title]
            result_msg += f"### {question_new_title}\n"
            for i in range(len(answers)):
                answer_name = self.QUESTIONS[question][i]
                result_msg += f"- {answer_name}: {answers[i]}%\n"
            result_msg += "\n\n"

        # result_msg += "\n<br />\n\n"
        # result_msg += "## Weighted results %\n\n<br />\n\n"
        # for question, answers in total_answers_weighted_with_percent.items():
        #     question_current_title = question
        #     question_new_title = self.metadata["new_title_keys"][question_current_title]
        #     result_msg += f"### {question_new_title}\n"
        #     for i in range(len(answers)):
        #         answer_name = self.QUESTIONS[question][i]
        #         result_msg += f"- {answer_name}: {answers[i]}%\n"
        #     result_msg += "\n"

        result_msg += f"\n<br />\n\n#### Total number of votes: {total_votes}\n"
        self.md_show(result_msg, md=True)

    def _calculate_percent(self, answers):
        """Takes the answers list which is a sparse array and map it
        to percentages

        Args:
            answers (list)

        Returns:
            list: answers_list mapped to percentages
        """
        answers_list = answers[:]
        total_votes = float(sum(answers_list))
        for i in range(len(answers_list)):
            res = (answers_list[i] / total_votes) * 100
            answers_list[i] = round(res, 2)
        return answers_list

    def _get_voter_balance(self, wallet_address):
        """Get sum of user TFT and TFTA

        Args:
            wallet_address (String): Wallet address
        """
        assets = self.wallet.get_balance(wallet_address)
        total_balance = 0.0
        # get free balances
        for asset in assets.balances:
            if asset.asset_code == "TFT" or asset.asset_code == "TFTA":
                total_balance += float(asset.balance)

        # add locked funds too
        for locked_account in assets.escrow_accounts:
            for locked_asset in locked_account.balances:
                if locked_asset.asset_code == "TFT" or locked_asset.asset_code == "TFTA":
                    total_balance += float(locked_asset.balance)

        return total_balance

Ancestors

Subclasses

Class variables

var poll_name
var steps

Methods

def custom_votes(*args, **kwargs)
Expand source code
def wrapper(*args, **kwargs):
    self_ = args[0]
    self_.step_info.update(title=title, slide=0, previous=(not disable_previous), final_step=final_step)
    return func(*args, **kwargs)
def get_question_answer(self, question_title)
Expand source code
def get_question_answer(self, question_title):
    return self.user.extra_data.get(question_title)
def get_vote_answer(self, vote_title)
Expand source code
def get_vote_answer(self, vote_title):
    answer_array = self.user.vote_data.get(vote_title)
    if answer_array:
        options = self.QUESTIONS.get(vote_title)
        try:
            return options[answer_array.index(1)]
        except ValueError:
            pass
def initialize(*args, **kwargs)
Expand source code
def wrapper(*args, **kwargs):
    self_ = args[0]
    self_.step_info.update(title=title, slide=0, previous=(not disable_previous), final_step=final_step)
    return func(*args, **kwargs)
def payment(*args, **kwargs)
Expand source code
def wrapper(*args, **kwargs):
    self_ = args[0]
    self_.step_info.update(title=title, slide=0, previous=(not disable_previous), final_step=final_step)
    return func(*args, **kwargs)
def result(*args, **kwargs)
Expand source code
def wrapper(*args, **kwargs):
    self_ = args[0]
    self_.step_info.update(title=title, slide=0, previous=(not disable_previous), final_step=final_step)
    return func(*args, **kwargs)
def vote(self)
Expand source code
def vote(self):
    answers = {}
    answers.update(self.custom_answers)
    vote_data = self._map_vote_results(answers.copy())
    vote_data_weighted = self._map_vote_results(answers.copy(), weighted=True)
    self.user.vote_data = vote_data
    self.user.vote_data_weighted = vote_data_weighted
    self.user.has_voted = True
    self.user.extra_data = self.extra_data
    self.user.manifesto_version = MANIFESTO_VERSION
    self.user.save()
def welcome(*args, **kwargs)
Expand source code
def wrapper(*args, **kwargs):
    self_ = args[0]
    self_.step_info.update(title=title, slide=0, previous=(not disable_previous), final_step=final_step)
    return func(*args, **kwargs)

Inherited members