Module jumpscale.clients.stellar.vesting

Expand source code
import stellar_sdk
from jumpscale.loader import j


# All per-network tables below are indexed by the ``network`` argument of the
# functions in this module; the keys present are "TEST" and "STD".

# Issuer account of the "TFT" asset. The cleanup-transaction check requires the
# trustline being removed to reference this issuer.
_TFT_ISSUERS = {
    "TEST": "GA47YZA3PKFUZMPLQ3B5F2E3CJIB57TGGU7SPCQT2WAEYKN766PWIMB3",
    "STD": "GBOVQKJYHXRR3DX6NOX2RRYFRCUMSADGDESTDNBDS6CDVLGVESRTAC47",
}

# Account that the cleanup transaction's account-merge operation must name as
# its destination.
_VESTING_ACTIVATORS = {
    "TEST": "GAXSJCVW7FBQCW3COY5B67S5QB6PLKTFL6ATLYXLZGCACO3KLYA6FVI7",
    "STD": "GB2P3V4GZNYTI3E5IOQHAF2E46GKMW26RHVTD4TZEZAELKLJHUYPH4FR",
}


# Keys that _verify_signers requires to be present on the account, each as an
# "ed25519_public_key" signer with weight 1.
_COSIGNERS = {
    "TEST": [
        "GBOZV7I3NJGXIFBMGHTETCY7ZL36QMER25IJGRFEEX3KM36YRUMHEZAP",
        "GCU5OGRCFJ3NWQWUF3A4MAGNYABOPJMCHMXYQ7TC4RQ6IYBIZBEOGN6H",
        "GAK2Q3P3YOAPK6MCM4P3ASDJYSHQ376LIO64CHBXNKIRFZECH2JY2LYX",
        "GALOCGHPJ2KQ67VIZRSLOAPJIBCUL54WG6ZHFDB6AKPIMEQXHU374UCJ",
        "GC3IFIJ7KEOZ5CUZS57HMLJLXISS7N45INIGXO2JFNJBLKOGYTSADFPB",
        "GD56QGOWI5ZRJAXNLMAH6BQ3MQEXNWI5H6Q57BFMOQZKPMRLPIYK4RC6",
        "GAJFU5JCWRVJ5G7HDYSW6NMDSS6XTRPCNEULD5MWCKQEIGFFSJMDZXUA",
        "GDBDJCWCEHLCCJ74HJUDJIBTZO7JVFCI2IIDLEPL33FHOAKVDCNAUE4P",
        "GBMXLD4BEJWWETPCFTO2NI7MFRGGBFZDIOVA7OHHFA5XBTB7YHUQBQME",
    ],
    "STD": [
        "GARF35OFGW2XFHFG764UVO2UTUOSDRVL5DU7RXMM7JJJOSVWKK7GATXU",
        "GDORF4CKQ2GDOBXXU7R3EXV3XRN6LFCGNYTHMYXDPZ5NECZ6YZLJGAA2",
        "GDTTKKRECHQMYWJWKQ5UTONRMNK54WRN3PB4U7JZAPUHLPI75ALN7ORU",
        "GDSKTNDAIBUBGQZXEJ64F3P37T7Y45ZOZQCRZY2I46F4UT66KG4JJSOU",
        "GCHUIUY5MOBWOXEKZJEQU2DCUG4WHRXM4KAWCEUQK3NTQGBK5RZ6FQBR",
        "GDTFYNE5MKGFL625FNUQUHILILFNNRSRYAAXADFFLMOOF5E6V5FLLSBG",
        "GDOSJPACWZ2DWSDNNKCVIKMUL3BNVVV3IERJPAZXM3PJMDNXYJIZFUL3",
        "GALQ4TZA6VRBBBBYMM3KSBSXJDLC5A7YIGH4SAS6AJ7N4ZA6P6IHWH43",
        "GDMMVCANURBLP6O64QWJM3L2EZTDSGTFL4B2BNXKAQPWYDX6WNAFNWK4",
    ],
}

# Name of the account data entry that is_vesting_account looks for; the cleanup
# transaction must also clear the entry with this name.
DATA_ENTRY_KEY = "tft-vesting"

# Value the decoded DATA_ENTRY_KEY data entry must equal for an account to be
# considered a vesting account.
VESTING_SCHEME = "month1=05/2021,48months,priceunlock=tftvalue>month*0.015+0.15"


def _is_valid_cleanup_transaction(
    vesting_account_id: str, preauth_signer: str, network, network_passphrase, _get_unlockhash_transaction
) -> bool:
    """Check that a pre-authorized signer refers to the expected cleanup transaction.

    The transaction is looked up through ``_get_unlockhash_transaction`` and is
    accepted only when it is sourced from the vesting account and consists of
    exactly three operations, none of which overrides the source account:

    1. a change-trust operation setting the limit of the TFT asset (issued by
       ``_TFT_ISSUERS[network]``) to zero,
    2. a manage-data operation clearing the ``DATA_ENTRY_KEY`` entry,
    3. an account merge into ``_VESTING_ACTIVATORS[network]``.

    Args:
        vesting_account_id: public key the transaction source must match.
        preauth_signer: key of the ``preauth_tx`` signer; passed as
            ``unlockhash`` to the lookup callable.
        network: key into ``_TFT_ISSUERS`` / ``_VESTING_ACTIVATORS``.
        network_passphrase: passphrase used to parse the transaction envelope.
        _get_unlockhash_transaction: callable accepting the keyword argument
            ``unlockhash`` and returning a mapping with a ``"transaction_xdr"``
            entry, or a falsy value when nothing is found.

    Returns:
        True when every check passes, False otherwise.

    Raises:
        KeyError: if ``network`` is not a key of the per-network tables (only
            reached once the preceding checks have passed).
    """
    from decimal import Decimal, InvalidOperation

    unlock_tx = _get_unlockhash_transaction(unlockhash=preauth_signer)
    if not unlock_tx:
        return False
    txe = stellar_sdk.TransactionEnvelope.from_xdr(unlock_tx["transaction_xdr"], network_passphrase)
    tx = txe.transaction

    # The transaction must be sourced from the vesting account itself.
    if not isinstance(tx.source, stellar_sdk.keypair.Keypair):
        return False
    if tx.source.public_key != vesting_account_id:
        return False
    if len(tx.operations) != 3:
        return False

    # Operation 1: remove the TFT trustline (limit set to zero).
    change_trust_op = tx.operations[0]
    if not isinstance(change_trust_op, stellar_sdk.operation.change_trust.ChangeTrust):
        return False
    if change_trust_op.source is not None:
        return False
    # The limit is compared as a decimal rather than through int(): int()
    # raises on a fractional decimal string such as "0.5" and truncates a
    # float such as 0.5 to 0, so a non-zero limit could crash the validation
    # or slip through. A limit that cannot be parsed is treated as invalid.
    try:
        if Decimal(change_trust_op.limit) != 0:
            return False
    except (InvalidOperation, TypeError, ValueError):
        return False
    if change_trust_op.asset.code != "TFT" or change_trust_op.asset.issuer != _TFT_ISSUERS[network]:
        return False

    # Operation 2: clear the vesting data entry (a None value).
    manage_data_op = tx.operations[1]
    if not isinstance(manage_data_op, stellar_sdk.operation.manage_data.ManageData):
        return False
    if manage_data_op.source is not None:
        return False
    if manage_data_op.data_name != DATA_ENTRY_KEY:
        return False
    if manage_data_op.data_value is not None:
        return False

    # Operation 3: merge the account into the vesting activator account.
    account_merge_op = tx.operations[2]
    if not isinstance(account_merge_op, stellar_sdk.operation.account_merge.AccountMerge):
        return False
    if account_merge_op.source is not None:
        return False
    if account_merge_op.destination != _VESTING_ACTIVATORS[network]:
        return False
    return True


def _verify_signers(
    account_record, owner_address: str, network, network_passphrase, _get_unlockhash_transaction
) -> bool:
    # verify tresholds
    tresholds = account_record["thresholds"]
    if tresholds["low_threshold"] != 10 or tresholds["med_threshold"] != 10 or tresholds["high_threshold"] != 10:
        return False
    ## signers found for account
    signers = {signer["key"]: (signer["weight"], signer["type"]) for signer in account_record["signers"]}
    # example of signers item --> {'GCWPSLTHDH3OYH226EVCLOG33NOMDEO4KUPZQXTU7AWQNJPQPBGTLAVM':(5,'ed25519_public_key')}

    # Check all cosigners are in account signers with weight 1
    for correct_signer in _COSIGNERS[network]:
        if (
            correct_signer in signers.keys()
            and signers[correct_signer][0] == 1
            and signers[correct_signer][1] == "ed25519_public_key"
        ):
            continue
        else:
            return False

    account_id = account_record["account_id"]

    cleanup_signer_correct = False
    master_key_weight_correct = False
    owner_key_weight_correct = False
    for signer in account_record["signers"]:
        if signer["type"] == "preauth_tx":
            if signer["weight"] != 10:
                return False
            cleanup_signer_correct = _is_valid_cleanup_transaction(
                account_id, signer["key"], network, network_passphrase, _get_unlockhash_transaction
            )
            continue
        if signer["type"] != "ed25519_public_key":
            return False
        if signer["key"] == account_id:
            master_key_weight_correct = signer["weight"] == 0
        if signer["key"] == owner_address:
            owner_key_weight_correct = signer["weight"] == 5

    if len(account_record["signers"]) == 12:
        return cleanup_signer_correct and master_key_weight_correct and owner_key_weight_correct
    return len(account_record["signers"]) == 11 and master_key_weight_correct and owner_key_weight_correct


def is_vesting_account(
    account_record: dict, owner_address: str, network, network_passphrase, _get_unlockhash_transaction
) -> bool:
    """Tell whether an account record is a correctly configured vesting account.

    The record qualifies when its ``DATA_ENTRY_KEY`` data entry decodes to
    ``VESTING_SCHEME`` and ``_verify_signers`` accepts its thresholds and
    signers for ``owner_address``.

    Args:
        account_record: account mapping; its ``"data"`` entry (if any) maps
            data-entry names to base64-encoded values.
        owner_address: key expected to be the owner signer of the account.
        network: network key forwarded to the signer verification.
        network_passphrase: forwarded to the signer verification.
        _get_unlockhash_transaction: forwarded to the signer verification.

    Returns:
        True for a valid vesting account, False otherwise (including records
        without data entries or with an undecodable vesting entry).
    """
    # ``get`` returns None when the record has no "data" entry; fall back to
    # an empty mapping so the membership test below cannot raise TypeError.
    data_entries = account_record.get("data") or {}
    if DATA_ENTRY_KEY not in data_entries:
        return False
    try:
        decoded_data = j.data.serializers.base64.decode(data_entries[DATA_ENTRY_KEY]).decode()
    except ValueError:
        # Data entries can hold arbitrary bytes, so the value may not be valid
        # UTF-8 (UnicodeDecodeError is a ValueError). Presumably malformed
        # base64 also surfaces as a ValueError subclass - TODO confirm against
        # the serializer. Either way it cannot be the vesting scheme.
        return False
    if decoded_data != VESTING_SCHEME:
        return False
    return _verify_signers(account_record, owner_address, network, network_passphrase, _get_unlockhash_transaction)

Functions

def is_vesting_account(account_record: dict, owner_address: str, network, network_passphrase, _get_unlockhash_transaction) ‑> bool
Expand source code
def is_vesting_account(
    account_record: dict, owner_address: str, network, network_passphrase, _get_unlockhash_transaction
) -> bool:
    """Tell whether an account record is a correctly configured vesting account.

    The record qualifies when its ``DATA_ENTRY_KEY`` data entry decodes to
    ``VESTING_SCHEME`` and ``_verify_signers`` accepts its thresholds and
    signers for ``owner_address``.

    Args:
        account_record: account mapping; its ``"data"`` entry (if any) maps
            data-entry names to base64-encoded values.
        owner_address: key expected to be the owner signer of the account.
        network: network key forwarded to the signer verification.
        network_passphrase: forwarded to the signer verification.
        _get_unlockhash_transaction: forwarded to the signer verification.

    Returns:
        True for a valid vesting account, False otherwise (including records
        without data entries or with an undecodable vesting entry).
    """
    # ``get`` returns None when the record has no "data" entry; fall back to
    # an empty mapping so the membership test below cannot raise TypeError.
    data_entries = account_record.get("data") or {}
    if DATA_ENTRY_KEY not in data_entries:
        return False
    try:
        decoded_data = j.data.serializers.base64.decode(data_entries[DATA_ENTRY_KEY]).decode()
    except ValueError:
        # Data entries can hold arbitrary bytes, so the value may not be valid
        # UTF-8 (UnicodeDecodeError is a ValueError). Presumably malformed
        # base64 also surfaces as a ValueError subclass - TODO confirm against
        # the serializer. Either way it cannot be the vesting scheme.
        return False
    if decoded_data != VESTING_SCHEME:
        return False
    return _verify_signers(account_record, owner_address, network, network_passphrase, _get_unlockhash_transaction)