API documentation
Commands
deploy
DeployCmd (ACmd)
Source code in capsule/cmds/deploy.py
class DeployCmd(ACmd):
    """Deploy command -- stores a wasm artifact on chain and instantiates it."""

    CMD_NAME = "deploy"
    CMD_HELP = "Deploy a wasm contract artifact to a specified Terra Chain"
    # The examples use --package: that is the long flag registered in
    # initialise(); there is no --path flag.
    CMD_USAGE = """
    $ capsule deploy -p ./my_contract.wasm -c columbus-5
    $ capsule deploy --package ./artifacts/my_contract.wasm --chain tequila-0004
    $ capsule deploy -p artifacts/capsule_test.wasm -i '{"count":17}' -c bombay-12"""
    CMD_DESCRIPTION = "Helper tool which enables you to programatically deploy a Wasm contract artifact to a chain as a code object and instantiate it"

    def initialise(self):
        """Attach the usage text, description and CLI arguments to ``self.parser``."""
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # The help text calls this flag required, so let argparse enforce it
        # rather than handing ``None`` to the store call later on.
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 required=True,
                                 help="(required) Name of new or path to existing package")
        # The default has to be a JSON *string*: run_command feeds the value to
        # json.loads, which raises TypeError when given a dict.
        self.parser.add_argument("-i", "--initmsg",
                                 type=str,
                                 default="{}",
                                 help="(Optional) The initialization message for the contract you are trying to deploy. Must be a json-like str")
        # NOTE(review): the help text says the default is localterra, but
        # run_command falls back to chain id "bombay-12" -- confirm which is intended.
        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

    def run_command(self, args):
        """Store the wasm artifact as a code object and instantiate it.

        Steps:
            1. Parse the init message, so malformed JSON fails before any
               transaction is sent.
            2. Build a Deployer around an LCD client for the requested chain.
            3. Store the artifact at ``args.package`` and obtain its code id.
            4. Instantiate that code id with the init message and log the result.

        Args:
            args: Parsed arguments; reads ``package``, ``initmsg`` and ``chain``.

        Raises:
            json.JSONDecodeError: If ``args.initmsg`` is not valid JSON.
        """
        LOG.info("Starting deployment")

        # Validate the init message up front; previously it was parsed only
        # after the contract code had already been stored on chain.
        init_msg = args.initmsg
        if isinstance(init_msg, str):
            init_msg = json.loads(init_msg)

        # TODO: Review reading the urls from the project config
        # (config["networks"][args.chain]) instead of hard-coding them here.
        chain_url = "https://bombay-lcd.terra.dev"
        chain_fcd_url = "https://bombay-fcd.terra.dev"

        # NOTE(review): a fresh event loop is installed before the client is
        # built; presumably the client looks up the current loop -- confirm
        # before removing these two lines.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
        deployer = Deployer(client=LCDClient(
            url=chain_url,
            chain_id=args.chain or "bombay-12",
            gas_prices=gas_prices))

        # Attempt to store the provided package as a code object; the response
        # will be a code ID if successful.
        stored_code_id = asyncio.run(deployer.store_contract(contract_name="test", contract_path=args.package))

        # Instantiate a contract using the stored code ID and the init msg,
        # which differs from contract to contract.
        instantiation_result = asyncio.run(deployer.instantiate_contract(stored_code_id, init_msg=init_msg))
        LOG.info(f"Successfully deployed contract artifact located at {args.package}. Contract address of instantiated contract is {instantiation_result}")
run_command(self, args)
Schema: Read Mnemonic from env as well as host to deploy on any specified chain/account
Prepare defaults for the above
Perform a store call for the wasm contract that was specified
Verify the contract was stored with a new API call
Instantiate the code object into a contract
Return success.
Source code in capsule/cmds/deploy.py
def run_command(self, args):
    """Store the wasm artifact as a code object and instantiate it.

    Steps:
        1. Parse the init message, so malformed JSON fails before any
           transaction is sent.
        2. Build a Deployer around an LCD client for the requested chain.
        3. Store the artifact at ``args.package`` and obtain its code id.
        4. Instantiate that code id with the init message and log the result.

    Args:
        args: Parsed arguments; reads ``package``, ``initmsg`` and ``chain``.

    Raises:
        json.JSONDecodeError: If ``args.initmsg`` is a string that is not valid JSON.
    """
    LOG.info("Starting deployment")

    # Validate the init message up front; previously it was parsed only after
    # the contract code had already been stored on chain. A non-string value is
    # used as-is, because json.loads raises TypeError on a dict such as the
    # ``{}`` parser default.
    init_msg = args.initmsg
    if isinstance(init_msg, str):
        init_msg = json.loads(init_msg)

    # TODO: Review reading the urls from the project config
    # (config["networks"][args.chain]) instead of hard-coding them here.
    chain_url = "https://bombay-lcd.terra.dev"
    chain_fcd_url = "https://bombay-fcd.terra.dev"

    # NOTE(review): a fresh event loop is installed before the client is built;
    # presumably the client looks up the current loop -- confirm before
    # removing these two lines.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
    deployer = Deployer(client=LCDClient(
        url=chain_url,
        chain_id=args.chain or "bombay-12",
        gas_prices=gas_prices))

    # Attempt to store the provided package as a code object; the response will
    # be a code ID if successful.
    stored_code_id = asyncio.run(deployer.store_contract(contract_name="test", contract_path=args.package))

    # Instantiate a contract using the stored code ID and the init msg, which
    # differs from contract to contract.
    instantiation_result = asyncio.run(deployer.instantiate_contract(stored_code_id, init_msg=init_msg))
    LOG.info(f"Successfully deployed contract artifact located at {args.package}. Contract address of instantiated contract is {instantiation_result}")
execute
Execute command -- Used to execute actions on MultiChain contracts
ExecuteCmd (ACmd)
Execute command -- Used to execute actions on MultiChain contracts
Source code in capsule/cmds/execute.py
class ExecuteCmd(ACmd):
    """
    Execute command -- Used to execute actions on MultiChain contracts
    """

    CMD_NAME = "execute"
    CMD_HELP = "Attempt to execute an action on a given contract address."
    # The example uses --address: that is the long flag registered in
    # initialise(); there is no --contract flag.
    CMD_USAGE = """
    $ capsule execute --address <addr> --chain <chain> --msg <msg>"""
    CMD_DESCRIPTION = "Helper tool which exposes the ability to prepare and sending ExecuteMsg's on chain specific contract addresses"

    def initialise(self):
        """Attach the usage text, description and CLI arguments to ``self.parser``."""
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # The help text calls this flag required, so let argparse enforce it.
        self.parser.add_argument("-a", "--address",
                                 type=str,
                                 required=True,
                                 help="(required) Contract Address to perform query on")
        # The default has to be a JSON *string*: run_command feeds the value to
        # json.loads, which raises TypeError when given a dict.
        self.parser.add_argument("-m", "--msg",
                                 type=str,
                                 default="{}",
                                 help="(Optional) The execution message for the contract you are trying to execute an action on. Must be a json-like str")
        # NOTE(review): the help text says the default is localterra, but
        # run_command falls back to chain id "bombay-12" -- confirm which is intended.
        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

    def run_command(self, args):
        """Send an execute message to a contract and log the result.

        Args:
            args: Parsed arguments; reads ``address``, ``msg`` and ``chain``.

        Raises:
            json.JSONDecodeError: If ``args.msg`` is not valid JSON.
        """
        LOG.info(f"Performing msg exectution on contract addr {args.address}")

        # Parse before touching the network so a malformed message fails fast.
        execute_msg = args.msg
        if isinstance(execute_msg, str):
            execute_msg = json.loads(execute_msg)

        chain_url = "https://bombay-lcd.terra.dev"
        chain_fcd_url = "https://bombay-fcd.terra.dev"
        gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
        deployer = Deployer(client=LCDClient(
            url=chain_url,
            chain_id=args.chain or "bombay-12",
            gas_prices=gas_prices))

        exe_result = asyncio.run(deployer.execute_contract(args.address, execute_msg))
        LOG.info(f"Execute Result {exe_result} \n\n Execute Finished.")
run_command(self, args)
Source code in capsule/cmds/execute.py
def run_command(self, args):
    """Send an execute message to a contract and log the result.

    Args:
        args: Parsed arguments; reads ``address``, ``msg`` and ``chain``.

    Raises:
        json.JSONDecodeError: If ``args.msg`` is a string that is not valid JSON.
    """
    LOG.info(f"Performing msg exectution on contract addr {args.address}")

    # Parse before touching the network so a malformed message fails fast. A
    # non-string value is used as-is, because json.loads raises TypeError on a
    # dict such as the ``{}`` parser default.
    execute_msg = args.msg
    if isinstance(execute_msg, str):
        execute_msg = json.loads(execute_msg)

    chain_url = "https://bombay-lcd.terra.dev"
    chain_fcd_url = "https://bombay-fcd.terra.dev"
    gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
    deployer = Deployer(client=LCDClient(
        url=chain_url,
        chain_id=args.chain or "bombay-12",
        gas_prices=gas_prices))

    exe_result = asyncio.run(deployer.execute_contract(args.address, execute_msg))
    LOG.info(f"Execute Result {exe_result} \n\n Execute Finished.")
local
Local command -- Used to spin up a local instance of localterra or your favorite multi-chain dev-env
LocalCmd (ACmd)
Local command -- Used to spin up a local instance of localterra or your favorite multi-chain dev-env
Source code in capsule/cmds/local.py
class LocalCmd(ACmd):
    """
    Local command -- Used to spin up a local instance of localterra or your favorite multi-chain dev-env
    """

    CMD_NAME = "local"
    CMD_HELP = "Attempt to setup a local chain instance using Git and Docker."
    CMD_USAGE = """
    $ capsule local"""
    CMD_DESCRIPTION = "Helper tool which attempts to git clone the localterra repo and then compose it as services which you can use for local dev env contract testing"

    def initialise(self):
        """Attach the usage text, description and CLI arguments to ``self.parser``."""
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # NOTE(review): --package and --chain are registered here but are not
        # read by run_command in this class.
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 help="(required) Name of new or path to existing package")
        # store_false: args.down is True unless -d is given, and
        # run_docker_compose runs "up" while args.down is truthy. Passing -d
        # therefore selects "down".
        self.parser.add_argument("-d", "--down",
                                 action='store_false',
                                 help="(Optional) Whether to spin down, include to spin down an instance")
        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

    def run_command(self, args):
        """Obtain the localterra checkout and drive docker-compose against it.

        Schema:
            Check if we already have a local terra, if so reuse it
            If we don't, git clone the localterra repo
            Pass the checkout directory to docker-compose and attempt to interact

        Args:
            args: Parsed arguments; forwarded to run_docker_compose.
        """
        LOG.info("Starting local setup of localterra")
        # Reuse the clone in the default location when it exists, otherwise clone.
        # NOTE(review): ``self`` is passed explicitly and run_docker_compose has
        # no ``self`` parameter; both calls only line up if ``self`` is the
        # class object rather than an instance -- confirm against the caller.
        repo = self.get_localterra_repo(self)
        LOG.info(
            "Localterra repo actions finished, now attempting to use its compose-file")
        # Spin up or down via compose
        self.run_docker_compose(args, dirname=repo.working_tree_dir)
        LOG.info("Finish docker interaction")
        LOG.info("Command run finished")

    def get_localterra_repo(self, CLONE_LOCATION=DEFAULT_CLONE_PATH, CLONE_REMOTE_URL="git@github.com:terra-money/LocalTerra.git"):
        """Attempts to get a localterra repository so that localterra can be interacted with.

        Note: Any compose spun up away from the capsule tool may not be accessible by the tool.
        You need to ensure you are passing the correct Docker Compose in order to use this tool
        to interact with your already deployed services.

        Args:
            CLONE_LOCATION (str, optional): Where the repo lives or will be cloned to. Defaults to DEFAULT_CLONE_PATH.
            CLONE_REMOTE_URL (str, optional): The git url to clone from. Defaults to "git@github.com:terra-money/LocalTerra.git".

        Returns:
            git.Repo: The repo to be worked with
        """
        resolved_location = os.path.realpath(CLONE_LOCATION)
        if os.path.exists(resolved_location):
            LOG.info("localterra appears to be already cloned, skipped reclone")
            return git.Repo(resolved_location)
        # depth=1 is passed so only a shallow clone is requested.
        return git.Repo.clone_from(url=CLONE_REMOTE_URL, to_path=CLONE_LOCATION, depth=1)

    def run_docker_compose(args, dirname, filename="docker-compose.yml"):
        """Run ``docker-compose up`` or ``down`` for a compose file and log its output.

        Args:
            args: Parsed arguments; ``args.down`` truthy selects "up", falsy selects "down".
            dirname (str): The directory of the project where the compose file is.
            filename (str, optional): The compose file name. Defaults to "docker-compose.yml".

        Returns:
            None. Output of the child process is forwarded to the log line by line.
        """
        command_name = ["docker-compose", "-f",
                        os.path.join(dirname, filename), "up" if args.down else "down"]
        # Join with spaces so the logged command is readable.
        LOG.info(f"[Compose Command] {' '.join(command_name)}")
        try:
            LOG.info("Starting compose command \n\n\n")
            with subprocess.Popen(command_name, stdin=PIPE, stdout=PIPE, stderr=STDOUT, universal_newlines=True) as process:
                # Read until EOF rather than until poll() reports exit, so
                # output still buffered when the child terminates is not lost.
                for line in process.stdout:
                    LOG.info(line.rstrip())
        except KeyboardInterrupt:
            # Ctrl-C ends the command; leaving the ``with`` block cleans up the child.
            sys.exit()
        except Exception as ex:
            print("Encountered an error : ", ex)
get_localterra_repo(self, CLONE_LOCATION='/Users/ryan/.capsule/localterra-clones', CLONE_REMOTE_URL='git@github.com:terra-money/LocalTerra.git')
Attempts to get a localterra repository so that localterra can be interacted with. Note: Any compose spun up away from the capsule tool may not be accessible by the tool. You need to ensure you are passing the correct Docker Compose in order to use this tool to interact with your already deployed services
Parameters:
Name | Type | Description | Default |
---|---|---|---|
CLONE_LOCATION |
str |
Where the repos will go, defaults to DEFAULT_CLONE_PATH which is "~/.capsule/localterra-clones". |
'/Users/ryan/.capsule/localterra-clones' |
CLONE_REMOTE_URL |
str |
The git url to clone from. Defaults to "git@github.com:terra-money/LocalTerra.git". |
'git@github.com:terra-money/LocalTerra.git' |
Returns:
Type | Description |
---|---|
git.Repo |
The repo to be worked with |
Source code in capsule/cmds/local.py
def get_localterra_repo(self, CLONE_LOCATION=DEFAULT_CLONE_PATH, CLONE_REMOTE_URL="git@github.com:terra-money/LocalTerra.git"):
    """Return a git.Repo for localterra, cloning it first when no checkout exists.

    Note: a compose stack started outside of the capsule tool may not be
    reachable by it; make sure the compose file handed to the tool is the one
    backing the services you want to interact with.

    Args:
        CLONE_LOCATION (str, optional): Where the repo lives or will be cloned to. Defaults to DEFAULT_CLONE_PATH.
        CLONE_REMOTE_URL (str, optional): The git url to clone from. Defaults to "git@github.com:terra-money/LocalTerra.git".

    Returns:
        git.Repo: The repo to be worked with
    """
    checkout_path = os.path.realpath(CLONE_LOCATION)

    # Nothing on disk yet: request a shallow clone (depth=1) and hand it back.
    if not os.path.exists(checkout_path):
        return git.Repo.clone_from(url=CLONE_REMOTE_URL, to_path=CLONE_LOCATION, depth=1)

    LOG.info("localterra appears to be already cloned, skipped reclone")
    return git.Repo(checkout_path)
run_command(self, args)
Schema: Check if we already have a local terra, if so return the path
If we don't git clone the localterra repo and then return the path
Pass path to docker-compose and attempt to interact
Return success.
Source code in capsule/cmds/local.py
def run_command(self, args):
    """Obtain the localterra checkout and drive docker-compose against it.

    Schema:
        Reuse the existing localterra clone when there is one
        Otherwise git clone the localterra repo
        Hand the checkout directory to docker-compose and attempt to interact

    Args:
        args: Parsed arguments; forwarded to run_docker_compose.
    """
    LOG.info("Starting local setup of localterra")

    # Step 1: get hold of the repository (existing clone or a fresh one).
    # NOTE(review): ``self`` is passed explicitly here; that only lines up with
    # the method signature if ``self`` is the class object rather than an
    # instance -- confirm against the caller.
    localterra = self.get_localterra_repo(self)
    LOG.info("Localterra repo actions finished, now attempting to use its compose-file")

    # Step 2: spin the services up or down through compose.
    compose_dir = localterra.working_tree_dir
    self.run_docker_compose(args, dirname=compose_dir)

    LOG.info("Finish docker interaction")
    LOG.info("Command run finished")
run_docker_compose(args, dirname, filename='docker-compose.yml')
Take a directory and a filename for the compose and attempt to interact with docker-compose
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dirname |
[type] |
The directory of the project where the compose file is |
required |
filename |
str |
The compose file name. Defaults to "docker-compose.yml". |
'docker-compose.yml' |
Returns:
Type | Description |
---|---|
process |
The process |
Source code in capsule/cmds/local.py
def run_docker_compose(args, dirname, filename="docker-compose.yml"):
    """Run ``docker-compose up`` or ``down`` for a compose file and log its output.

    Args:
        args: Parsed arguments; ``args.down`` truthy selects "up", falsy selects "down".
        dirname (str): The directory of the project where the compose file is.
        filename (str, optional): The compose file name. Defaults to "docker-compose.yml".

    Returns:
        None. Output of the child process is forwarded to the log line by line.
    """
    command_name = ["docker-compose", "-f",
                    os.path.join(dirname, filename), "up" if args.down else "down"]
    # Join with spaces so the logged command is readable.
    LOG.info(f"[Compose Command] {' '.join(command_name)}")
    try:
        LOG.info("Starting compose command \n\n\n")
        with subprocess.Popen(command_name, stdin=PIPE, stdout=PIPE, stderr=STDOUT, universal_newlines=True) as process:
            # Read until EOF rather than until poll() reports exit, so output
            # still buffered when the child terminates is not lost.
            for line in process.stdout:
                LOG.info(line.rstrip())
    except KeyboardInterrupt:
        # Ctrl-C ends the command; leaving the ``with`` block cleans up the child.
        sys.exit()
    except Exception as ex:
        print("Encountered an error : ", ex)
new
NewCmd (ACmd)
Source code in capsule/cmds/new.py
class NewCmd(ACmd):
    """New command -- generates a new cosmwasm smart contract from a cargo-generate template."""

    CMD_NAME = "new"
    CMD_HELP = "Simple command to create a new cosmwasm smart contract. Unless specified, will use x"
    # The examples use --package: that is the long flag registered in
    # initialise(); there is no --path flag.
    CMD_USAGE = """
    $ capsule new -p ./<path_to_my_contracts_root> -n my_new_contract
    $ capsule new --package ./<path_to_my_contracts_root> -n my_new_contract
    $ capsule new -p <path_to_my_contracts_root> -n my_new_contract -u <other cosmwasm cargo codegen template>"""
    CMD_DESCRIPTION = "Simple command to create a new cosmwasm smart contract. Unless specified, will use x"

    def initialise(self):
        """Attach the usage text, description and CLI arguments to ``self.parser``."""
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # The help text calls this flag required, so let argparse enforce it
        # rather than failing later on os.path.abspath(None).
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 required=True,
                                 help="(required) Name of new or path to existing package")
        # The long name (and therefore the attribute, ``args.codeid``) is kept
        # for compatibility even though the value is a template URL.
        self.parser.add_argument("-u", "--codeid",
                                 type=str,
                                 default="",
                                 help="URL of your own template to use")
        self.parser.add_argument("-n", "--name",
                                 type=str,
                                 default="",
                                 help="Name of your new smart-contract package")
        self.parser.add_argument("-b", "--branch",
                                 type=str,
                                 default="0.16",
                                 help="Branch/Version of your new smart-contract package")

    def run_command(self, args):
        """Run ``cargo generate`` to scaffold a new contract package.

        Args:
            args: Parsed arguments; reads ``package`` (parent directory),
                ``name``, ``branch`` and ``codeid`` (optional template URL).

        Raises:
            ValueError: If no name was given or the target directory already exists.
        """
        LOG.info("Starting cargo generate of new package")
        # Without a name the target path collapses onto the parent directory and
        # the existence check below would report a misleading error.
        if not args.name:
            raise ValueError("A contract name is required, pass one with -n/--name")

        output_dir = os.path.abspath(args.package)
        contract_dir = os.path.join(output_dir, args.name)
        if os.path.exists(contract_dir):
            raise ValueError("Directory exists, use a diff name boss")

        # Honour a user supplied template (-u); fall back to the stock template.
        template_url = getattr(args, "codeid", "") or 'https://github.com/CosmWasm/cw-template.git'
        try:
            process = subprocess.run(['cargo', 'generate', '--git', template_url, '--branch',
                                      args.branch, '--name', args.name], cwd=output_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)
        except subprocess.CalledProcessError as grepexc:
            # ``process`` is never bound when the call fails, so report and stop
            # here instead of falling through to the success logging.
            LOG.error(f"error code {grepexc.returncode} {grepexc.output}")
            return
        LOG.info(process.stdout)
        LOG.info("[New Contract] Contract creation finished")
run_command(self, args)
Source code in capsule/cmds/new.py
def run_command(self, args):
    """Run ``cargo generate`` to scaffold a new contract package.

    Args:
        args: Parsed arguments; reads ``package`` (parent directory), ``name``,
            ``branch`` and ``codeid`` (optional template URL).

    Raises:
        ValueError: If no name was given or the target directory already exists.
    """
    LOG.info("Starting cargo generate of new package")
    # Without a name the target path collapses onto the parent directory and
    # the existence check below would report a misleading error.
    if not args.name:
        raise ValueError("A contract name is required, pass one with -n/--name")

    output_dir = os.path.abspath(args.package)
    contract_dir = os.path.join(output_dir, args.name)
    if os.path.exists(contract_dir):
        raise ValueError("Directory exists, use a diff name boss")

    # Honour a user supplied template (-u, stored as ``codeid``); fall back to
    # the stock template.
    template_url = getattr(args, "codeid", "") or 'https://github.com/CosmWasm/cw-template.git'
    try:
        process = subprocess.run(['cargo', 'generate', '--git', template_url, '--branch',
                                  args.branch, '--name', args.name], cwd=output_dir, check=True, stdout=subprocess.PIPE, universal_newlines=True)
    except subprocess.CalledProcessError as grepexc:
        # ``process`` is never bound when the call fails, so report and stop
        # here instead of falling through to the success logging.
        LOG.error(f"error code {grepexc.returncode} {grepexc.output}")
        return
    LOG.info(process.stdout)
    LOG.info("[New Contract] Contract creation finished")
query
Query command -- Used to perform queries on MultiChain contracts
QueryCmd (ACmd)
Query command -- Used to perform queries on MultiChain contracts
Source code in capsule/cmds/query.py
class QueryCmd(ACmd):
    """
    Query command -- Used to perform queries on MultiChain contracts
    """

    CMD_NAME = "query"
    CMD_HELP = "Attempt to perform a query on a given contract address."
    # The example uses --address: that is the long flag registered in
    # initialise(); there is no --contract flag.
    CMD_USAGE = """
    $ capsule query --address <addr> --chain=<> --query=<query>"""
    CMD_DESCRIPTION = "Helper tool which exposes the ability to perform queries on chain specific contract addresses"

    def initialise(self):
        """Attach the usage text, description and CLI arguments to ``self.parser``."""
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # The help text calls this flag required, so let argparse enforce it.
        self.parser.add_argument("-a", "--address",
                                 type=str,
                                 required=True,
                                 help="(required) Contract Address to perform query on")
        # The default has to be a JSON *string*: run_command feeds the value to
        # json.loads, which raises TypeError when given a dict.
        self.parser.add_argument("-q", "--query",
                                 type=str,
                                 default="{}",
                                 help="(Optional) The query message for the contract you are trying to query. Must be a json-like str")
        # NOTE(review): the help text says the default is localterra, but
        # run_command falls back to chain id "bombay-12" -- confirm which is intended.
        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")

    def run_command(self, args):
        """Run a query message against a contract and log the result.

        Args:
            args: Parsed arguments; reads ``address``, ``query`` and ``chain``.

        Raises:
            json.JSONDecodeError: If ``args.query`` is not valid JSON.
        """
        LOG.info(f"Performing query on contract addr {args.address}")

        # Parse before touching the network so a malformed query fails fast.
        query_msg = args.query
        if isinstance(query_msg, str):
            query_msg = json.loads(query_msg)

        chain_url = "https://bombay-lcd.terra.dev"
        chain_fcd_url = "https://bombay-fcd.terra.dev"
        gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
        deployer = Deployer(client=LCDClient(
            url=chain_url,
            chain_id=args.chain or "bombay-12",
            gas_prices=gas_prices))

        query_result = asyncio.run(deployer.query_contract(args.address, query_msg))
        LOG.info(f"Query Result {query_result} \n\n Query Finished.")
run_command(self, args)
Source code in capsule/cmds/query.py
def run_command(self, args):
    """Run a query message against a contract and log the result.

    Args:
        args: Parsed arguments; reads ``address``, ``query`` and ``chain``.

    Raises:
        json.JSONDecodeError: If ``args.query`` is a string that is not valid JSON.
    """
    LOG.info(f"Performing query on contract addr {args.address}")

    # Parse before touching the network so a malformed query fails fast. A
    # non-string value is used as-is, because json.loads raises TypeError on a
    # dict such as the ``{}`` parser default.
    query_msg = args.query
    if isinstance(query_msg, str):
        query_msg = json.loads(query_msg)

    chain_url = "https://bombay-lcd.terra.dev"
    chain_fcd_url = "https://bombay-fcd.terra.dev"
    gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
    deployer = Deployer(client=LCDClient(
        url=chain_url,
        chain_id=args.chain or "bombay-12",
        gas_prices=gas_prices))

    query_result = asyncio.run(deployer.query_contract(args.address, query_msg))
    LOG.info(f"Query Result {query_result} \n\n Query Finished.")
verify
VerifyCmd (ACmd)
Source code in capsule/cmds/verify.py
class VerifyCmd(ACmd):
    """Verify command -- compares a locally built optimized wasm with an on-chain code object."""

    CMD_NAME = "verify"
    CMD_HELP = "Attempt to perform a smart contract verification against a given code id. Attempts to perform a deterministic comparison by compiling the provided contract into an optmized wasm and then comparing its checksum with the code_hash of the provided code ID code object. Note if you are running an M1 Mac or an ARM based machine you will not get perfect matches on verification as most production deployments are done from a Intel based machine."
    # The examples use --package: that is the long flag registered in
    # initialise(); there is no --path flag.
    CMD_USAGE = """
    $ capsule verify -p ./<path_to_my_contracts_root> -c columbus-5 -i 3
    $ capsule verify --package ./<path_to_my_contracts_root> --chain tequila-0004
    $ capsule verify -p <path_to_my_contracts_root> -c bombay-12 -i 300"""
    CMD_DESCRIPTION = "Helper tool which enables you to perform a Smart Contract Verification (SCV) by providing the path to a single smart-contract repo and providing a code id. The project path is passed either to `cargo run-script optimize` or to a custom docker invocation for ARM64 to create an optmized production wasm of the contract for comparison. The code id is used to query a stored code object's byte_code on the respective chain. Once the byte_code is gathered we get the SHA256 of this to compare to our locally prepared optimized wasm. If the SHA265 of on chain code object matches the SHA256 checksum on the optimized build we have verified the contact. Note: The outputted SHA256 and locally build optimized build will be different on ARM vs Intel. An ARM machine can only really be used to verify images which were built and uploaded from an ARM machine"

    def initialise(self):
        """Attach the usage text, description and CLI arguments to ``self.parser``."""
        self.parser.usage = self.CMD_USAGE
        self.parser.description = self.CMD_DESCRIPTION

        # The help text calls this flag required, so let argparse enforce it
        # rather than failing later on os.path.abspath(None).
        self.parser.add_argument("-p", "--package",
                                 type=str,
                                 required=True,
                                 help="(required) Name of new or path to existing package")
        self.parser.add_argument("-i", "--codeid",
                                 type=int,
                                 default=0,
                                 help="The code_id to compare the provided contract against")
        # NOTE(review): the help text says the default is localterra, but
        # run_command falls back to chain id "bombay-12" -- confirm which is intended.
        self.parser.add_argument("-c", "--chain",
                                 type=str,
                                 default="",
                                 help="(Optional) A chain to deploy too. Defaults to localterra")
        self.parser.add_argument("-n", "--nobuild",
                                 action='store_true',
                                 help="(Optional) Skip the building and go right to comparing the onchain code with your own. This assumes you already ran an optimized build and saves you rebuilding each time you run verify command")

    def run_command(self, args):
        """Build the contract (unless told not to) and compare it with on-chain code.

        Schema:
            Build an LCD client / Deployer for the requested chain
            Prepare an optimized build using the appropriate technique
            If nobuild is provided, skip the build and go right to checksums.txt
            Query the byte_code of ``args.codeid`` from the lcd client
            Hash the queried byte_code and compare with the local checksum
            Log whether the two match

        Args:
            args: Parsed arguments; reads ``package``, ``codeid``, ``chain`` and ``nobuild``.
        """
        LOG.info("Starting verification")

        # TODO: Review reading the urls from the project config
        # (config["networks"][args.chain]) instead of hard-coding them here.
        chain_url = "https://bombay-lcd.terra.dev"
        chain_fcd_url = "https://bombay-fcd.terra.dev"

        # NOTE(review): a fresh event loop is installed before the client is
        # built; presumably the client looks up the current loop -- confirm
        # before removing these two lines.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
        deployer = Deployer(client=LCDClient(
            url=chain_url,
            chain_id=args.chain or "bombay-12",
            gas_prices=gas_prices))
        contract_dir = os.path.abspath(args.package)

        # TODO: Refactor into a neater function
        if not args.nobuild:
            # macOS reports ARM64 as "arm64" while Linux reports "aarch64";
            # both need the arm64 optimizer image.
            if platform.machine() in ("arm64", "aarch64"):
                # Runs through a shell because the command relies on $(pwd) /
                # $(basename ...) expansion; the string is static, no user
                # input is interpolated into it.
                build_command = (
                    'docker run --rm -v "$(pwd)":/code '
                    '--mount type=volume,source="$(basename "$(pwd)")_cache",target=/code/target '
                    '--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry '
                    'cosmwasm/rust-optimizer-arm64:0.12.4'
                )
                use_shell = True
            else:
                build_command = ['cargo', 'run-script', 'optimize']
                use_shell = False
            try:
                process = subprocess.run(build_command, cwd=contract_dir, shell=use_shell, check=True, stdout=subprocess.PIPE, universal_newlines=True)
            except subprocess.CalledProcessError as grepexc:
                # A failed build must not be verified against whatever stale
                # artifacts happen to be on disk, so stop here.
                LOG.error(f"error code {grepexc.returncode} {grepexc.output}")
                return
            LOG.info(process.stdout)

        # Hash the on-chain byte code; b16encode yields upper-case hex bytes,
        # which are lower-cased below for the comparison.
        code_byte_code = asyncio.run(deployer.query_code_bytecode(args.codeid))
        on_chain = hashlib.sha256()
        on_chain.update(base64.b64decode(code_byte_code['byte_code']))
        on_chain_code_sha = base64.b16encode(on_chain.digest())

        # Get the hash from artifacts
        # TODO: Make less POC and more MVP, what if there are multiple checksums
        checksums_path = os.path.join(contract_dir, "artifacts", "checksums.txt")
        with open(checksums_path, "r") as file:
            first_line = file.readline()
        # Only the first space-separated token of the first line is compared.
        sha_to_compare = first_line.split(" ")[0].encode()

        LOG.info(f"[SHA265 of code with id {args.codeid}] {on_chain_code_sha.lower()}")
        LOG.info(f"[SHA265 of code with path {args.package}] {sha_to_compare}")
        if sha_to_compare == on_chain_code_sha.lower():
            LOG.info("[Verification]: Success. The provided contract wasm matches the provided code ID's byte_code hash")
        else:
            LOG.info("[Verification]: Failed. The provided contract wasm does not match the provided code ID's byte_code hash")
run_command(self, args)
Schema: Read Mnemonic from env as well as host to deploy on any specified chain/account
Prepare defaults for the above
Prepare an optimized build using the appropriate technique. If nobuild is provided, skip the build and go right to the checksums.txt
Query the byte_code from the lcd client
Prepare a hash of the queried byte_code and compare
Return success.
Source code in capsule/cmds/verify.py
def run_command(self, args):
    """Build the contract (unless told not to) and compare it with on-chain code.

    Schema:
        Build an LCD client / Deployer for the requested chain
        Prepare an optimized build using the appropriate technique
        If nobuild is provided, skip the build and go right to checksums.txt
        Query the byte_code of ``args.codeid`` from the lcd client
        Hash the queried byte_code and compare with the local checksum
        Log whether the two match

    Args:
        args: Parsed arguments; reads ``package``, ``codeid``, ``chain`` and ``nobuild``.
    """
    LOG.info("Starting verification")

    # TODO: Review reading the urls from the project config
    # (config["networks"][args.chain]) instead of hard-coding them here.
    chain_url = "https://bombay-lcd.terra.dev"
    chain_fcd_url = "https://bombay-fcd.terra.dev"

    # NOTE(review): a fresh event loop is installed before the client is built;
    # presumably the client looks up the current loop -- confirm before
    # removing these two lines.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    gas_prices = Coins(requests.get(f"{chain_fcd_url}/v1/txs/gas_prices").json())
    deployer = Deployer(client=LCDClient(
        url=chain_url,
        chain_id=args.chain or "bombay-12",
        gas_prices=gas_prices))
    contract_dir = os.path.abspath(args.package)

    # TODO: Refactor into a neater function
    if not args.nobuild:
        # macOS reports ARM64 as "arm64" while Linux reports "aarch64"; both
        # need the arm64 optimizer image.
        if platform.machine() in ("arm64", "aarch64"):
            # Runs through a shell because the command relies on $(pwd) /
            # $(basename ...) expansion; the string is static, no user input is
            # interpolated into it.
            build_command = (
                'docker run --rm -v "$(pwd)":/code '
                '--mount type=volume,source="$(basename "$(pwd)")_cache",target=/code/target '
                '--mount type=volume,source=registry_cache,target=/usr/local/cargo/registry '
                'cosmwasm/rust-optimizer-arm64:0.12.4'
            )
            use_shell = True
        else:
            build_command = ['cargo', 'run-script', 'optimize']
            use_shell = False
        try:
            process = subprocess.run(build_command, cwd=contract_dir, shell=use_shell, check=True, stdout=subprocess.PIPE, universal_newlines=True)
        except subprocess.CalledProcessError as grepexc:
            # A failed build must not be verified against whatever stale
            # artifacts happen to be on disk, so stop here.
            LOG.error(f"error code {grepexc.returncode} {grepexc.output}")
            return
        LOG.info(process.stdout)

    # Hash the on-chain byte code; b16encode yields upper-case hex bytes, which
    # are lower-cased below for the comparison.
    code_byte_code = asyncio.run(deployer.query_code_bytecode(args.codeid))
    on_chain = hashlib.sha256()
    on_chain.update(base64.b64decode(code_byte_code['byte_code']))
    on_chain_code_sha = base64.b16encode(on_chain.digest())

    # Get the hash from artifacts
    # TODO: Make less POC and more MVP, what if there are multiple checksums
    checksums_path = os.path.join(contract_dir, "artifacts", "checksums.txt")
    with open(checksums_path, "r") as file:
        first_line = file.readline()
    # Only the first space-separated token of the first line is compared.
    sha_to_compare = first_line.split(" ")[0].encode()

    LOG.info(f"[SHA265 of code with id {args.codeid}] {on_chain_code_sha.lower()}")
    LOG.info(f"[SHA265 of code with path {args.package}] {sha_to_compare}")
    if sha_to_compare == on_chain_code_sha.lower():
        LOG.info("[Verification]: Success. The provided contract wasm matches the provided code ID's byte_code hash")
    else:
        LOG.info("[Verification]: Failed. The provided contract wasm does not match the provided code ID's byte_code hash")
Lib and Helpers
ACmd
ACmd is an abstraction that helps to define a consistent minimal interface for each command in this CLI. When a class is a subclass of this you have the assurance that: (1) the class is-a command of some sort; (2) it has info for its Name, Description, Example Usage and Help; (3) it has a run_command function to execute.
Source code in capsule/abstractions/ACmd.py
class ACmd(object):
    """ACmd is an abstraction that helps to
    define a consistent minimal interface for
    each command in this CLI.

    When a class is a subclass of this you have the assurance:
        The class is-a command of some sort
        It has info for its Name, Description, Example Usage and Help
        It has a run_command function to execute
    """

    # Lower-case names of the class attributes every command must define; they
    # are upper-cased (CMD_NAME, ...) before being looked up.
    REQUIRED_FIELDS = ['cmd_name', 'cmd_description', 'cmd_help', 'cmd_usage']

    def __init__(self, sub_parser):
        """Validate the required fields and register this command's sub-parser.

        Args:
            sub_parser: Object exposing ``add_parser``; presumably the result
                of ``ArgumentParser.add_subparsers()`` -- verify against caller.

        Raises:
            Exception: If any required field is undefined or ``None``.
        """
        # Look fields up with a default so that a field which is missing
        # altogether is reported the same way as one set to None, instead of
        # surfacing as a bare AttributeError.
        missing = [field.upper() for field in self.REQUIRED_FIELDS
                   if getattr(self, field.upper(), None) is None]
        if missing:
            raise Exception(
                "Command did not implement all the required fields: " + ", ".join(missing))
        self.parser = sub_parser.add_parser(self.CMD_NAME,
                                            help=self.CMD_HELP,
                                            formatter_class=HelpFormatter,
                                            parents=[])
        # NOTE(review): ``self`` is passed explicitly in addition to the
        # receiver; that only matches ``initialise(self)`` if ``self`` is the
        # class object rather than an instance -- confirm against how commands
        # are constructed before changing this call.
        self.initialise(self)

    def initialise(self):
        """Register arguments on ``self.parser``; must be overridden by each command."""
        raise NotImplementedError()

    def run_command(self, args):
        """Execute the command with the parsed ``args``; must be overridden by each command."""
        raise NotImplementedError()
config_handler
Load configuration from .toml file.
get_config(config_path=None)
async
Simple function which takes a config_file and attempts to parse it as a toml config returning the parsed result as a dict
Source code in capsule/lib/config_handler.py
async def get_config(config_path=None):
    """Simple function which takes a config_file
    and attempts to parse it as a toml config
    returning the parsed result as a dict

    Args:
        config_path ([str], optional): Path handed to get_config_file to resolve
            the config location. Defaults to None.

    Returns:
        dict: The parsed toml config
    """
    filename = get_config_file(filename=config_path)
    # Read toml file
    config = toml.load(filename)
    # Both sections are looked up defensively so a config without them can
    # still be loaded; get_mnemonic treats them as optional as well.
    LOG.debug(f"Found these networks available: {config.get('networks', {})}")
    # The mnemonic is a wallet secret, so only report whether one is present
    # rather than writing its value into the logs.
    has_mnemonic = bool(config.get("deploy_info", {}).get("mnemonic"))
    LOG.debug(f"Found this deployment info: mnemonic configured = {has_mnemonic}")
    return config
get_config_file(filename=None)
Attempts to get the location of the capsule config. One of 4 things occurs here, either: - A config location is gathered from the environment - A specified filename is provided and a config file with this name is created - A specified filename is provided and if a config file with this name is found in the current dir then this is used - A default config file is created in the ~/.capsule directory
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename |
[str] |
A specified filename to create for config. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
[str] |
[The config files path.] |
Source code in capsule/lib/config_handler.py
def get_config_file(filename=None):
    """Work out which path should be used as the capsule config file.

    The candidates are tried in this order:
        - the location named by the DEFAULT_CONFIG_FILE_ENV_VAR environment variable
        - the supplied filename, user/variable-expanded and made absolute
        - DEFAULT_CONFIG_FILE_NAME inside the ~/.capsule directory, which is
          created when it does not exist yet

    Args:
        filename ([str], optional): A specified filename to use for config. Defaults to None.

    Returns:
        [str]: [The config files path.]
    """
    # A location coming from the environment beats everything else
    from_env = os.environ.get(DEFAULT_CONFIG_FILE_ENV_VAR, None)
    if from_env:
        return from_env

    # Next in line is an explicitly supplied filename
    if filename:
        expanded = os.path.expandvars(os.path.expanduser(filename))
        return os.path.abspath(expanded)

    # Fall back to the default file under the home directory
    default_path = os.path.expanduser(os.path.join("~", ".capsule", DEFAULT_CONFIG_FILE_NAME))
    parent_dir = os.path.dirname(default_path)
    # Make sure the .capsule directory is there before handing the path back
    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)
    return default_path
credential_handler
get_mnemonic(strict=False)
async
Attempt to gather a mnemonic from one of the available sources. First, if a mnemonic is defined in the env, use that. Next, check the config file for the secret. If no mnemonic can be found, optionally raise an Exception.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
strict |
bool |
When set to true, if no mnemonic is found an exception is raised. Defaults to False. |
False |
Returns:
Type | Description |
---|---|
str |
The mnemonic found either in the env or in the config file |
Source code in capsule/lib/credential_handler.py
async def get_mnemonic(strict=False):
    """Look up the deployment mnemonic from the available sources.

    The CAPSULE_MNEMONIC environment variable is consulted first; when it is
    unset or empty the "mnemonic" entry of the config's "deploy_info" section
    is used instead.

    Args:
        strict (bool, optional): When set to true, if no mnemonic is found an exception is raised. Defaults to False.

    Returns:
        str: The mnemonic found either in the env or in the config file,
        or None when nothing was found and strict is false

    Raises:
        Exception: When strict is true and no mnemonic could be found
    """
    from_env = os.getenv("CAPSULE_MNEMONIC", False)
    if from_env:
        return from_env

    config = await get_config()
    from_config = config.get("deploy_info", {}).get("mnemonic", False)
    if from_config:
        return from_config

    if not strict:
        return None
    raise Exception("No Mnemonic was found either in the specified config file or in the environment. Strict mode is set to true")
deployer
Deployer
Deployer is a simple facade object providing an interface towards general deployment actions such as sending messages, getting addresses, uploading code objects, instantiating them into contracts and also executing or querying those contracts
Source code in capsule/lib/deployer.py
class Deployer():
    """Deployer is a simple facade object
    providing an interface towards general deployment
    actions such as sending messages, getting addresses,
    uploading code objects, instantiating them into contracts
    and also executing or querying those contracts
    """

    def __init__(self, client: LCDClient) -> None:
        """Set up the wallet and default fee used for every transaction.

        Args:
            client (LCDClient): The LCD client used to talk to the chain
        """
        self.client = client
        self.mnemonic = asyncio.run(get_mnemonic())
        # The mnemonic is a wallet secret: record only whether one was found,
        # never its value.
        LOG.debug(f"Mnemonic found: {self.mnemonic is not None}")
        self.deployer = Wallet(lcd=self.client, key=MnemonicKey(self.mnemonic))
        self.std_fee = StdFee(4000000, "600000uluna")

    async def send_msg(self, msg):
        """send_msg attempts to create
        and sign a transaction with the provided
        msg and then broadcasts the tx

        Args:
            msg: The single message to wrap in the transaction

        Returns:
            The result of broadcasting the signed tx through the client
        """
        tx = self.deployer.create_and_sign_tx(
            msgs=[msg], fee=self.std_fee
        )
        return self.client.tx.broadcast(tx)

    async def store_contract(self, contract_name:str, contract_path:str="") -> str:
        """store_contract attempts to
        gather a given wasm artifact file
        and upload it to the given chain as a StoredCodeObject
        The storage operation is done using the send_msg helper

        Args:
            contract_name (str): The name of the contract to deploy
            contract_path (str, optional): The path to a wasm contract artifact to deploy. Defaults to "".

        Returns:
            str: The contract storage result is parsed and just the code id of the stored code object is returned
        """
        # If the full path was provided, use it else assume its located in artifacts
        wasm_path = contract_path if contract_path else f"artifacts/{contract_name}.wasm"
        # Named wasm_b64 rather than bytes so the builtin is not shadowed
        wasm_b64 = read_file_as_b64(wasm_path)
        msg = MsgStoreCode(self.deployer.key.acc_address, wasm_b64)
        contract_storage_result = await self.send_msg(msg)
        LOG.info(contract_storage_result)
        return get_code_id(contract_storage_result)

    async def instantiate_contract(self, code_id: str, init_msg:dict) -> str:
        """instantiate_contract attempts to
        instantiate a code object with an init msg
        into a live contract on the network.

        Args:
            code_id (str): The code_id of the stored wasm contract artifact
            init_msg (dict): The init msg to send to setup the contract

        Returns:
            str: The contracts address
        """
        msg = MsgInstantiateContract(
            sender=self.deployer.key.acc_address,
            admin=self.deployer.key.acc_address,
            code_id=code_id,
            init_msg=init_msg
        )
        instantiation_result = await self.send_msg(msg)
        LOG.info(instantiation_result)
        return get_contract_address(instantiation_result)

    async def execute_contract(self, contract_addr: str, execute_msg, coins=None):
        """Execute a message to perform an action on a given contract, returning the result

        Args:
            contract_addr (str): The contract to execute a msg on
            execute_msg (dict): The msg to execute on the contract
            coins (list, optional): Coins which may be needed for the execution tx.
                Defaults to None, which is treated as an empty list.

        Returns:
            dict: execution results
        """
        # A fresh list per call; a shared [] default would be the same object
        # across every invocation.
        if coins is None:
            coins = []
        msg = MsgExecuteContract(
            sender=self.deployer.key.acc_address,
            contract=contract_addr,
            execute_msg=execute_msg,
            coins=coins
        )
        exe_result = await self.send_msg(msg)
        LOG.debug(exe_result)
        return exe_result

    async def query_contract(self, contract_addr: str, query_msg: dict):
        """Perform a query on a given contract, returning the result

        Args:
            contract_addr (str): The contract to perform the query on
            query_msg (dict): The query to perform

        Returns:
            dict: Query Result
        """
        LOG.info(f"Query to be ran {query_msg}")
        query_result = self.client.wasm.contract_query(contract_addr, query_msg)
        LOG.info(query_result)
        return query_result

    async def query_code_id(self, code_id: int):
        """Request the /wasm/codes/<code_id> endpoint and return the decoded JSON.

        Args:
            code_id (int): The id of the stored code object to look up

        Returns:
            The JSON-decoded response body
        """
        LOG.info(f"Query to be ran {code_id}")
        # TODO: This query is not cross-chain capable (bombay LCD host is fixed)
        query_raw = requests.get(f"https://bombay-lcd.terra.dev/wasm/codes/{code_id}").json()
        return query_raw

    async def query_code_bytecode(self, code_id: int):
        """Request the byte_code endpoint for a code id and return the decoded JSON.

        Args:
            code_id (int): The id of the stored code object whose byte code is wanted

        Returns:
            The JSON-decoded response body
        """
        LOG.info(f"Query to be ran {code_id}")
        # TODO: This query is not cross-chain capable
        # The requested id is interpolated into the path; it used to be a fixed
        # literal, so every call returned the byte code of the same code object.
        query_raw = requests.get(f"https://bombay-lcd.terra.dev/terra/wasm/v1beta1/codes/{code_id}/byte_code").json()
        return query_raw
execute_contract(self, contract_addr, execute_msg, coins=[])
async
Execute a message to perform an action on a given contract, returning the result
Parameters:
Name | Type | Description | Default |
---|---|---|---|
contract_addr |
str |
The contract to execute a msg on |
required |
execute_msg |
dict |
The msg to execute on the contract |
required |
coins |
list |
Coins which may be needed for the execution tx. Defaults to []. |
[] |
Returns:
Type | Description |
---|---|
dict |
execution results |
Source code in capsule/lib/deployer.py
async def execute_contract(self, contract_addr: str, execute_msg, coins=None):
    """Execute a message to perform an action on a given contract, returning the result

    Args:
        contract_addr (str): The contract to execute a msg on
        execute_msg (dict): The msg to execute on the contract
        coins (list, optional): Coins which may be needed for the execution tx.
            Defaults to None, which is treated as an empty list.

    Returns:
        dict: execution results
    """
    # A fresh list per call; a shared [] default would be the same object
    # across every invocation.
    if coins is None:
        coins = []
    msg = MsgExecuteContract(
        sender=self.deployer.key.acc_address,
        contract=contract_addr,
        execute_msg=execute_msg,
        coins=coins
    )
    exe_result = await self.send_msg(msg)
    LOG.debug(exe_result)
    return exe_result
instantiate_contract(self, code_id, init_msg)
async
instantiate_contract attempts to instantiate a code object with an init msg into a live contract on the network.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code_id |
str |
The code_id of the stored wasm contract artifact |
required |
init_msg |
dict |
The init msg to send to setup the contract |
required |
Returns:
Type | Description |
---|---|
str |
The contracts address |
Source code in capsule/lib/deployer.py
async def instantiate_contract(self, code_id: str, init_msg:dict) -> str:
    """Turn a stored code object into a live contract on the network.

    An instantiate message is built with the deployer's account address as
    both sender and admin, sent through send_msg, and the contract address
    is extracted from the result.

    Args:
        code_id (str): The code_id of the stored wasm contract artifact
        init_msg (dict): The init msg to send to setup the contract

    Returns:
        str: The contracts address
    """
    instantiate_kwargs = dict(
        sender=self.deployer.key.acc_address,
        admin=self.deployer.key.acc_address,
        code_id=code_id,
        init_msg=init_msg,
    )
    outcome = await self.send_msg(MsgInstantiateContract(**instantiate_kwargs))
    LOG.info(outcome)
    return get_contract_address(outcome)
query_code_bytecode(self, code_id)
async
Source code in capsule/lib/deployer.py
async def query_code_bytecode(self, code_id: int):
    """Request the byte_code endpoint for a code id and return the decoded JSON.

    Args:
        code_id (int): The id of the stored code object whose byte code is wanted

    Returns:
        The JSON-decoded response body
    """
    LOG.info(f"Query to be ran {code_id}")
    # TODO: This query is not cross-chain capable
    # The requested id is interpolated into the path; it used to be a fixed
    # literal, so every call returned the byte code of the same code object.
    query_raw = requests.get(f"https://bombay-lcd.terra.dev/terra/wasm/v1beta1/codes/{code_id}/byte_code").json()
    return query_raw
query_code_id(self, code_id)
async
Source code in capsule/lib/deployer.py
async def query_code_id(self, code_id: int):
    """Request the /wasm/codes/<code_id> endpoint and return the decoded JSON.

    Args:
        code_id (int): The id of the stored code object to look up

    Returns:
        The JSON-decoded response body
    """
    LOG.info(f"Query to be ran {code_id}")
    endpoint = f"https://bombay-lcd.terra.dev/wasm/codes/{code_id}"
    response = requests.get(endpoint)
    return response.json()
query_contract(self, contract_addr, query_msg)
async
Perform a query on a given contract, returning the result
Parameters:
Name | Type | Description | Default |
---|---|---|---|
contract_addr |
str |
The contract to perform the query on |
required |
query_msg |
dict |
The query to perform |
required |
Returns:
Type | Description |
---|---|
dict |
Query Result |
Source code in capsule/lib/deployer.py
async def query_contract(self, contract_addr: str, query_msg: dict):
    """Run a query against a contract and hand back what the client returns.

    Both the outgoing query and the result are written to the log at info level.

    Args:
        contract_addr (str): The contract to perform the query on
        query_msg (dict): The query to perform

    Returns:
        dict: Query Result
    """
    LOG.info(f"Query to be ran {query_msg}")
    wasm_api = self.client.wasm
    answer = wasm_api.contract_query(contract_addr, query_msg)
    LOG.info(answer)
    return answer
send_msg(self, msg)
async
send_msg attempts to create and sign a transaction with the provided msg and then broadcasts the tx
Source code in capsule/lib/deployer.py
async def send_msg(self, msg):
    """Sign a transaction wrapping the provided msg and broadcast it.

    The tx carries only this msg and uses self.std_fee as its fee; whatever
    the client's broadcast call returns is handed back to the caller.
    """
    signed_tx = self.deployer.create_and_sign_tx(msgs=[msg], fee=self.std_fee)
    broadcast_result = self.client.tx.broadcast(signed_tx)
    return broadcast_result
store_contract(self, contract_name, contract_path='')
async
store_contract attempts to gather a given wasm artifact file and upload it to the given chain as a StoredCodeObject. The storage operation is done using the send_msg helper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
contract_name |
str |
The name of the contract to deploy |
required |
contract_path |
str |
The path to a wasm contract artifact to deploy. Defaults to "". |
'' |
Returns:
Type | Description |
---|---|
str |
The contract storage result is parsed and just the code id of the stored code object is returned |
Source code in capsule/lib/deployer.py
async def store_contract(self, contract_name:str, contract_path:str="") -> str:
    """Upload a wasm artifact to the chain as a stored code object.

    The artifact is read via read_file_as_b64, wrapped in a store-code
    message from the deployer's account address and sent with send_msg.

    Args:
        contract_name (str): The name of the contract to deploy
        contract_path (str, optional): The path to a wasm contract artifact to deploy. Defaults to "".

    Returns:
        str: The contract storage result is parsed and just the code id of the stored code object is returned
    """
    # An explicit path wins; otherwise the artifact is expected under artifacts/
    if contract_path:
        wasm_location = contract_path
    else:
        wasm_location = f"artifacts/{contract_name}.wasm"
    wasm_b64 = read_file_as_b64(wasm_location)
    store_msg = MsgStoreCode(self.deployer.key.acc_address, wasm_b64)
    storage_outcome = await self.send_msg(store_msg)
    LOG.info(storage_outcome)
    return get_code_id(storage_outcome)
logging_handler
One place for logging stuff
setup_logger(logger_name='capsule', extra_kwargs={})
Simple wrapper to setup a logger and return it. Used by the LOG constant which is used through the project
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logger_name |
str |
The name for the logger. Defaults to "capsule". |
'capsule' |
extra_kwargs |
dict |
Any extra options to use with the logger. Defaults to {}. |
{} |
Source code in capsule/lib/logging_handler.py
def setup_logger(logger_name="capsule", extra_kwargs=None):
    """Simple wrapper to setup a logger and return it.
    Used by the LOG constant which is used through the project

    Args:
        logger_name (str, optional): The name for the logger. Defaults to "capsule".
        extra_kwargs (dict, optional): Any extra options forwarded to logging.getLogger.
            Defaults to None, which is treated as no extra options.

    Returns:
        logging.Logger: The logger, set to INFO level with a stream handler attached
    """
    # Avoid a shared mutable default by creating the empty mapping per call
    if extra_kwargs is None:
        extra_kwargs = {}
    logger = logging.getLogger(logger_name, **extra_kwargs)
    logger.setLevel(logging.INFO)
    # getLogger returns the same object for the same name, so only attach a
    # handler when none is present; otherwise each extra call would add one
    # more handler and every message would be emitted once per handler.
    if not logger.handlers:
        # Setup a StreamHandler to give output to the logs
        handler = logging.StreamHandler()
        # Establish a log format for messages
        handler.setFormatter(logging.Formatter('[capsule:%(module)s] %(message)s'))
        # Add handler to logger
        logger.addHandler(handler)
    return logger