
API documentation

Commands

deploy

DeployCmd (ACmd)

Source code in capsule/cmds/deploy.py
class DeployCmd(ACmd):

    CMD_NAME = "deploy"
    CMD_HELP = "Deploy a wasm contract artifact to a specified Terra Chain"
    CMD_USAGE = """
    $ capsule deploy -p ./my_contract.wasm -c columbus-5
    $ capsule deploy --path ./artifacts/my_contract.wasm --chain tequila-0004
    $ capsule deploy -p artifacts/capsule_test.wasm -i '{"count":17}' -c bombay-12"""
    CMD_DESCRIPTION = "Helper tool which enables you to programatically deploy a Wasm contract artifact to a chain as a code object and instantiate it"

    def initialise(self):
        # Define usage and description
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # Add any positional or optional arguments here
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 help="(required) Name of new or path to existing package")

        # Add any positional or optional arguments here
        self.parser.add_argument("-i", "--initmsg",
                                 type=str,
                                 default={},
                                 help="(Optional) The initialization message for the contract you are trying to deploy. Must be a json-like str")

        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")




    def run_command(self, args):

        """Schema:
            Read Mnemonic from env as well as host to deploy on 
            any specified chain/account 

            Prepare defaults for the above

            Perform a store call for the wasm contract that was specified

            Verify the contract was stored with a new API call

            Instantiate the code object into a contract 

            Return success. 
        """
        LOG.info("Starting deployment")
        # Setup the Deployer with its lcd, fcd urls as well as the desired chain.
        # config = asyncio.run(get_config())
        chain_url="https://bombay-lcd.terra.dev"
        chain_fcd_url="https://bombay-fcd.terra.dev"
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # config = asyncio.run(get_config())
        # # TODO: Review setting up a list of urls in project rather than just depending on settings in config
        # chain_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_url")
        # chain_fcd_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_fcd_url")

        # TODO: Validate init_msg is well-formed JSON

        deployer = Deployer(client=LCDClient(
            url=chain_url, 
            chain_id=args.chain or "bombay-12",
            gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))

        # # Attempt to store the provided package as a code object, the response will be a code ID if successful
        stored_code_id = asyncio.run(deployer.store_contract(contract_name="test", contract_path=args.package))
        # Instantiate a contract using the stored code ID for our contract bundle
        # and an init msg which will be different depending on the contract.
        instantiation_result = asyncio.run(deployer.instantiate_contract(stored_code_id, init_msg=json.loads(args.initmsg)))
        LOG.info(f"Successfully deployed contract artifact located at {args.package}. Contract address of instantiated contract is {instantiation_result}")
run_command(self, args)

Schema: Read Mnemonic from env as well as host to deploy on any specified chain/account

Prepare defaults for the above

Perform a store call for the wasm contract that was specified

Verify the contract was stored with a new API call

Instantiate the code object into a contract

Return success.

Source code in capsule/cmds/deploy.py
def run_command(self, args):

    """Schema:
        Read Mnemonic from env as well as host to deploy on 
        any specified chain/account 

        Prepare defaults for the above

        Perform a store call for the wasm contract that was specified

        Verify the contract was stored with a new API call

        Instantiate the code object into a contract 

        Return success. 
    """
    LOG.info("Starting deployment")
    # Setup the Deployer with its lcd, fcd urls as well as the desired chain.
    # config = asyncio.run(get_config())
    chain_url="https://bombay-lcd.terra.dev"
    chain_fcd_url="https://bombay-fcd.terra.dev"
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # config = asyncio.run(get_config())
    # # TODO: Review setting up a list of urls in project rather than just depending on settings in config
    # chain_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_url")
    # chain_fcd_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_fcd_url")

    # TODO: Validate init_msg is well-formed JSON

    deployer = Deployer(client=LCDClient(
        url=chain_url, 
        chain_id=args.chain or "bombay-12",
        gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))

    # # Attempt to store the provided package as a code object, the response will be a code ID if successful
    stored_code_id = asyncio.run(deployer.store_contract(contract_name="test", contract_path=args.package))
    # Instantiate a contract using the stored code ID for our contract bundle
    # and an init msg which will be different depending on the contract.
    instantiation_result = asyncio.run(deployer.instantiate_contract(stored_code_id, init_msg=json.loads(args.initmsg)))
    LOG.info(f"Successfully deployed contract artifact located at {args.package}. Contract address of instantiated contract is {instantiation_result}")

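In practice deploy needs a signing mnemonic before it can broadcast anything; a minimal end-to-end run against the bombay testnet might look like the lines below, where the wasm path and init message are placeholders (the mnemonic can equally be supplied via the config file described under credential_handler):

    $ export CAPSULE_MNEMONIC="<your 24-word mnemonic>"
    $ capsule deploy -p ./artifacts/my_contract.wasm -i '{"count":17}' -c bombay-12

The init message is parsed with json.loads, so it must be passed as a JSON string on the command line.
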
execute

Execute command -- Used to execute actions on MultiChain contracts

ExecuteCmd (ACmd)

Execute command -- Used to execute actions on MultiChain contracts

Source code in capsule/cmds/execute.py
class ExecuteCmd(ACmd):
    """
        Execute command -- Used to execute actions on MultiChain contracts
    """

    CMD_NAME = "execute"
    CMD_HELP = "Attempt to execute an action on a given contract address."
    CMD_USAGE = """
    $ capsule execute --address <addr> --chain <chain> --msg <msg>"""
    CMD_DESCRIPTION = "Helper tool which exposes the ability to prepare and send ExecuteMsgs on chain-specific contract addresses"

    def initialise(self):
        # Define usage and description
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # Add any positional or optional arguments here
        self.parser.add_argument("-a", "--address",
                                 type=str,
                                 help="(required) Contract Address to perform query on")

        # Add any positional or optional arguments here
        self.parser.add_argument("-m", "--msg",
                                type=str,
                                default={},
                                help="(Optional) The execution message for the contract you are trying to execute an action on. Must be a json-like str")

        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

    def run_command(self, args):
        """

        """
        LOG.info(f"Performing msg exectution on contract addr {args.address}")
        chain_url="https://bombay-lcd.terra.dev"
        chain_fcd_url="https://bombay-fcd.terra.dev"

        deployer = Deployer(client=LCDClient(
            url=chain_url, 
            chain_id=args.chain or "bombay-12",
            gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))

        exe_result = asyncio.run(deployer.execute_contract(args.address, json.loads(args.msg)))
        LOG.info(f"Execute Result {exe_result} \n\n Execute Finished.")
run_command(self, args)
Source code in capsule/cmds/execute.py
def run_command(self, args):
    """

    """
    LOG.info(f"Performing msg exectution on contract addr {args.address}")
    chain_url="https://bombay-lcd.terra.dev"
    chain_fcd_url="https://bombay-fcd.terra.dev"

    deployer = Deployer(client=LCDClient(
        url=chain_url, 
        chain_id=args.chain or "bombay-12",
        gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))

    exe_result = asyncio.run(deployer.execute_contract(args.address, json.loads(args.msg)))
    LOG.info(f"Execute Result {exe_result} \n\n Execute Finished.")

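An illustrative execute invocation; the address is a placeholder and the message shape depends entirely on the target contract's ExecuteMsg schema (an increment message is shown only as an example):

    $ capsule execute -a <contract_addr> -m '{"increment":{}}' -c bombay-12
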
local

Local command -- Used to spin up a local instance of localterra or your favorite multi-chain dev-env

LocalCmd (ACmd)

Local command -- Used to spin up a local instance of localterra or your favorite multi-chain dev-env

Source code in capsule/cmds/local.py
class LocalCmd(ACmd):
    """
        Local command -- Used to spin up a local instance of localterra or your favorite multi-chain dev-env
    """

    CMD_NAME = "local"
    CMD_HELP = "Attempt to setup a local chain instance using Git and Docker."
    CMD_USAGE = """
    $ capsule local"""
    CMD_DESCRIPTION = "Helper tool which attempts to git clone the localterra repo and then compose it as services which you can use for local dev env contract testing"

    def initialise(self):
        # Define usage and description
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # Add any positional or optional arguments here
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 help="(required) Name of new or path to existing package")

        # Add any positional or optional arguments here
        self.parser.add_argument("-d", "--down",
                                 action='store_false',
                                 help="(Optional) Whether to spin down, include to spin down an instance")

        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

    def run_command(self, args):
        """Schema:
            Check if we already have a local terra, if so return the path

            If we don't git clone the localterra repo and then return the path

            Pass path to docker-compose and attempt to interact

            Return success.
        """
        LOG.info("Starting local setup of localterra")
        # Start by doing a git clone
        # Check if the repo is already cloned in the default location
        repo = self.get_localterra_repo()
        LOG.info(
            "Localterra repo actions finished, now attempting to use its compose-file")
        # Spin up or down via compose
        self.run_docker_compose(args, dirname=repo.working_tree_dir)
        LOG.info("Finish docker interaction")

        LOG.info("Command run finished")

    def get_localterra_repo(self, CLONE_LOCATION=DEFAULT_CLONE_PATH, CLONE_REMOTE_URL="git@github.com:terra-money/LocalTerra.git"):
        """Attempts to get a localterra repository so that localterra can be interacted with.
        Note: Any compose spun up away from the capsule tool may not be accessible by the tool.
        You need to ensure you are passing the correct Docker Compose in order to use this tool to interact with your already deployed services

        Args:
            CLONE_LOCATION (str, optional): Where the repos will go, defaults to DEFAULT_CLONE_PATH which is "~/.capsule/localterra-clones".
            CLONE_REMOTE_URL (str, optional): The git url to clone from. Defaults to "git@github.com:terra-money/LocalTerra.git".

        Returns:
            git.Repo: The repo to be worked with
        """
        if os.path.exists(os.path.realpath(CLONE_LOCATION)):
            LOG.info("localterra appears to be already cloned, skipped reclone")
            return git.Repo(os.path.realpath(CLONE_LOCATION))
        return git.Repo.clone_from(url=CLONE_REMOTE_URL, to_path=CLONE_LOCATION, depth=1)

    def run_docker_compose(self, args, dirname, filename="docker-compose.yml"):
        """Take a directory and a filename for the compose and attempt to
        interact with docker-compose

        Args:
            dirname ([type]): The directory of the project where the compose file is
            filename (str, optional): The compose file name. Defaults to "docker-compose.yml".

        Returns:
            process: The process
        """
        command_name = ["docker-compose", "-f",
                        os.path.join(dirname, filename), "up" if args.down else "down"]
        LOG.info(f"[Compose Command] {''.join(command_name)}")
        try:
            LOG.info("Starting compose command \n\n\n")
            with subprocess.Popen(command_name, stdin=PIPE, stdout=PIPE, stderr=STDOUT, universal_newlines=True) as process:
                while process.poll() is None:
                    line = process.stdout.readline()
                    LOG.info(line.rstrip())
        except KeyboardInterrupt:
            # process.terminate()
            sys.exit()
        except Exception as ex:
            print("Encountered an error : ", ex)
get_localterra_repo(self, CLONE_LOCATION='~/.capsule/localterra-clones', CLONE_REMOTE_URL='git@github.com:terra-money/LocalTerra.git')

Attempts to get a localterra repository so that localterra can be interacted with. Note: Any compose spun up away from the capsule tool may not be accessible by the tool. You need to ensure you are passing the correct Docker Compose in order to use this tool to interact with your already deployed services

Parameters:

    CLONE_LOCATION (str, optional): Where the repos will go. Defaults to DEFAULT_CLONE_PATH ("~/.capsule/localterra-clones").
    CLONE_REMOTE_URL (str, optional): The git URL to clone from. Defaults to "git@github.com:terra-money/LocalTerra.git".

Returns:

    git.Repo: The repo to be worked with

Source code in capsule/cmds/local.py
def get_localterra_repo(self, CLONE_LOCATION=DEFAULT_CLONE_PATH, CLONE_REMOTE_URL="git@github.com:terra-money/LocalTerra.git"):
    """Attempts to get a localterra repository so that localterra can be interacted with.
    Note: Any compose spun up away from the capsule tool may not be accessible by the tool.
    You need to ensure you are passing the correct Docker Compose in order to use this tool to interact with your already deployed services

    Args:
        CLONE_LOCATION (str, optional): Where the repos will go, defaults to DEFAULT_CLONE_PATH which is "~/.capsule/localterra-clones".
        CLONE_REMOTE_URL (str, optional): The git url to clone from. Defaults to "git@github.com:terra-money/LocalTerra.git".

    Returns:
        git.Repo: The repo to be worked with
    """
    if os.path.exists(os.path.realpath(CLONE_LOCATION)):
        LOG.info("localterra appears to be already cloned, skipped reclone")
        return git.Repo(os.path.realpath(CLONE_LOCATION))
    return git.Repo.clone_from(url=CLONE_REMOTE_URL, to_path=CLONE_LOCATION, depth=1)
run_command(self, args)

Schema: Check if we already have a local terra, if so return the path

If we don't git clone the localterra repo and then return the path

Pass path to docker-compose and attempt to interact

Return success.

Source code in capsule/cmds/local.py
def run_command(self, args):
    """Schema:
        Check if we already have a local terra, if so return the path

        If we don't git clone the localterra repo and then return the path

        Pass path to docker-compose and attempt to interact

        Return success.
    """
    LOG.info("Starting local setup of localterra")
    # Start by doing a git clone
    # Check if the repo is already cloned in the default location
    repo = self.get_localterra_repo()
    LOG.info(
        "Localterra repo actions finished, now attempting to use its compose-file")
    # Spin up or down via compose
    self.run_docker_compose(args, dirname=repo.working_tree_dir)
    LOG.info("Finish docker interaction")

    LOG.info("Command run finished")
run_docker_compose(self, args, dirname, filename='docker-compose.yml')

Take a directory and a filename for the compose and attempt to interact with docker-compose

Parameters:

    dirname: The directory of the project where the compose file is. (required)
    filename (str, optional): The compose file name. Defaults to "docker-compose.yml".

Returns:

    process: The process

Source code in capsule/cmds/local.py
def run_docker_compose(self, args, dirname, filename="docker-compose.yml"):
    """Take a directory and a filename for the compose and attempt to
    interact with docker-compose

    Args:
        dirname ([type]): The directory of the project where the compose file is
        filename (str, optional): The compose file name. Defaults to "docker-compose.yml".

    Returns:
        process: The process
    """
    command_name = ["docker-compose", "-f",
                    os.path.join(dirname, filename), "up" if args.down else "down"]
    LOG.info(f"[Compose Command] {''.join(command_name)}")
    try:
        LOG.info("Starting compose command \n\n\n")
        with subprocess.Popen(command_name, stdin=PIPE, stdout=PIPE, stderr=STDOUT, universal_newlines=True) as process:
            while process.poll() is None:
                line = process.stdout.readline()
                LOG.info(line.rstrip())
    except KeyboardInterrupt:
        # process.terminate()
        sys.exit()
    except Exception as ex:
        print("Encountered an error : ", ex)

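A minimal usage sketch; git, Docker and docker-compose are assumed to be installed and running:

    $ capsule local        # clone (or reuse) the LocalTerra repo and docker-compose it up
    $ capsule local -d     # spin the LocalTerra services back down
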
new

NewCmd (ACmd)

Source code in capsule/cmds/new.py
class NewCmd(ACmd):

    CMD_NAME = "new"
    CMD_HELP = "Simple command to create a new cosmwasm smart contract. Unless specified, will use x"
    CMD_USAGE = """
    $ capsule new -p ./<path_to_my_contracts_root> -n my_new_contract
    $ capsule new --path ./<path_to_my_contracts_root> -n my_new_contract
    $ capsule new -p <path_to_my_contracts_root> -n my_new_contract -u <other cosmwasm cargo codegen template>"""
    CMD_DESCRIPTION = "Simple command to create a new cosmwasm smart contract. Unless specified, will use x"

    def initialise(self):
        # Define usage and description
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # Add any positional or optional arguments here
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 help="(required) Name of new or path to existing package")

        # Add any positional or optional arguments here
        self.parser.add_argument("-u", "--codeid",
                                 type=str,
                                 default="",
                                 help="URL of your own template to use")

        self.parser.add_argument("-n", "--name",
                                 type=str,
                                 default="",
                                 help="Name of your new smart-contract package")

        self.parser.add_argument("-b", "--branch",
                                 type=str,
                                 default="0.16",
                                 help="Branch/Version of your new smart-contract package")

    def run_command(self, args):
        """
        """
        LOG.info("Starting cargo generate of new package")
        output_dir = os.path.abspath(args.package)
        contract_dir = os.path.join(output_dir, args.name)

        if os.path.exists(contract_dir):
            raise ValueError("Directory exists, use a diff name boss")

        try:
            process = subprocess.run(['cargo', 'generate', '--git', 'https://github.com/CosmWasm/cw-template.git', '--branch',
                                     args.branch, '--name', args.name], cwd=output_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)
        except subprocess.CalledProcessError as grepexc:
            LOG.info("error code", grepexc.returncode, grepexc.output)
        LOG.info(process.stdout)
        LOG.info("[New Contract] Contract creation finished")
run_command(self, args)
Source code in capsule/cmds/new.py
def run_command(self, args):
    """
    """
    LOG.info("Starting cargo generate of new package")
    output_dir = os.path.abspath(args.package)
    contract_dir = os.path.join(output_dir, args.name)

    if os.path.exists(contract_dir):
        raise ValueError("Directory exists, use a diff name boss")

    try:
        process = subprocess.run(['cargo', 'generate', '--git', 'https://github.com/CosmWasm/cw-template.git', '--branch',
                                 args.branch, '--name', args.name], cwd=output_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)
    except subprocess.CalledProcessError as grepexc:
        LOG.info("error code", grepexc.returncode, grepexc.output)
    LOG.info(process.stdout)
    LOG.info("[New Contract] Contract creation finished")

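An illustrative run; cargo and the cargo-generate subcommand are assumed to be installed, and the path/name values are placeholders:

    $ capsule new -p ./contracts -n my_counter -b 0.16
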
query

Query command -- Used to perform queries on MultiChain contracts

QueryCmd (ACmd)

Query command -- Used to perform queries on MultiChain contracts

Source code in capsule/cmds/query.py
class QueryCmd(ACmd):
    """
        Query command -- Used to perform queries on MultiChain contracts
    """

    CMD_NAME = "query"
    CMD_HELP = "Attempt to perform a query on a given contract address."
    CMD_USAGE = """
    $ capsule query --address <addr> --chain <chain> --query <query>"""
    CMD_DESCRIPTION = "Helper tool which exposes the ability to perform queries on chain specific contract addresses"

    def initialise(self):
        # Define usage and description
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # Add any positional or optional arguments here
        self.parser.add_argument("-a", "--address",
                                 type=str,
                                 help="(required) Contract Address to perform query on")

        # Add any positional or optional arguments here
        self.parser.add_argument("-q", "--query",
                                type=str,
                                default={},
                                help="(Optional) The query message for the contract you are trying to query. Must be a json-like str")

        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

    def run_command(self, args):
        """

        """
        LOG.info(f"Performing query on contract addr {args.address}")
        chain_url="https://bombay-lcd.terra.dev"
        chain_fcd_url="https://bombay-fcd.terra.dev"

        deployer = Deployer(client=LCDClient(
            url=chain_url, 
            chain_id=args.chain or "bombay-12",
            gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))

        query_result = asyncio.run(deployer.query_contract(args.address, json.loads(args.query)))
        LOG.info(f"Query Result {query_result} \n\n Query Finished.")
run_command(self, args)
Source code in capsule/cmds/query.py
def run_command(self, args):
    """

    """
    LOG.info(f"Performing query on contract addr {args.address}")
    chain_url="https://bombay-lcd.terra.dev"
    chain_fcd_url="https://bombay-fcd.terra.dev"

    deployer = Deployer(client=LCDClient(
        url=chain_url, 
        chain_id=args.chain or "bombay-12",
        gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))

    query_result = asyncio.run(deployer.query_contract(args.address, json.loads(args.query)))
    LOG.info(f"Query Result {query_result} \n\n Query Finished.")

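An illustrative query; the address is a placeholder and the query body must match the contract's QueryMsg schema (a contract generated from the default counter template would, for example, answer get_count):

    $ capsule query -a <contract_addr> -q '{"get_count":{}}' -c bombay-12
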
verify

VerifyCmd (ACmd)

Source code in capsule/cmds/verify.py
class VerifyCmd(ACmd):

    CMD_NAME = "verify"
    CMD_HELP = "Attempt to perform a smart contract verification against a given code id. Attempts to perform a deterministic comparison by compiling the provided contract into an optmized wasm and then comparing its checksum with the code_hash of the provided code ID code object. Note if you are running an M1 Mac or an ARM based machine you will not get perfect matches on verification as most production deployments are done from a Intel based machine."
    CMD_USAGE = """
    $ capsule verify -p ./<path_to_my_contracts_root> -c columbus-5 -i 3
    $ capsule verify --path ./<path_to_my_contracts_root> --chain tequila-0004
    $ capsule verify -p <path_to_my_contracts_root> -c bombay-12 -i 300"""
    CMD_DESCRIPTION = "Helper tool which enables you to perform a Smart Contract Verification (SCV) by providing the path to a single smart-contract repo and providing a code id. The project path is passed either to `cargo run-script optimize` or to a custom docker invocation for ARM64 to create an optmized production wasm of the contract for comparison. The code id is used to query a stored code object's byte_code on the respective chain. Once the byte_code is gathered we get the SHA256 of this to compare to our locally prepared optimized wasm. If the SHA265 of on chain code object matches the SHA256 checksum on the optimized build we have verified the contact. Note: The outputted SHA256 and locally build optimized build will be different on ARM vs Intel. An ARM machine can only really be used to verify images which were built and uploaded from an ARM machine"

    def initialise(self):
        # Define usage and description
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # Add any positional or optional arguments here
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 help="(required) Name of new or path to existing package")

        # Add any positional or optional arguments here
        self.parser.add_argument("-i", "--codeid",
                                 type=int,
                                 default=0,
                                 help="The code_id to compare the provided contract against")

        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

        self.parser.add_argument("-n", "--nobuild",
                                 action='store_true',
                                 help="(Optional) Skip the building and go right to comparing the onchain code with your own. This assumes you already ran an optimized build and saves you rebuilding each time you run verify command")




    def run_command(self, args):

        """Schema:
            Read Mnemonic from env as well as host to deploy on 
            any specified chain/account 

            Prepare defaults for the above

            Prepare an optimized build using the appropriate technique.
            If nobuild is provided, skip the above and go right to the checksums.txt

            Query the byte_code from the lcd client 

            Prepare a hash of the queried byte_code and compare 

            Return success. 
        """
        LOG.info("Starting verification")
        # Setup the Deployer with its lcd, fcd urls as well as the desired chain.
        # config = asyncio.run(get_config())
        chain_url="https://bombay-lcd.terra.dev"
        chain_fcd_url="https://bombay-fcd.terra.dev"
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # config = asyncio.run(get_config())
        # # TODO: Review setting up a list of urls in project rather than just depending on settings in config
        # chain_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_url")
        # chain_fcd_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_fcd_url")

        # TODO: Validate init_msg is well-formed JSON

        deployer = Deployer(client=LCDClient(
            url=chain_url, 
            chain_id=args.chain or "bombay-12",
            gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))



        contract_dir = os.path.abspath(args.package)
        # TODO: Refactor into a neater function 
        if not args.nobuild: 
            if platform.uname()[4] == "arm64":
                try:
                    # process = subprocess.run(['docker', 'run', '--rm', '-v', '"$(pwd)":/code',
                    # '--mount', 'type=volume,source="$(basename "$(pwd)")_cache",target=/code/target',
                    # '--mount', 'type=volume,source=registry_cache,target=/usr/local/cargo/registry',
                    # 'cosmwasm/rust-optimizer-arm64:0.12.4'], cwd=contract_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)

                    process = subprocess.run('docker run --rm -v "$(pwd)":/code \
                            --mount type=volume,source="$(basename "$(pwd)")_cache",target=/code/target \
                            --mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
                            cosmwasm/rust-optimizer-arm64:0.12.4', cwd=contract_dir, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)

                    LOG.info(process.stdout)

                except subprocess.CalledProcessError as grepexc:
                    LOG.info(f"error code {grepexc.returncode}: {grepexc.output}")
            else:
                try:
                    process = subprocess.run(['cargo','run-script','optimize'], cwd=contract_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)

                    LOG.info(process.stdout)
                except subprocess.CalledProcessError as grepexc:
                    LOG.info(f"error code {grepexc.returncode}: {grepexc.output}")

        # code_id_info = asyncio.run(deployer.query_code_id(args.codeid))
        # LOG.info(base64.b64decode(code_id_info['result']['code_hash']))
        code_byte_code = asyncio.run(deployer.query_code_bytecode(args.codeid))
        on_chain = hashlib.sha256()
        on_chain.update(base64.b64decode(code_byte_code['byte_code']))
        # LOG.info(on_chain.digest())
        # LOG.info(base64.b16encode(on_chain.digest()))
        on_chain_code_sha = base64.b16encode(on_chain.digest())
        # Get the hash from artifacts 
        # TODO: Make less POC and more MVP, what if there are multiple checksums 
        with open(contract_dir+"/artifacts/checksums.txt", "r") as file:
            first_line = file.readline()
            sha_to_compare = first_line.split("  ")[0].encode()
            LOG.info(f"[SHA265 of code with id {args.codeid}] {on_chain_code_sha.lower()}")
            LOG.info(f"[SHA265 of code with path {args.package}] {sha_to_compare}")

            if sha_to_compare == on_chain_code_sha.lower():
                LOG.info("[Verification]: Success. The provided contract wasm matches the provided code ID's byte_code hash")
            else:
                LOG.info("[Verification]: Failed. The provided contract wasm does not match the provided code ID's byte_code hash")
run_command(self, args)

Schema: Read Mnemonic from env as well as host to deploy on any specified chain/account

Prepare defaults for the above

Prepare an optimized build using the appropriate technique. If nobuild is provided, skip the above and go right to the checksums.txt

Query the byte_code from the lcd client

Prepare a hash of the queried byte_code and compare

Return success.

Source code in capsule/cmds/verify.py
def run_command(self, args):

    """Schema:
        Read Mnemonic from env as well as host to deploy on 
        any specified chain/account 

        Prepare defaults for the above

        Prepare an optimized build using the appropriate technique.
        If nobuild is provided, skip the above and go right to the checksums.txt

        Query the byte_code from the lcd client 

        Prepare a hash of the queried byte_code and compare 

        Return success. 
    """
    LOG.info("Starting verification")
    # Setup the Deployer with its lcd, fcd urls as well as the desired chain.
    # config = asyncio.run(get_config())
    chain_url="https://bombay-lcd.terra.dev"
    chain_fcd_url="https://bombay-fcd.terra.dev"
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # config = asyncio.run(get_config())
    # # TODO: Review setting up a list of urls in project rather than just depending on settings in config
    # chain_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_url")
    # chain_fcd_url = config.get("networks", {}).get(args.chain, DEFAULT_TESTNET_CHAIN).get("chain_fcd_url")

    # TODO: Validate init_msg is well-formed JSON

    deployer = Deployer(client=LCDClient(
        url=chain_url, 
        chain_id=args.chain or "bombay-12",
        gas_prices=Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())))



    contract_dir = os.path.abspath(args.package)
    # TODO: Refactor into a neater function 
    if not args.nobuild: 
        if platform.uname()[4] == "arm64":
            try:
                # process = subprocess.run(['docker', 'run', '--rm', '-v', '"$(pwd)":/code',
                # '--mount', 'type=volume,source="$(basename "$(pwd)")_cache",target=/code/target',
                # '--mount', 'type=volume,source=registry_cache,target=/usr/local/cargo/registry',
                # 'cosmwasm/rust-optimizer-arm64:0.12.4'], cwd=contract_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)

                process = subprocess.run('docker run --rm -v "$(pwd)":/code \
                        --mount type=volume,source="$(basename "$(pwd)")_cache",target=/code/target \
                        --mount type=volume,source=registry_cache,target=/usr/local/cargo/registry \
                        cosmwasm/rust-optimizer-arm64:0.12.4', cwd=contract_dir, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)

                LOG.info(process.stdout)

            except subprocess.CalledProcessError as grepexc:
                LOG.info(f"error code {grepexc.returncode}: {grepexc.output}")
        else:
            try:
                process = subprocess.run(['cargo','run-script','optimize'], cwd=contract_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)

                LOG.info(process.stdout)
            except subprocess.CalledProcessError as grepexc:
                LOG.info(f"error code {grepexc.returncode}: {grepexc.output}")

    # code_id_info = asyncio.run(deployer.query_code_id(args.codeid))
    # LOG.info(base64.b64decode(code_id_info['result']['code_hash']))
    code_byte_code = asyncio.run(deployer.query_code_bytecode(args.codeid))
    on_chain = hashlib.sha256()
    on_chain.update(base64.b64decode(code_byte_code['byte_code']))
    # LOG.info(on_chain.digest())
    # LOG.info(base64.b16encode(on_chain.digest()))
    on_chain_code_sha = base64.b16encode(on_chain.digest())
    # Get the hash from artifacts 
    # TODO: Make less POC and more MVP, what if there are multiple checksums 
    with open(contract_dir+"/artifacts/checksums.txt", "r") as file:
        first_line = file.readline()
        sha_to_compare = first_line.split("  ")[0].encode()
        LOG.info(f"[SHA265 of code with id {args.codeid}] {on_chain_code_sha.lower()}")
        LOG.info(f"[SHA265 of code with path {args.package}] {sha_to_compare}")

        if sha_to_compare == on_chain_code_sha.lower():
            LOG.info("[Verification]: Success. The provided contract wasm matches the provided code ID's byte_code hash")
        else:
            LOG.info("[Verification]: Failed. The provided contract wasm does not match the provided code ID's byte_code hash")

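An illustrative verification run; the project path and code id are placeholders, and --nobuild can be added on subsequent runs once an optimized build and its checksums.txt already exist under ./artifacts:

    $ capsule verify -p ./my_contract -i 300 -c bombay-12
    $ capsule verify -p ./my_contract -i 300 -c bombay-12 --nobuild
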
Lib and Helpers

ACmd

ACmd is an abstraction that helps to define a consistent minimal interface for each command in this CLI. When a class is a subclass of this you have the assurance that: the class is-a command of some sort; it has info for its Name, Description, Example Usage and Help; and it has a run_command function to execute.

Source code in capsule/abstractions/ACmd.py
class ACmd(object):
    """ACmd is an abstraction that helps to
    define a consistent minimal interface for
    each command in this CLI.
    When a class is a subclass of this you have the assurance:
    The class is-a command of some sort
    It has info for its Name, Description, Example Usage and Help
    It has a run_command function to execute
    """
    REQUIRED_FIELDS = ['cmd_name', 'cmd_description', 'cmd_help', 'cmd_usage']


    def __init__(self, sub_parser):
        # Raise an exception if a subclass does not set values for all of the required fields
        if not all(getattr(self, field.upper(), None) is not None for field in self.REQUIRED_FIELDS):
            raise Exception("Command did not implement all the required fields")

        self.parser = sub_parser.add_parser(self.CMD_NAME,
                            help=self.CMD_HELP,
                            formatter_class=HelpFormatter,
                            parents=[])
        self.initialise()


    def initialise(self):
        raise NotImplementedError()

    def run_command(self, args):
        raise NotImplementedError()

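A minimal sketch of a new command built on this contract; the ping command, its behaviour and the import paths (taken from the file locations shown in this document) are purely illustrative:

    from capsule.abstractions.ACmd import ACmd
    from capsule.lib.logging_handler import setup_logger

    LOG = setup_logger()


    class PingCmd(ACmd):
        """Hypothetical example command used only to illustrate the ACmd contract."""

        CMD_NAME = "ping"
        CMD_HELP = "Print a short liveness message."
        CMD_USAGE = """
        $ capsule ping"""
        CMD_DESCRIPTION = "Illustrative command showing the attributes and methods ACmd requires."

        def initialise(self):
            # Register usage/description (and any arguments) on the sub-parser ACmd created
            self.parser.usage = self.CMD_USAGE
            self.parser.description = self.CMD_DESCRIPTION

        def run_command(self, args):
            # Entry point invoked when `capsule ping` runs
            LOG.info("pong")
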
config_handler

Load configuration from .toml file.

get_config(config_path=None) async

Simple function which takes a config_file and attempts to parse it as a toml config returning the parsed result as a dict

Source code in capsule/lib/config_handler.py
async def get_config(config_path=None):
    """Simple function which takes a config_file
    and attempts to parse it as a toml config
    returning the parsed result as a dict
    """
    filename = get_config_file(filename=config_path)
    # Read toml file
    config = toml.load(filename)

    LOG.debug(f"Found these networks available: {config['networks']}")
    LOG.debug(f"Found this deployment info: {config['deploy_info']['mnemonic']}")

    return config

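For orientation, a config file along these lines would satisfy the keys this module reads (networks and deploy_info.mnemonic); the per-network fields mirror the chain_url/chain_fcd_url lookups sketched (commented out) in the commands above and are illustrative rather than a guaranteed schema:

    [deploy_info]
    mnemonic = "<your mnemonic>"

    [networks.bombay-12]
    chain_url = "https://bombay-lcd.terra.dev"
    chain_fcd_url = "https://bombay-fcd.terra.dev"
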
get_config_file(filename=None)

Attempts to get the location of the capsule config. One of 4 things occurs here, either:
- A config location is gathered from the environment
- A specified filename is provided and a config file with this name is created
- A specified filename is provided and, if a config file with this name is found in the current dir, it is used
- A default config file is created in the ~/.capsule directory

Parameters:

    filename (str, optional): A specified filename to create for config. Defaults to None.

Returns:

    str: The config file's path.

Source code in capsule/lib/config_handler.py
def get_config_file(filename=None):
    """Attempts to get the location of 
    the capsule config. 
    One of 4 things occurs here, either:
    - A config location is gathered from the environment
    - A specified filename is provided and a config file with this name is created
    - A specified filename is provided and if a config file with this name is found in the current dir then this is used
    - A default config file is created in the ~/.capsule directory

    Args:
        filename ([str], optional): A specified filename to create for config. Defaults to None.

    Returns:
        [str]: [The config files path.]
    """

    # First check if there is a specified location in the env
    env_config_file = os.environ.get(DEFAULT_CONFIG_FILE_ENV_VAR, None)

    # If a config was specified in the env, this takes priority
    if env_config_file: return env_config_file

    # If a filename was provided use this as the config file name
    if filename: return os.path.abspath(os.path.expandvars(os.path.expanduser(filename)))

    # Otherwise use a default one located in the .capsule directory at the home dir
    config_file = os.path.expanduser(os.path.join("~", ".capsule", DEFAULT_CONFIG_FILE_NAME))
    capsule_dir = os.path.dirname(config_file)
    # Check if the capsule directory has been created and if not, create it.
    if not os.path.exists(capsule_dir):
        os.makedirs(capsule_dir)
    return config_file

credential_handler

get_mnemonic(strict=False) async

Attempt to gather a mnemonic from one of the available sources. First, if a mnemonic is defined in the env, use that. Next, check the config file for the secret. If no mnemonic can be found, optionally raise an Exception.

Parameters:

    strict (bool, optional): When set to true, if no mnemonic is found an exception is raised. Defaults to False.

Returns:

    str: The mnemonic found either in the env or in the config file

Source code in capsule/lib/credential_handler.py
async def get_mnemonic(strict=False):
    """Attempt to gather a mnemonic from one of the available sources
    First, if a mnemonic is defined in the env, use that.
    Next, check the config file for the secret 
    If no mnemonic can be found, optionally raise an Exception


    Args:
        strict (bool, optional): When set to true, if no mnemonic is found an exception is raised. Defaults to False.

    Returns:
        str: The mnemonic found either in the env or in the config file
    """
    if os.getenv("CAPSULE_MNEMONIC", False):
        return os.environ["CAPSULE_MNEMONIC"]

    config = await get_config()
    if config.get("deploy_info", {}).get("mnemonic", False):
        return config.get("deploy_info", {}).get("mnemonic", False)

    if strict:
        raise Exception("No Mnemonic was found either in the specified config file or in the environment. Strict mode is set to true")
    return None

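A small sketch of how the lookup order plays out in code; the mnemonic value is a placeholder and the commented call shows the strict behaviour without assuming a config file exists:

    import asyncio
    import os

    from capsule.lib.credential_handler import get_mnemonic

    # Highest priority: the environment variable
    os.environ["CAPSULE_MNEMONIC"] = "<your mnemonic>"
    print(asyncio.run(get_mnemonic()))  # returns the value from the environment

    # With the variable unset, the config file's deploy_info.mnemonic is consulted instead,
    # and strict=True raises if neither source yields a value:
    # del os.environ["CAPSULE_MNEMONIC"]
    # asyncio.run(get_mnemonic(strict=True))
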
deployer

Deployer

Deployer is a simple facade object providing an interface towards general deployment actions such as sending messages, getting addresses, uploading code objects, instantiating them into contracts and also executing or querying those contracts

Source code in capsule/lib/deployer.py
class Deployer():
    """Deployer is a simple facade object
    providing an interface towards general deployment
    actions such as sending messages, getting addresses,
    uploading code objects, instantiating them into contracts
    and also executing or querying those contracts
    """

    def __init__(self, client: LCDClient) -> None:

        self.client = client
        self.mnemonic = asyncio.run(get_mnemonic())
        LOG.debug(self.mnemonic)
        self.deployer = Wallet(lcd=self.client, key=MnemonicKey(self.mnemonic))
        self.std_fee = StdFee(4000000, "600000uluna")

    async def send_msg(self, msg):
        """send_msg attempts to create 
        and sign a transaction with the provided
        msg and then broadcasts the tx

        """
        tx = self.deployer.create_and_sign_tx(
            msgs=[msg], fee=self.std_fee
        )
        # estimated = self.client.tx.estimate_fee(tx, fee_denoms=["uusd"], msgs=[msg])
        # LOG.info(f'estimated fee: {estimated}')
        return self.client.tx.broadcast(tx)

    async def store_contract(self, contract_name:str, contract_path:str="") -> str:
        """store_contract attempts to 
        gather a given wasm artifact file 
        and upload it to the given chain as a StoredCodeObject
        The storage operation is done using the send_msg helper

        Args:
            contract_name (str): The name of the contract to deploy
            contract_path (str, optional): The path to a wasm contract artifact to deploy. Defaults to "".

        Returns:
            str: The contract storage result is parsed and just the code id of the stored code object is returned
        """
        # If the full path was provided, use it else assume its located in artifacts
        bytes = read_file_as_b64(contract_path if contract_path else f"artifacts/{contract_name}.wasm")
        msg = MsgStoreCode(self.deployer.key.acc_address, bytes)
        contract_storage_result = await self.send_msg(msg)
        LOG.info(contract_storage_result)
        return get_code_id(contract_storage_result)

    async def instantiate_contract(self, code_id: str, init_msg:dict) -> str:
        """instantiate_contract attempts to 
        instantiate a code object with an init msg 
        into a live contract on the network. 

        Args:
            code_id (str): The code_id of the stored wasm contract artifact
            init_msg (dict): The init msg to send to setup the contract

        Returns:
            str: The contracts address
        """
        msg = MsgInstantiateContract(
            sender=self.deployer.key.acc_address,
            admin=self.deployer.key.acc_address,
            code_id=code_id,
            init_msg=init_msg
        )

        instantiation_result = await self.send_msg(msg)

        LOG.info(instantiation_result)
        return get_contract_address(instantiation_result)

    async def execute_contract(self, contract_addr: str, execute_msg, coins = []):
        """Execute a message to perform an action on a given contract, returning the result

        Args:
            contract_addr (str): The contract to execute a msg on 
            execute_msg (dict): The msg to execute on the contract
            coins (list, optional): Coins which may be needed for the execution tx. Defaults to [].

        Returns:
            dict: execution results
        """
        msg = MsgExecuteContract(
            sender=self.deployer.key.acc_address,
            contract=contract_addr,
            execute_msg=execute_msg,
            coins=coins
        )
        exe_result = await self.send_msg(msg)
        LOG.debug(exe_result)
        return exe_result

    async def query_contract(self, contract_addr: str, query_msg: dict):
        """Perform a query on a given contract, returning the result

        Args:
            contract_addr (str): The contract to perform the query on 
            query_msg (dict): The query to perform

        Returns:
            dict: Query Result
        """
        LOG.info(f"Query to be ran {query_msg}")
        query_result = self.client.wasm.contract_query(contract_addr, query_msg)

        LOG.info(query_result)
        return query_result

    async def query_code_id(self, code_id: int):
        """
        """
        LOG.info(f"Query to be ran {code_id}")
        # query_result = self.client.wasm.code_info(code_id)
        # query_two = self.client.wasm._c._get(f"/wasm/codes/{code_id}")

        query_raw = requests.get(f"https://bombay-lcd.terra.dev/wasm/codes/{code_id}").json()
        return query_raw

    async def query_code_bytecode(self, code_id: int):
        """
        """
        LOG.info(f"Query to be ran {code_id}")
        # query_result = self.client.wasm.code_info(code_id)
        # query_two = self.client.wasm._c._get(f"/wasm/codes/{code_id}")
        # TODO: This query is not cross-chain capable 
        query_raw = requests.get(f"https://bombay-lcd.terra.dev/terra/wasm/v1beta1/codes/36374/byte_code").json()
        return query_raw
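
A compact sketch of driving the Deployer directly from Python instead of via the CLI; the wasm path and init message are placeholders, the terra_sdk import paths are the usual ones and may differ with the SDK version capsule pins, and a mnemonic still has to be discoverable by get_mnemonic for the constructor to succeed:

    import asyncio

    import requests
    from terra_sdk.client.lcd import LCDClient
    from terra_sdk.core import Coins

    from capsule.lib.deployer import Deployer

    fcd_url = "https://bombay-fcd.terra.dev"
    client = LCDClient(
        url="https://bombay-lcd.terra.dev",
        chain_id="bombay-12",
        gas_prices=Coins(requests.get(f"{fcd_url}/v1/txs/gas_prices").json()))

    deployer = Deployer(client=client)
    # Store the wasm artifact, then instantiate the resulting code object
    code_id = asyncio.run(deployer.store_contract(contract_name="my_contract",
                                                  contract_path="./artifacts/my_contract.wasm"))
    contract_addr = asyncio.run(deployer.instantiate_contract(code_id, init_msg={"count": 17}))
    print(contract_addr)
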
execute_contract(self, contract_addr, execute_msg, coins=[]) async

Execute a message to perform an action on a given contract, returning the result

Parameters:

    contract_addr (str): The contract to execute a msg on. (required)
    execute_msg (dict): The msg to execute on the contract. (required)
    coins (list, optional): Coins which may be needed for the execution tx. Defaults to [].

Returns:

    dict: execution results

Source code in capsule/lib/deployer.py
async def execute_contract(self, contract_addr: str, execute_msg, coins = []):
    """Execute a message to perform an action on a given contract, returning the result

    Args:
        contract_addr (str): The contract to execute a msg on 
        execute_msg (dict): The msg to execute on the contract
        coins (list, optional): Coins which may be needed for the execution tx. Defaults to [].

    Returns:
        dict: execution results
    """
    msg = MsgExecuteContract(
        sender=self.deployer.key.acc_address,
        contract=contract_addr,
        execute_msg=execute_msg,
        coins=coins
    )
    exe_result = await self.send_msg(msg)
    LOG.debug(exe_result)
    return exe_result
instantiate_contract(self, code_id, init_msg) async

instantiate_contract attempts to instantiate a code object with an init msg into a live contract on the network.

Parameters:

    code_id (str): The code_id of the stored wasm contract artifact. (required)
    init_msg (dict): The init msg to send to setup the contract. (required)

Returns:

    str: The contracts address

Source code in capsule/lib/deployer.py
async def instantiate_contract(self, code_id: str, init_msg:dict) -> str:
    """instantiate_contract attempts to 
    instantiate a code object with an init msg 
    into a live contract on the network. 

    Args:
        code_id (str): The code_id of the stored wasm contract artifact
        init_msg (dict): The init msg to send to setup the contract

    Returns:
        str: The contracts address
    """
    msg = MsgInstantiateContract(
        sender=self.deployer.key.acc_address,
        admin=self.deployer.key.acc_address,
        code_id=code_id,
        init_msg=init_msg
    )

    instantiation_result = await self.send_msg(msg)

    LOG.info(instantiation_result)
    return get_contract_address(instantiation_result)
query_code_bytecode(self, code_id) async
Source code in capsule/lib/deployer.py
async def query_code_bytecode(self, code_id: int):
    """
    """
    LOG.info(f"Query to be ran {code_id}")
    # query_result = self.client.wasm.code_info(code_id)
    # query_two = self.client.wasm._c._get(f"/wasm/codes/{code_id}")
    # TODO: This query is not cross-chain capable 
    query_raw = requests.get(f"https://bombay-lcd.terra.dev/terra/wasm/v1beta1/codes/36374/byte_code").json()
    return query_raw
query_code_id(self, code_id) async
Source code in capsule/lib/deployer.py
async def query_code_id(self, code_id: int):
    """
    """
    LOG.info(f"Query to be ran {code_id}")
    # query_result = self.client.wasm.code_info(code_id)
    # query_two = self.client.wasm._c._get(f"/wasm/codes/{code_id}")

    query_raw = requests.get(f"https://bombay-lcd.terra.dev/wasm/codes/{code_id}").json()
    return query_raw
query_contract(self, contract_addr, query_msg) async

Perform a query on a given contract, returning the result

Parameters:

    contract_addr (str): The contract to perform the query on. (required)
    query_msg (dict): The query to perform. (required)

Returns:

    dict: Query Result

Source code in capsule/lib/deployer.py
async def query_contract(self, contract_addr: str, query_msg: dict):
    """Perform a query on a given contract, returning the result

    Args:
        contract_addr (str): The contract to perform the query on 
        query_msg (dict): The query to perform

    Returns:
        dict: Query Result
    """
    LOG.info(f"Query to be ran {query_msg}")
    query_result = self.client.wasm.contract_query(contract_addr, query_msg)

    LOG.info(query_result)
    return query_result
send_msg(self, msg) async

send_msg attempts to create and sign a transaction with the provided msg and then broadcasts the tx

Source code in capsule/lib/deployer.py
async def send_msg(self, msg):
    """send_msg attempts to create 
    and sign a transaction with the provided
    msg and then broadcasts the tx

    """
    tx = self.deployer.create_and_sign_tx(
        msgs=[msg], fee=self.std_fee
    )
    # estimated = self.client.tx.estimate_fee(tx, fee_denoms=["uusd"], msgs=[msg])
    # LOG.info(f'estimated fee: {estimated}')
    return self.client.tx.broadcast(tx)
store_contract(self, contract_name, contract_path='') async

store_contract attempts to gather a given wasm artifact file and upload it to the given chain as a StoredCodeObject The storage operation is done using the send_msg helper

Parameters:

    contract_name (str): The name of the contract to deploy. (required)
    contract_path (str, optional): The path to a wasm contract artifact to deploy. Defaults to "".

Returns:

    str: The contract storage result is parsed and just the code id of the stored code object is returned

Source code in capsule/lib/deployer.py
async def store_contract(self, contract_name:str, contract_path:str="") -> str:
    """store_contract attempts to 
    gather a given wasm artifact file 
    and upload it to the given chain as a StoredCodeObject
    The storage operation is done using the send_msg helper

    Args:
        contract_name (str): The name of the contract to deploy
        contract_path (str, optional): The path to a wasm contract artifact to deploy. Defaults to "".

    Returns:
        str: The contract storage result is parsed and just the code id of the stored code object is returned
    """
    # If the full path was provided, use it else assume its located in artifacts
    bytes = read_file_as_b64(contract_path if contract_path else f"artifacts/{contract_name}.wasm")
    msg = MsgStoreCode(self.deployer.key.acc_address, bytes)
    contract_storage_result = await self.send_msg(msg)
    LOG.info(contract_storage_result)
    return get_code_id(contract_storage_result)

logging_handler

One place for logging stuff

setup_logger(logger_name='capsule', extra_kwargs={})

Simple wrapper to setup a logger and return it. Used by the LOG constant which is used through the project

Parameters:

    logger_name (str, optional): The name for the logger. Defaults to "capsule".
    extra_kwargs (dict, optional): Any extra options to use with the logger. Defaults to {}.

Source code in capsule/lib/logging_handler.py
def setup_logger(logger_name="capsule", extra_kwargs={}):
    """Simple wrapper to setup a logger and return it.
    Used by the LOG constant which is used through the project

    Args:
        logger_name (str, optional): The name for the logger. Defaults to "capsule".
        extra_kwargs (dict, optional): Any extra options to use with the logger. Defaults to {}.
    """
    logger = logging.getLogger(logger_name, **extra_kwargs)
    logger.setLevel(logging.INFO)
    # Setup a StreamHandler to give output to the logs
    handler = logging.StreamHandler()
    # Establish a log format for messages
    handler.setFormatter(logging.Formatter('[capsule:%(module)s] %(message)s'))
    # Add handler to logger
    logger.addHandler(handler)
    return logger
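
For reference, the LOG constant the rest of the codebase relies on is presumably created along these lines:

    from capsule.lib.logging_handler import setup_logger

    LOG = setup_logger()
    LOG.info("capsule logging configured")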