Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
eth-brownie / brownie / network / transaction.py
Size: Mime:
#!/usr/bin/python3

import functools
import re
import sys
import threading
import time
from collections import deque
from enum import IntEnum
from hashlib import sha1
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from warnings import warn

import black
import requests
from eth_abi import decode_abi
from hexbytes import HexBytes
from web3.exceptions import TransactionNotFound

from brownie._config import CONFIG
from brownie.convert import EthAddress, Wei
from brownie.exceptions import ContractNotFound, RPCRequestError
from brownie.project import build
from brownie.project import main as project_main
from brownie.project.compiler.solidity import SOLIDITY_ERROR_CODES
from brownie.project.sources import highlight_source
from brownie.test import coverage
from brownie.utils import color
from brownie.utils.output import build_tree

from . import state
from .event import EventDict, _decode_logs, _decode_trace
from .web3 import web3

# Spinner frames written to stdout while `_await_confirmation` polls for a
# receipt; the deque is rotated by one after each frame is shown.
_marker = deque("-/|\\-/|\\")


def trace_property(fn: Callable) -> Any:
    """Turn `fn` into a read-only property that depends on the transaction trace.

    Args:
        fn: Getter taking the receipt instance as its only argument.

    Returns:
        A `property` whose getter:

        * returns None while the transaction status is negative (pending/dropped),
        * re-raises a previously stored trace error (`self._trace_exc`),
        * otherwise returns `fn(self)`.

    Raises (from the getter):
        RPCRequestError: the original error when the node supports traces,
            otherwise a more descriptive error explaining that
            `debug_traceTransaction` is unavailable.
    """
    # attributes that are only available after querying the transaction trace

    # `functools.wraps` keeps the getter's name and docstring so that the
    # resulting property is introspectable (help(), console hints, tracebacks)
    @functools.wraps(fn)
    def wrapper(self: "TransactionReceipt") -> Any:
        if self.status < 0:
            return None
        if self._trace_exc is not None:
            raise self._trace_exc
        try:
            return fn(self)
        except RPCRequestError as exc:
            if web3.supports_traces:
                # if the node client supports traces, raise the actual error
                raise exc
            raise RPCRequestError(
                f"Accessing `TransactionReceipt.{fn.__name__}` on a {self.status.name.lower()} "
                "transaction requires the `debug_traceTransaction` RPC endpoint, but the node "
                "client does not support it or has not made it available."
            ) from None

    return property(wrapper)  # type: ignore


def trace_inspection(fn: Callable) -> Any:
    """Guard a trace-inspection method against transactions it cannot handle.

    The returned wrapper raises NotImplementedError for deployment
    transactions (those with a `contract_address`), returns None for a
    transaction with empty input that used exactly 21000 gas, and otherwise
    delegates to `fn` with the original arguments.
    """

    @functools.wraps(fn)
    def wrapper(self: "TransactionReceipt", *args: Any, **kwargs: Any) -> Any:
        if self.contract_address:
            raise NotImplementedError(
                "Trace inspection methods are not available for deployment transactions."
            )
        # empty calldata and exactly 21000 gas: nothing to inspect
        nothing_to_inspect = self.input == "0x" and self.gas_used == 21000
        return None if nothing_to_inspect else fn(self, *args, **kwargs)

    return wrapper


class Status(IntEnum):
    """Lifecycle state of a transaction.

    Negative values mean the transaction has not been mined; code in this
    module tests `status < 0` for that and relies on `Reverted` being falsy.
    """

    # set when the sender's nonce advanced without this tx being confirmed
    Dropped = -2
    # initial state of every receipt, until a receipt is fetched from the node
    Pending = -1
    # mined, receipt status 0
    Reverted = 0
    # mined, receipt status 1
    Confirmed = 1


class TransactionReceipt:

    """Attributes and methods relating to a broadcasted transaction.

    * All ether values are given as integers denominated in wei.
    * Before the tx has confirmed, most attributes are set to None
    * Accessing methods / attributes that query debug_traceTransaction
      may be very slow if the transaction involved many steps

    Attributes:
        contract_name: Name of the contract called in the transaction
        fn_name: Name of the method called in the transaction
        txid: Transaction ID
        sender: Address of the sender
        receiver: Address of the receiver
        value: Amount transferred
        gas_price: Gas price
        gas_limit: Gas limit
        gas_used: Gas used
        input: Hexstring input data
        confirmations: The number of blocks since the transaction was confirmed
        nonce: Transaction nonce
        block_number: Block number this transaction was included in
        timestamp: Timestamp of the block this transaction was included in
        txindex: Index of the transaction within the mined block
        contract_address: Address of contract deployed by the transaction
        logs: Raw transaction logs
        status: Transaction status: -2 dropped, -1 pending, 0 reverted, 1 successful

    Additional attributes:
    (only available if debug_traceTransaction is enabled in the RPC)

        events: Decoded transaction log events
        trace: Expanded stack trace from debug_traceTransaction
        return_value: Return value(s) from contract call
        revert_msg: Error string from reverted contract call
        modified_state: Boolean, did this contract write to storage?"""

    # these are defined as class attributes to expose them in console completion hints
    block_number = None
    contract_address: Optional[str] = None
    contract_name = None
    fn_name = None
    gas_used = None
    logs: Optional[List] = None
    nonce = None
    sender = None
    txid: str
    txindex = None
    type: int

    def __init__(
        self,
        txid: Union[str, bytes],
        sender: Any = None,
        silent: bool = True,
        required_confs: int = 1,
        is_blocking: bool = True,
        name: str = "",
        revert_data: Optional[Tuple] = None,
    ) -> None:
        """Instantiates a new TransactionReceipt object.

        The transaction is fetched from the node immediately; confirmation is
        then awaited in a daemon thread, which this constructor joins when
        `is_blocking` is set and there is something to wait for.

        Args:
            txid: hexstring transaction ID (bytes are converted to a hexstring)
            sender: sender as a hex string or Account object
            required_confs: the number of required confirmations before processing the receipt
            is_blocking: if True, creating the object is a blocking action until the required
                         confirmations are received
            silent: toggles console verbosity (default True)
            name: contract function being called; a value of the form
                  "ContractName.fn_name" is split into contract and function name
            revert_data: (revert string, program counter, revert type)
        """
        self._silent = silent

        if isinstance(txid, bytes):
            txid = HexBytes(txid).hex()

        # this event is set once the transaction is confirmed or dropped
        # it is used for waiting during blocking transaction actions
        self._confirmed = threading.Event()

        # internal attributes
        self._call_cost = 0
        self._trace_exc: Optional[Exception] = None
        self._trace_origin: Optional[str] = None
        self._raw_trace: Optional[List] = None
        self._trace: Optional[List] = None
        self._events: Optional[EventDict] = None
        self._return_value: Any = None
        self._revert_msg: Optional[str] = None
        self._dev_revert_msg: Optional[str] = None
        self._modified_state: Optional[bool] = None
        self._new_contracts: Optional[List] = None
        self._internal_transfers: Optional[List[Dict]] = None
        self._subcalls: Optional[List[Dict]] = None

        # attributes that can be set immediately
        self.sender = sender
        self.status = Status(-1)
        self.txid = str(txid)
        self.contract_name = None
        self.fn_name = name

        if name and "." in name:
            self.contract_name, self.fn_name = name.split(".", maxsplit=1)

        # avoid querying the trace to get the revert string if possible
        self._revert_msg, self._revert_pc, revert_type = revert_data or (None, None, None)
        # a revert type other than "revert" / "invalid_opcode" doubles as the message
        if self._revert_msg is None and revert_type not in ("revert", "invalid_opcode"):
            self._revert_msg = revert_type
        if self._revert_pc is not None:
            self._dev_revert_msg = build._get_dev_revert(self._revert_pc) or None

        tx: Dict = web3.eth.get_transaction(HexBytes(self.txid))
        self._set_from_tx(tx)

        if not self._silent:
            output_str = ""
            # type 2 transactions report max fee and priority fee instead of a gas price
            if self.type == 2:
                max_gas = tx["maxFeePerGas"] / 10**9
                priority_gas = tx["maxPriorityFeePerGas"] / 10**9
                output_str = (
                    f"  Max fee: {color('bright blue')}{max_gas}{color} gwei"
                    f"   Priority fee: {color('bright blue')}{priority_gas}{color} gwei"
                )
            elif self.gas_price is not None:
                gas_price = self.gas_price / 10**9
                output_str = f"  Gas price: {color('bright blue')}{gas_price}{color} gwei"
            print(
                f"{output_str}   Gas limit: {color('bright blue')}{self.gas_limit}{color}"
                f"   Nonce: {color('bright blue')}{self.nonce}{color}"
            )

        # await confirmation of tx in a separate thread which is blocking if
        # required_confs > 0 or tx has already confirmed (`blockNumber` != None)
        confirm_thread = threading.Thread(
            target=self._await_confirmation, args=(tx["blockNumber"], required_confs), daemon=True
        )
        confirm_thread.start()
        if is_blocking and (required_confs > 0 or tx["blockNumber"]):
            confirm_thread.join()

    def __repr__(self) -> str:
        """Return `<Transaction 'txid'>` with the txid colored by status."""
        # keyed by the integer value of the status
        palette = {
            1: "",
            0: "bright red",
            -1: "bright yellow",
            -2: "dark white",
        }
        shade = palette[self.status]
        return f"<Transaction '{color(shade)}{self.txid}{color}'>"

    def __hash__(self) -> int:
        return hash(self.txid)

    @trace_property
    def events(self) -> Optional[EventDict]:
        """Decoded events, computed on first access and cached in `_events`.

        For a truthy status the events are decoded from the receipt logs; for a
        reverted transaction they are decoded from the raw trace.
        """
        if self._events is not None:
            return self._events

        if not self.status:
            self._get_trace()
            # get events from the trace - handled lazily so that other
            # trace operations are not blocked in case of a decoding error
            origin = str(self.receiver or self.contract_address)
            self._events = _decode_trace(self._raw_trace, origin)  # type: ignore
            return self._events

        # relay contract map so we can decode ds-note logs
        emitters = set()
        if self.logs:
            for entry in self.logs:
                emitters.add(entry.address)
        contract_map = {emitter: state._find_contract(emitter) for emitter in emitters}
        self._events = _decode_logs(self.logs, contracts=contract_map)  # type: ignore
        return self._events

    @trace_property
    def internal_transfers(self) -> Optional[List]:
        """Internal transfers collected while expanding the trace.

        A reverted transaction always yields an empty list.
        """
        if self.status:
            if self._internal_transfers is None:
                self._expand_trace()
            return self._internal_transfers
        return []

    @trace_property
    def modified_state(self) -> Optional[bool]:
        """Whether the transaction modified state.

        Always False for a reverted transaction; otherwise determined from the
        trace on first access.
        """
        if self.status:
            if self._modified_state is None:
                self._get_trace()
        else:
            self._modified_state = False
        return self._modified_state

    @trace_property
    def new_contracts(self) -> Optional[List]:
        """Contracts recorded as newly created while expanding the trace.

        A reverted transaction always yields an empty list.
        """
        if self.status:
            if self._new_contracts is None:
                self._expand_trace()
            return self._new_contracts
        return []

    @trace_property
    def return_value(self) -> Optional[str]:
        """Return value of the call, read from the trace on first access.

        None for a reverted transaction.
        """
        if self.status:
            if self._return_value is None:
                self._get_trace()
            return self._return_value
        return None

    @trace_property
    def revert_msg(self) -> Optional[str]:
        """Revert message of a reverted transaction, or None if it did not revert.

        The trace is queried when no message is known yet, and also for a
        deployment whose known message is "out of gas".
        """
        if self.status:
            return None
        known = self._revert_msg
        if known is None or (self.contract_address and known == "out of gas"):
            self._get_trace()
        return self._revert_msg

    @trace_property
    def dev_revert_msg(self) -> Optional[str]:
        """Developer revert comment of a reverted transaction.

        Returns None when the transaction did not revert or when the stored
        message is empty.
        """
        if not self.status:
            if self._dev_revert_msg is None:
                self._get_trace()
            message = self._dev_revert_msg
            return message if message else None
        return None

    @trace_property
    def subcalls(self) -> Optional[List]:
        """Subcalls found in the expanded trace, excluding calls to precompiles."""
        if self._subcalls is None:
            self._expand_trace()
        return [
            call
            for call in self._subcalls  # type: ignore
            if not _is_call_to_precompile(call)
        ]

    @trace_property
    def trace(self) -> Optional[List]:
        """Expanded trace, built on first access."""
        if self._trace is not None:
            return self._trace
        self._expand_trace()
        return self._trace

    @property
    def timestamp(self) -> Optional[int]:
        """Timestamp of the block containing the transaction.

        None while the status is negative (pending or dropped). The block is
        queried from the node on every access.
        """
        if self.status >= 0:
            block = web3.eth.get_block(self.block_number)
            return block["timestamp"]
        return None

    @property
    def confirmations(self) -> int:
        """Number of confirmations, counting the inclusion block as the first.

        Returns 0 while no block number is known.
        """
        if self.block_number:
            chain_head = web3.eth.block_number
            return chain_head - self.block_number + 1
        return 0

    def replace(
        self,
        increment: Optional[float] = None,
        gas_price: Optional[Wei] = None,
        silent: Optional[bool] = None,
    ) -> "TransactionReceipt":
        """
        Rebroadcast this transaction with a higher gas price.

        Exactly one of `increment` and `gas_price` must be given.

        Arguments
        ---------
        increment : float, optional
            Multiplier applied to the gas price of this transaction in order
            to determine the new gas price. For EIP1559 transactions the
            resulting value is used as the max_fee and the priority_fee is
            multiplied by 1.1.
        gas_price : Wei, optional
            Absolute gas price to use in the replacement transaction. For EIP1559
            transactions this is the new max_fee and the priority_fee is
            multiplied by 1.1.
        silent : bool, optional
            Toggle console verbosity (default is same setting as this transaction)

        Returns
        -------
        TransactionReceipt
            New transaction object

        Raises
        ------
        ValueError
            If neither or both of `increment` and `gas_price` are given, if the
            transaction is no longer pending or dropped, or if the sender is a
            bare address that is not present in `accounts`.
        """
        if increment is None:
            if gas_price is None:
                raise ValueError("Must give one of `increment` or `gas_price`")
        elif gas_price is not None:
            raise ValueError("Cannot set `increment` and `gas_price` together")
        if self.status >= 0:
            raise ValueError("Transaction has already confirmed")

        if self.gas_price is not None:
            gas_price = Wei(gas_price) if increment is None else Wei(self.gas_price * increment)

        if self.max_fee is None or self.priority_fee is None:
            max_fee = priority_fee = None
        else:
            # EIP1559: the computed price becomes the max fee
            max_fee, gas_price = gas_price, None
            priority_fee = Wei(self.priority_fee * 1.1)

        silent = self._silent if silent is None else silent

        sender = self.sender
        if isinstance(sender, EthAddress):
            # if the transaction wasn't broadcast during this brownie session,
            # check if the sender is unlocked - we might be able to replace anyway
            from brownie import accounts

            if sender not in accounts:
                raise ValueError("Sender address not in `accounts`")
            sender = accounts.at(sender)

        # same nonce and payload as this transaction, new fee settings
        options = dict(
            gas_limit=self.gas_limit,
            gas_price=gas_price,
            max_fee=max_fee,
            priority_fee=priority_fee,
            data=self.input,
            nonce=self.nonce,
            required_confs=0,
            silent=silent,
        )
        return sender.transfer(self.receiver, self.value, **options)  # type: ignore

    def wait(self, required_confs: int) -> None:
        """Block until the transaction has `required_confs` confirmations.

        Returns immediately when `required_confs` is below 1 or when the
        transaction already has more confirmations than requested. While the
        node does not know the transaction, it is polled once per second; if
        the sender's nonce has moved past this transaction's nonce in the
        meantime, the transaction is marked as dropped and the wait ends.
        """
        if required_confs < 1:
            return
        if self.confirmations > required_confs:
            print(f"This transaction already has {self.confirmations} confirmations.")
            return

        while True:
            try:
                tx: Dict = web3.eth.get_transaction(self.txid)
            except TransactionNotFound:
                if (
                    self.nonce is not None
                    and web3.eth.get_transaction_count(str(self.sender)) > self.nonce
                ):
                    self.status = Status(-2)
                    self._confirmed.set()
                    return
                time.sleep(1)
            else:
                break

        self._await_confirmation(tx["blockNumber"], required_confs)

    def _raise_if_reverted(self, exc: Any) -> None:
        """Raise `exc`, enriched with revert details, if the transaction reverted.

        Does nothing when the status is truthy or when running in console mode.

        Args:
            exc: Exception to raise. It may be falsy when traces are not
                supported, in which case a ValueError is raised instead.

        Raises:
            The given exception (or ValueError) when traces are unavailable,
            otherwise the result of `exc._with_attr(...)` carrying the source
            highlight, revert message and dev revert message.
        """
        if self.status or CONFIG.mode == "console":
            return
        if not web3.supports_traces:
            # if traces are not available, do not attempt to determine the revert reason
            raise exc or ValueError("Execution reverted")

        if self._dev_revert_msg is None:
            # no revert message and unable to check dev string - have to get trace
            self._expand_trace()
        if self.contract_address:
            # deployment transaction: no source is attached
            source = ""
        elif CONFIG.argv["revert"]:
            source = self._traceback_string()
        else:
            source = self._error_string(1)
            contract = state._find_contract(self.receiver)
            if contract:
                # comment marker depends on the contract language
                marker = "//" if contract._build["language"] == "Solidity" else "#"
                line = self._traceback_string().split("\n")[-1]
                if marker + " dev: " in line:
                    # NOTE(review): the `-5` presumably strips a trailing color
                    # reset sequence from the traceback line - confirm
                    self._dev_revert_msg = line[line.index(marker) + len(marker) : -5].strip()

        # NOTE(review): unlike the `supports_traces` branch above, this line does
        # not guard against `exc` being None and would raise AttributeError then
        raise exc._with_attr(
            source=source, revert_msg=self._revert_msg, dev_revert_msg=self._dev_revert_msg
        )

    def _await_confirmation(
        self, block_number: Optional[int] = None, required_confs: int = 1
    ) -> None:
        """Poll the node until the transaction is confirmed or considered dropped.

        Phases:
          1. Poll for a receipt once per second. If the sender's nonce moves past
             this transaction's nonce first, mark it as dropped and return.
          2. When more than one confirmation is required, keep polling until
             enough blocks have been mined; if the receipt disappears, start over.
          3. Populate the receipt attributes, optionally evaluate coverage,
             print the confirmation output, set the `_confirmed` event and mark
             other transactions with the same sender and nonce as dropped.

        Args:
            block_number: Block the transaction was included in, if already
                known; falls back to `self.block_number`.
            required_confs: Number of confirmations to wait for.
        """
        # await first confirmation
        block_number = block_number or self.block_number
        # 0.0 forces a nonce query on the first loop iteration
        nonce_time = 0.0
        sender_nonce = 0
        while True:
            # every 15 seconds, check if the nonce increased without a confirmation of
            # this specific transaction. if this happens, the tx has likely dropped
            # and we should stop waiting.
            if time.time() - nonce_time > 15:
                sender_nonce = web3.eth.get_transaction_count(str(self.sender))
                nonce_time = time.time()

            try:
                receipt = web3.eth.get_transaction_receipt(HexBytes(self.txid))
            except TransactionNotFound:
                receipt = None
            # the null blockHash check is required for older versions of Parity
            # taken from `web3._utils.transactions.wait_for_transaction_receipt`
            if receipt is not None and receipt["blockHash"] is not None:
                break

            # continuation of the nonce logic 2 sections prior. we must check the receipt
            # after querying the nonce, because in the other order there is a chance that
            # the tx would confirm after checking the receipt but before checking the nonce
            if sender_nonce > self.nonce:  # type: ignore
                self.status = Status(-2)
                self._confirmed.set()
                return

            # show a spinner only when the tx was not already mined at call time
            if not block_number and not self._silent and required_confs > 0:
                if required_confs == 1:
                    sys.stdout.write(f"  Waiting for confirmation... {_marker[0]}\r")
                else:
                    sys.stdout.write(
                        f"  Required confirmations: {color('bright yellow')}0/"
                        f"{required_confs}{color}   {_marker[0]}\r"
                    )
                _marker.rotate(1)
                sys.stdout.flush()

            time.sleep(1)

        # silence other dropped tx's immediately after confirmation to avoid output weirdness
        for dropped_tx in state.TxHistory().filter(
            sender=self.sender, nonce=self.nonce, key=lambda k: k != self
        ):
            dropped_tx._silent = True

        self.block_number = receipt["blockNumber"]
        # wait for more confirmations if required and handle uncle blocks
        remaining_confs = required_confs
        while remaining_confs > 0 and required_confs > 1:
            try:
                receipt = web3.eth.get_transaction_receipt(self.txid)
                self.block_number = receipt["blockNumber"]
            except TransactionNotFound:
                if not self._silent:
                    sys.stdout.write(f"\r{color('red')}Transaction was lost...{color}{' ' * 8}")
                    sys.stdout.flush()
                # check if tx is still in mempool, this will raise otherwise
                tx = web3.eth.get_transaction(self.txid)
                self.block_number = None
                # restart the whole wait from the first phase
                return self._await_confirmation(tx["blockNumber"], required_confs)
            # only refresh the progress line when the confirmation count changed
            if required_confs - self.confirmations != remaining_confs:
                remaining_confs = required_confs - self.confirmations
                if not self._silent:
                    sys.stdout.write(
                        f"\rRequired confirmations: {color('bright yellow')}{self.confirmations}/"
                        f"{required_confs}{color}  "
                    )
                    if remaining_confs == 0:
                        sys.stdout.write("\n")
                    sys.stdout.flush()
            if remaining_confs > 0:
                time.sleep(1)

        self._set_from_receipt(receipt)
        # if coverage evaluation is active, evaluate the trace
        if (
            CONFIG.argv["coverage"]
            and not coverage._check_cached(self.coverage_hash)
            and self.trace
        ):
            self._expand_trace()
        if not self._silent and required_confs > 0:
            print(self._confirm_output())

        # set the confirmation event and mark other tx's with the same nonce as dropped
        self._confirmed.set()
        for dropped_tx in state.TxHistory().filter(
            sender=self.sender, nonce=self.nonce, key=lambda k: k != self
        ):
            dropped_tx.status = Status(-2)
            dropped_tx._confirmed.set()

    def _set_from_tx(self, tx: Dict) -> None:
        """Populate attributes available from the transaction object itself.

        Sets sender (if not already known), receiver, value, fee fields, gas
        limit, input, nonce and type. When no function name was supplied, tries
        to resolve the contract and method name from the receiver and input.
        """
        if not self.sender:
            self.sender = EthAddress(tx["from"])

        recipient = tx.get("to")
        self.receiver = EthAddress(recipient) if recipient else None
        self.value = Wei(tx["value"])

        # fee fields may be absent depending on the transaction type
        for attr, key in (
            ("gas_price", "gasPrice"),
            ("max_fee", "maxFeePerGas"),
            ("priority_fee", "maxPriorityFeePerGas"),
        ):
            setattr(self, attr, tx.get(key))

        # mandatory fields
        for attr, key in (("gas_limit", "gas"), ("input", "input"), ("nonce", "nonce")):
            setattr(self, attr, tx[key])

        raw_type = HexBytes(tx.get("type", 0))
        self.type = int(raw_type.hex(), 16)

        # if receiver is a known contract, set function name
        if not self.fn_name:
            try:
                contract = state._find_contract(recipient)
                if contract is not None:
                    self.contract_name = contract._name
                    self.fn_name = contract.get_method(tx["input"])
            except ContractNotFound:
                # required in case the contract has self destructed
                # other aspects of functionality will be broken, but this way we
                # can at least return a receipt
                pass

    def _set_from_receipt(self, receipt: Dict) -> None:
        """Sets object attributes based on the transaction receipt.

        Besides copying block number, index, gas used, logs, status and
        contract address, this computes `coverage_hash` and, when a function
        name is known, records the gas usage in the transaction history.
        """
        self.block_number = receipt["blockNumber"]
        self.txindex = receipt["transactionIndex"]
        self.gas_used = gas_used = receipt["gasUsed"]
        self.logs = receipt["logs"]
        self.status = Status(receipt["status"])
        if "effectiveGasPrice" in receipt:
            # prefer the price actually paid over the one taken from the tx
            self.gas_price = receipt["effectiveGasPrice"]

        deployed_at = receipt["contractAddress"]
        self.contract_address = deployed_at
        if deployed_at and not self.contract_name:
            self.contract_name = "UnknownContract"

        # digest of the attributes that identify this execution
        fingerprint = (
            f"{self.nonce}{self.block_number}{self.sender}{self.receiver}"
            f"{self.value}{self.input}{int(self.status)}{self.gas_used}{self.txindex}"
        )
        self.coverage_hash = sha1(fingerprint.encode()).hexdigest()

        if not self.fn_name:
            return
        succeeded = self.status == Status(1)
        state.TxHistory()._gas(self._full_name(), gas_used, succeeded)

    def _confirm_output(self) -> str:
        """Build the console message printed once the transaction is mined.

        The message contains the full function name, the revert reason for a
        reverted transaction, block number and gas usage, the gas price for
        type 2 transactions and the deployment address for a successful deploy.
        """

        def blue(value: Any) -> str:
            # wrap a value in the highlight color followed by the color reset
            return f"{color('bright blue')}{value}{color}"

        outcome = ""
        if not self.status:
            reason = self.revert_msg if web3.supports_traces else None
            outcome = f"({color('bright red')}{reason or 'reverted'}{color}) "

        pieces = [
            f"\r  {self._full_name()} confirmed {outcome}  ",
            f"Block: {blue(self.block_number)}   ",
            f"Gas used: {blue(self.gas_used)} ",
            f"({blue(f'{self.gas_used / self.gas_limit:.2%}')})",
        ]
        if self.type == 2 and self.gas_price is not None:
            pieces.append(f"   Gas price: {blue(self.gas_price / 10 ** 9)} gwei")
        if self.status and self.contract_address:
            pieces.append(f"\n  {self.contract_name} deployed at: {blue(self.contract_address)}")
        pieces.append("\n")
        return "".join(pieces)

    def _get_trace(self) -> None:
        """Retrieves the stack trace via debug_traceTransaction and finds the
        return value, revert message and event logs in the trace.

        The raw struct logs are stored in `self._raw_trace`, normalized so that
        stack entries are 64-character hex strings without a `0x` prefix and
        `gas` / `gasCost` / `pc` are integers. Nothing is requested when the
        trace was already retrieved, or when the transaction has empty input
        and used exactly 21000 gas.

        Raises:
            RPCRequestError: if the node does not support traces, the request
                times out or the connection fails, or the node returns an error
                (the latter is also stored in `self._trace_exc`).
        """

        # check if trace has already been retrieved, or the tx warrants it
        if self._raw_trace is not None:
            return
        # set before the request, so any later call returns early at the check above
        self._raw_trace = []
        if self.input == "0x" and self.gas_used == 21000:
            self._modified_state = False
            self._trace = []
            return

        if not web3.supports_traces:
            raise RPCRequestError("Node client does not support `debug_traceTransaction`")
        try:
            # storage is only included in the trace when running in console mode
            trace = web3.provider.make_request(  # type: ignore
                "debug_traceTransaction", (self.txid, {"disableStorage": CONFIG.mode != "console"})
            )
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            msg = f"Encountered a {type(e).__name__} while requesting "
            msg += "`debug_traceTransaction`. The local RPC client has likely crashed."
            if CONFIG.argv["coverage"]:
                msg += " If the error persists, add the `skip_coverage` marker to this test."
            raise RPCRequestError(msg) from None

        if "error" in trace:
            self._modified_state = None
            # remembered so that `trace_property` re-raises it on later accesses
            self._trace_exc = RPCRequestError(trace["error"]["message"])
            raise self._trace_exc

        self._raw_trace = trace = trace["result"]["structLogs"]
        if not trace:
            self._modified_state = False
            return

        # different nodes return slightly different formats. its really fun to handle
        # geth/nethermind returns unprefixed and with 0-padding for stack and memory
        # erigon returns 0x-prefixed and without padding (but their memory values are like geth)
        fix_stack = False
        # the first non-empty stack decides whether stack values need normalizing
        for step in trace:
            if not step["stack"]:
                continue
            check = step["stack"][0]
            if not isinstance(check, str):
                break
            if check.startswith("0x"):
                fix_stack = True
                break

        fix_gas = isinstance(trace[0]["gas"], str)

        if fix_stack or fix_gas:
            for step in trace:
                if fix_stack:
                    # for stack values, we need 32 bytes (64 chars) without the 0x prefix
                    step["stack"] = [HexBytes(s).hex()[2:].zfill(64) for s in step["stack"]]
                if fix_gas:
                    # handle traces where numeric values are returned as hex (Nethermind)
                    step["gas"] = int(step["gas"], 16)
                    # gasCost is decoded as a signed big-endian integer
                    step["gasCost"] = int.from_bytes(HexBytes(step["gasCost"]), "big", signed=True)
                    step["pc"] = int(step["pc"], 16)

        if self.status:
            self._confirmed_trace(trace)
        else:
            self._reverted_trace(trace)

    def _confirmed_trace(self, trace: Sequence) -> None:
        """Extract state-modification flag and return value from a successful trace.

        `_modified_state` is True when any step is an SSTORE. The return value
        is decoded only when the final step is a RETURN, the transaction is not
        a deployment, the receiver is a known contract and the called function
        can be resolved from the input.
        """
        self._modified_state = any(step["op"] == "SSTORE" for step in trace)

        final_step = trace[-1]
        if final_step["op"] != "RETURN" or self.contract_address:
            return

        contract = state._find_contract(self.receiver)
        if not contract:
            return

        returned = _get_memory(final_step, -1)
        method = contract.get_method_object(self.input)
        if method:
            self._return_value = method.decode_output(returned)
            return
        warn(f"Unable to find function on {contract} for input {self.input}")

    def _reverted_trace(self, trace: Sequence) -> None:
        """Determine `_revert_msg` and `_dev_revert_msg` from a reverted trace.

        Walks the REVERT / INVALID steps from last to first. A REVERT with
        return data is decoded as a panic code, an error string or a typed
        error. The dev revert string is looked up by program counter first,
        then via the contract's pcMap and source code. If nothing is found the
        message falls back to "invalid opcode" or an empty string.

        Args:
            trace: Normalized struct logs of the reverted transaction.
        """
        self._modified_state = False
        if self.contract_address:
            step = next((i for i in trace if i["op"] == "CODECOPY"), None)
            # NOTE(review): stack[-3] is presumably the CODECOPY length; EIP-170
            # caps code size at 24576 bytes - confirm the 24577 threshold
            if step is not None and int(step["stack"][-3], 16) > 24577:
                self._revert_msg = "exceeds EIP-170 size limit"
                self._dev_revert_msg = ""

        if self._dev_revert_msg is not None:
            return

        # iterate over revert instructions in reverse to find revert message
        for step in (i for i in trace[::-1] if i["op"] in ("REVERT", "INVALID")):
            # a non-zero stack[-2] is treated as "the revert carries return data"
            if step["op"] == "REVERT" and int(step["stack"][-2], 16):
                # get returned error string from stack
                data = _get_memory(step, -1)

                selector = data[:4].hex()

                if selector == "0x4e487b71":  # keccak of Panic(uint256)
                    error_code = int(data[4:].hex(), 16)
                    if error_code in SOLIDITY_ERROR_CODES:
                        self._revert_msg = SOLIDITY_ERROR_CODES[error_code]
                    else:
                        self._revert_msg = f"Panic (error code: {error_code})"
                elif selector == "0x08c379a0":  # keccak of Error(string)
                    self._revert_msg = decode_abi(["string"], data[4:])[0]
                else:
                    # TODO: actually parse the data
                    self._revert_msg = f"typed error: {data.hex()}"

            elif self.contract_address:
                # deployment without revert data: no dev string can be resolved
                self._revert_msg = "invalid opcode" if step["op"] == "INVALID" else ""
                self._dev_revert_msg = ""
                return

            # check for dev revert string using program counter
            dev_revert = build._get_dev_revert(step["pc"]) or None
            if dev_revert is not None:
                self._dev_revert_msg = dev_revert
                if self._revert_msg is None:
                    self._revert_msg = dev_revert
            else:
                # if none is found, expand the trace and get it from the pcMap
                self._expand_trace()
                try:
                    contract = state._find_contract(step["address"])
                    pc_map = contract._build["pcMap"]
                    # if this is the function selector revert, check for a jump
                    if "first_revert" in pc_map[step["pc"]]:
                        idx = trace.index(step) - 4
                        if trace[idx]["pc"] != step["pc"] - 4:
                            step = trace[idx]

                    # if this is the optimizer revert, find the actual source
                    if "optimizer_revert" in pc_map[step["pc"]]:
                        idx = trace.index(step) - 1

                        # look for the most recent jump
                        while trace[idx + 1]["op"] != "JUMPDEST":
                            if trace[idx]["source"] != step["source"]:
                                # if we find another line with a differing source offset prior
                                # to a JUMPDEST, the optimizer revert is also the actual revert
                                idx = trace.index(step)
                                break
                            idx -= 1
                        while not trace[idx]["source"]:
                            # now we're in a yul optimization, keep stepping back
                            # until we find a source offset
                            idx -= 1
                        # at last we have the real location of the revert
                        step["source"] = trace[idx]["source"]
                        step = trace[idx]

                    if "dev" in pc_map[step["pc"]]:
                        self._dev_revert_msg = pc_map[step["pc"]]["dev"]
                    else:
                        # extract the dev revert string from the source code
                        # TODO this technique appears superior to `_get_dev_revert`, and
                        # changes in solc 0.8.0 have necessitated it. the old approach
                        # of building a dev revert map should be refactored out in favor
                        # of this one.
                        source = contract._sources.get(step["source"]["filename"])
                        offset = step["source"]["offset"][1]
                        # remainder of the source line after the reverting statement
                        line = source[offset:].split("\n")[0]
                        marker = "//" if contract._build["language"] == "Solidity" else "#"
                        revert_str = line[line.index(marker) + len(marker) :].strip()
                        if revert_str.startswith("dev:"):
                            self._dev_revert_msg = revert_str

                    if self._revert_msg is None:
                        self._revert_msg = self._dev_revert_msg or ""
                    return
                except (KeyError, AttributeError, TypeError, ValueError):
                    # best effort: lookup failures fall through to the checks below
                    pass

            if self._revert_msg is not None:
                if self._dev_revert_msg is None:
                    self._dev_revert_msg = ""
                return

        # no message could be determined from any revert step
        op = next((i["op"] for i in trace[::-1] if i["op"] in ("REVERT", "INVALID")), None)
        self._revert_msg = "invalid opcode" if op == "INVALID" else ""

    def _expand_trace(self) -> None:
        """Expand the raw trace in place and derive call and coverage data from it.

        Adds the following attributes to each step of the stack trace:

        address: The address executing this contract.
        contractName: The name of the contract.
        fn: The name of the function.
        jumpDepth: Number of jumps made since entering this contract. The
                   initial value is 0.
        source: {
            filename: path to the source file for this step
            offset: Start and end offset associated source code
        }

        `source` is `False` for steps where no source location is known.

        This method also resets and fills `_new_contracts`,
        `_internal_transfers` and `_subcalls`, sets `_trace_origin` and
        `_call_cost` when there is a trace to expand, and records the
        coverage data through `coverage._add_transaction`.
        """
        if self._raw_trace is None:
            self._get_trace()
        if self._trace is not None:
            # in case `_get_trace` also expanded the trace, do not repeat
            return

        self._trace = trace = self._raw_trace
        self._new_contracts = []
        self._internal_transfers = []
        self._subcalls = []
        if self.contract_address or not trace:
            # deployments and empty traces are not expanded; only an empty
            # coverage entry is registered for this transaction
            coverage._add_transaction(self.coverage_hash, {})
            return

        if trace[0]["depth"] == 1:
            # a first step at depth 1 is treated as a geth trace; depths are
            # shifted down by one so that the outermost call is at depth 0
            self._trace_origin = "geth"
            self._call_cost = self.gas_used - trace[0]["gas"] + trace[-1]["gas"]
            for t in trace:
                t["depth"] = t["depth"] - 1
        else:
            self._trace_origin = "ganache"
            if trace[0]["gasCost"] >= 21000:
                # in ganache <6.10.0, gas costs are shifted by one step - we can
                # identify this when the first step has a gas cost >= 21000
                self._call_cost = trace[0]["gasCost"]
                for i in range(len(trace) - 1):
                    trace[i]["gasCost"] = trace[i + 1]["gasCost"]
                trace[-1]["gasCost"] = 0
            else:
                self._call_cost = self.gas_used - trace[0]["gas"] + trace[-1]["gas"]

        # last_map gives a quick reference of previous values at each depth
        last_map = {0: _get_last_map(self.receiver, self.input[:10])}  # type: ignore
        # coverage_eval maps contract name -> source path -> three sets:
        # [statements, branches stored under key 1, branches stored under key 2]
        coverage_eval: Dict = {last_map[0]["name"]: {}}
        precompile_contract = re.compile(r"0x0{38}(?:0[1-9]|1[0-8])")
        call_opcodes = ("CALL", "STATICCALL", "DELEGATECALL")
        for i in range(len(trace)):
            # NOTE: on the first iteration `trace[i - 1]` is `trace[-1]`, the
            # final step of the trace
            # if depth has increased, tx has called into a different contract
            is_depth_increase = trace[i]["depth"] > trace[i - 1]["depth"]
            is_subcall = trace[i - 1]["op"] in call_opcodes
            if is_depth_increase or is_subcall:
                # `step` is the instruction that initiated the call or creation
                step = trace[i - 1]
                if step["op"] in ("CREATE", "CREATE2"):
                    # creating a new contract
                    # the new address is read from the last stack item of the
                    # first later step that is back at the creator's depth
                    out = next(x for x in trace[i:] if x["depth"] == step["depth"])
                    address = out["stack"][-1][-40:]
                    sig = f"<{step['op']}>"
                    calldata = None
                    self._new_contracts.append(EthAddress(address))
                    # a non-zero last stack item on the creating step is
                    # recorded as an internal transfer to the new contract
                    if int(step["stack"][-1], 16):
                        self._add_internal_xfer(step["address"], address, step["stack"][-1])
                else:
                    # calling an existing contract
                    # CALL and CALLCODE also carry a value at stack[-3] (read
                    # below), so their input offset sits one slot deeper
                    stack_idx = -4 if step["op"] in ("CALL", "CALLCODE") else -3
                    offset = int(step["stack"][stack_idx], 16)
                    length = int(step["stack"][stack_idx - 1], 16)
                    calldata = HexBytes("".join(step["memory"]))[offset : offset + length]
                    sig = calldata[:4].hex()
                    address = step["stack"][-2][-40:]

                if is_depth_increase:
                    # entering a new call frame: start a fresh map for this depth
                    last_map[trace[i]["depth"]] = _get_last_map(address, sig)
                    coverage_eval.setdefault(last_map[trace[i]["depth"]]["name"], {})

                self._subcalls.append(
                    {"from": step["address"], "to": EthAddress(address), "op": step["op"]}
                )
                if step["op"] in ("CALL", "CALLCODE"):
                    self._subcalls[-1]["value"] = int(step["stack"][-3], 16)
                if is_depth_increase and calldata and last_map[trace[i]["depth"]].get("function"):
                    # the target function is known: store decoded inputs, or
                    # the raw calldata if decoding fails
                    fn = last_map[trace[i]["depth"]]["function"]
                    self._subcalls[-1]["function"] = fn._input_sig
                    try:
                        zip_ = zip(fn.abi["inputs"], fn.decode_input(calldata))
                        inputs = {i[0]["name"]: i[1] for i in zip_}  # type: ignore
                        self._subcalls[-1]["inputs"] = inputs
                    except Exception:
                        self._subcalls[-1]["calldata"] = calldata.hex()
                elif calldata or is_subcall:
                    self._subcalls[-1]["calldata"] = calldata.hex()  # type: ignore

                # when the caller of the newest subcall matches the precompile
                # pattern, drop the preceding subcall entry and attribute the
                # newest one to that entry's caller instead
                if precompile_contract.search(str(self._subcalls[-1]["from"])) is not None:
                    caller = self._subcalls.pop(-2)["from"]
                    self._subcalls[-1]["from"] = caller

            # update trace from last_map
            last = last_map[trace[i]["depth"]]
            trace[i].update(
                address=last["address"],
                contractName=last["name"],
                fn=last["internal_calls"][-1],
                jumpDepth=last["jumpDepth"],
                source=False,
            )

            opcode = trace[i]["op"]
            # a CALL with a non-zero value at stack[-3] is an internal transfer
            if opcode == "CALL" and int(trace[i]["stack"][-3], 16):
                self._add_internal_xfer(
                    last["address"], trace[i]["stack"][-2][-40:], trace[i]["stack"][-3]
                )

            try:
                pc = last["pc_map"][trace[i]["pc"]]
            except (KeyError, TypeError):
                # we don't have enough information about this contract
                continue

            if trace[i]["depth"] and opcode in ("RETURN", "REVERT", "INVALID", "SELFDESTRUCT"):
                # a nested call is ending: annotate the most recent subcall
                # made to the current address with its outcome
                subcall: dict = next(
                    i for i in self._subcalls[::-1] if i["to"] == last["address"]  # type: ignore
                )

                if opcode == "RETURN":
                    returndata = _get_memory(trace[i], -1)
                    if returndata:
                        fn = last["function"]
                        try:
                            return_values = fn.decode_output(returndata)
                            # a single declared output is wrapped in a tuple
                            if len(fn.abi["outputs"]) == 1:
                                return_values = (return_values,)
                            subcall["return_value"] = return_values
                        except Exception:
                            # decoding failed, keep the raw bytes as hex
                            subcall["returndata"] = returndata.hex()
                    else:
                        subcall["return_value"] = None
                elif opcode == "SELFDESTRUCT":
                    subcall["selfdestruct"] = True
                else:
                    if opcode == "REVERT":
                        data = _get_memory(trace[i], -1)
                        if len(data) > 4:
                            # skip the first four bytes and decode the rest as
                            # a string; fall back to the raw hex on failure
                            try:
                                subcall["revert_msg"] = decode_abi(["string"], data[4:])[0]
                            except Exception:
                                subcall["revert_msg"] = data.hex()
                    # no message found: use the dev entry of the pc map, if any
                    if "revert_msg" not in subcall and "dev" in pc:
                        subcall["revert_msg"] = pc["dev"]

            if "path" not in pc:
                continue
            trace[i]["source"] = {"filename": last["path_map"][pc["path"]], "offset": pc["offset"]}

            if "fn" not in pc:
                continue

            # calculate coverage
            if last["coverage"]:
                if pc["path"] not in coverage_eval[last["name"]]:
                    coverage_eval[last["name"]][pc["path"]] = [set(), set(), set()]
                if "statement" in pc:
                    coverage_eval[last["name"]][pc["path"]][0].add(pc["statement"])
                if "branch" in pc:
                    if pc["op"] != "JUMPI":
                        last["active_branches"].add(pc["branch"])
                    elif "active_branches" not in last or pc["branch"] in last["active_branches"]:
                        # false, true
                        # key 1 when the next step is at pc + 1, otherwise key 2
                        # NOTE(review): reads `trace[i + 1]`, so this assumes a
                        # JUMPI is never the final step - confirm
                        key = 1 if trace[i + 1]["pc"] == trace[i]["pc"] + 1 else 2
                        coverage_eval[last["name"]][pc["path"]][key].add(pc["branch"])
                        if "active_branches" in last:
                            last["active_branches"].remove(pc["branch"])

            # ignore jumps with no function - they are compiler optimizations
            if "jump" in pc:
                # jump 'i' is calling into an internal function
                if pc["jump"] == "i":
                    try:
                        fn = last["pc_map"][trace[i + 1]["pc"]]["fn"]
                    except (KeyError, IndexError):
                        continue
                    if fn != last["internal_calls"][-1]:
                        last["internal_calls"].append(fn)
                        last["jumpDepth"] += 1
                # jump 'o' is returning from an internal function
                elif last["jumpDepth"] > 0:
                    del last["internal_calls"][-1]
                    last["jumpDepth"] -= 1
        # only contracts with recorded coverage data are stored
        coverage._add_transaction(
            self.coverage_hash, dict((k, v) for k, v in coverage_eval.items() if v)
        )

    def _add_internal_xfer(self, from_: str, to: str, value: str) -> None:
        """Record a value transfer found in the trace.

        Args:
            from_: Address the value was sent from
            to: Address the value was sent to
            value: Hex string of the amount, with or without a `0x` prefix
        """
        # `Wei` is always given a `0x`-prefixed string
        prefixed = value if value.startswith("0x") else f"0x{value}"
        transfer = {"from": EthAddress(from_), "to": EthAddress(to), "value": Wei(prefixed)}
        self._internal_transfers.append(transfer)  # type: ignore

    def _full_name(self) -> str:
        if self.contract_name and self.fn_name:
            return f"{self.contract_name}.{self.fn_name}"
        return self.fn_name or "Transaction"

    def info(self) -> None:
        """Displays verbose information about the transaction, including decoded event logs.

        The printed summary contains the transaction hash and sender, then
        either the address of the newly deployed contract or the receiver,
        value and called function, followed by the block number and gas usage.
        When the transaction emitted events they are shown as a tree, grouped
        into consecutive runs of events that share the same emitting address.
        If the status is falsy, the revert message (or `reverted`) is shown in
        the headline.
        """
        result = f"Tx Hash: {self.txid}\nFrom: {self.sender}\n"
        if self.contract_address and self.status:
            result += f"New {self.contract_name} address: {self.contract_address}\n"
        else:
            result += f"To: {self.receiver}\n" f"Value: {self.value}\n"
            # only name a function when the input holds non-zero data
            if self.input != "0x" and int(self.input, 16):
                result += f"Function: {self._full_name()}\n"

        result += (
            f"Block: {self.block_number}\nGas Used: "
            f"{self.gas_used} / {self.gas_limit} "
            f"({self.gas_used / self.gas_limit:.1%})\n"
        )

        if self.events:
            events = list(self.events)
            call_tree: List = ["--------------------------"]
            while events:
                # length of the leading run of events emitted by the same address;
                # the position comes from `enumerate` rather than `list.index`,
                # which searches by equality and returns the first equal element
                # instead of the element actually being inspected
                idx = next(
                    (
                        pos
                        for pos, event in enumerate(events)
                        if event.address != events[0].address
                    ),
                    len(events),
                )
                contract = state._find_contract(events[0].address)
                if contract:
                    try:
                        name = contract.name()
                    except Exception:
                        # fall back to the stored name when `name()` cannot be called
                        name = contract._name
                    sub_tree: List = [f"{name} ({events[0].address})"]
                else:
                    sub_tree = [f"{events[0].address}"]
                for event in events[:idx]:
                    sub_tree.append([event.name, *(f"{k}: {v}" for k, v in event.items())])
                call_tree.append(sub_tree)
                # continue with the events that follow this run
                events = events[idx:]
            event_tree = build_tree([call_tree], multiline_pad=0, pad_depth=[0, 1])
            result = f"{result}\nEvents In This Transaction\n{event_tree}"

        result = color.highlight(result)
        status = ""
        if not self.status:
            status = f"({color('bright red')}{self.revert_msg or 'reverted'}{color})"
        print(f"Transaction was Mined {status}\n---------------------\n{result}")

    def _get_trace_gas(self, start: int, stop: int) -> Tuple[int, int]:
        """Sum the gas used by the trace steps in the range `start` to `stop`.

        Args:
            start: Index of the first step to include
            stop: Index one past the final step to include

        Returns: tuple of (gas used by steps in the same frame as `start`,
                 gas used by every step in the range)
        """
        trace = self.trace
        frame_gas = 0
        range_gas = 0
        in_frame = True

        for pos in range(start, stop):
            step = trace[pos]

            # track whether this step belongs to the same frame as the first one
            same_frame = _step_compare(step, trace[start])
            if in_frame and not same_frame:
                in_frame = False
                # For the internal gas tracking we ignore the gas passed to an external call
                if step["depth"] > trace[start]["depth"]:
                    frame_gas -= trace[pos - 1]["gasCost"]
            elif same_frame and not in_frame:
                in_frame = True

            # gas refunds are not part of the step cost, apply them manually
            refund = 0
            if step["op"] == "SSTORE" and int(step["stack"][-2], 16) == 0:
                # 15000 gas is refunded if a word is set to 0x0
                # Note: There is currently no way to check if the value was 0x0 before.
                # This will give an incorrect refund if 0x0 is assigned to 0x0.
                refund += 15000
            if step["op"] == "SELFDESTRUCT":
                # 24000 gas is refunded on selfdestruct
                refund += 24000

            used = step["gasCost"] - refund
            range_gas += used
            if in_frame:
                frame_gas += used

        # For external calls, add the remaining gas returned back
        if start > 0 and trace[start]["depth"] > trace[start - 1]["depth"]:
            call_cost = trace[start - 1]["gasCost"]
            range_gas += call_cost
            frame_gas += call_cost

        return frame_gas, range_gas

    @trace_inspection
    def call_trace(self, expand: bool = False) -> None:
        """
        Print a tree of every contract and method called during the
        transaction. Each entry has the format:

        Contract.functionName  [instruction]  start:stop  [gas used]

        * start:stop are index values for the `trace` member of this object,
          marking the points where the call begins and ends
        * when a call includes subcalls, gas use is displayed as
          [gas used in this frame / gas used in this frame + subcalls]
        * Calls displayed in red ended with a `REVERT` or `INVALID` instruction.

        Arguments
        ---------
        expand : bool
            If `True`, show an expanded call trace including inputs and return values
        """

        trace = self.trace
        step_count = len(trace)
        key = _step_internal(
            trace[0], trace[-1], 0, step_count, self._get_trace_gas(0, step_count)
        )

        call_tree: List = [[key]]
        # stack of tree nodes, the final entry is where new calls get attached
        active_tree: List = [call_tree[0]]

        # (index, depth, jumpDepth) for each step where depth or jumpDepth changes
        trace_index = [(0, 0, 0)]
        for pos in range(1, step_count):
            if not _step_compare(trace[pos], trace[pos - 1]):
                trace_index.append((pos, trace[pos]["depth"], trace[pos]["jumpDepth"]))

        # reversed, so that `pop()` hands out the subcalls in their stored order
        subcalls = self.subcalls[::-1]
        for i in range(1, len(trace_index)):
            idx, depth, jump_depth = trace_index[i]
            _, prev_depth, prev_jump_depth = trace_index[i - 1]

            if depth == prev_depth and jump_depth < prev_jump_depth:
                # returning from an internal function, reduce tree by one
                active_tree.pop()
                continue
            if depth < prev_depth:
                # returning from an external call, return tree by jumpDepth of the previous depth
                active_tree = active_tree[: -(prev_jump_depth + 1)]
                continue

            if depth > prev_depth:
                # called to a new contract, the call ends where the depth drops below it
                end = next(
                    (pos for pos, later_depth, _ in trace_index[i + 1 :] if later_depth < depth),
                    step_count,
                )
                frame_gas = self._get_trace_gas(idx, end)
                key = _step_external(
                    trace[idx],
                    trace[end - 1],
                    idx,
                    end,
                    (frame_gas[0], frame_gas[1]),
                    subcalls.pop(),
                    expand,
                )
            elif depth == prev_depth and jump_depth > prev_jump_depth:
                # jumped into an internal function, it ends at the first later
                # entry with a lower depth, or the same depth and a lower jumpDepth
                end = step_count
                for pos, later_depth, later_jump_depth in trace_index[i + 1 :]:
                    if later_depth < depth or (
                        later_depth == depth and later_jump_depth < jump_depth
                    ):
                        end = pos
                        break

                frame_gas = self._get_trace_gas(idx, end)
                key = _step_internal(
                    trace[idx], trace[end - 1], idx, end, (frame_gas[0], frame_gas[1])
                )

            # attach the new node and make it the active one
            node = [key]
            active_tree[-1].append(node)
            active_tree.append(node)

        print(
            f"Call trace for '{color('bright blue')}{self.txid}{color}':\n"
            f"Initial call cost  [{color('bright yellow')}{self._call_cost} gas{color}]"
        )
        print(build_tree(call_tree).rstrip())

    def traceback(self) -> None:
        """Print the error traceback for the transaction, or an empty line if there is none."""
        text = self._traceback_string()
        print(text if text else "")

    @trace_inspection
    def _traceback_string(self) -> str:
        """Returns an error traceback for the transaction.

        The result is empty when the status is 1, when the trace holds no
        `REVERT` or `INVALID` step, or when no step at or before the first
        such step has a source location.
        """
        if self.status == 1:
            return ""
        trace = self.trace

        # the traceback is built from the first reverting step in the trace
        revert_idx = next(
            (i for i, step in enumerate(trace) if step["op"] in ("REVERT", "INVALID")), None
        )
        if revert_idx is None:
            return ""
        backward = range(revert_idx, -1, -1)

        # innermost frame: the closest step at or before the revert with a source
        source_idx = next((i for i in backward if trace[i]["source"]), None)
        if source_idx is None:
            return ""
        frames = [source_idx]
        depth, jump_depth = trace[revert_idx]["depth"], trace[revert_idx]["jumpDepth"]

        # keep stepping out: each pass finds the closest earlier step that has a
        # lower depth, or the same depth and a lower jumpDepth
        while True:
            outer_idx = next(
                (
                    i
                    for i in backward
                    if trace[i]["depth"] < depth
                    or (trace[i]["depth"] == depth and trace[i]["jumpDepth"] < jump_depth)
                ),
                None,
            )
            if outer_idx is None:
                break
            frames.append(outer_idx)
            depth, jump_depth = trace[outer_idx]["depth"], trace[outer_idx]["jumpDepth"]

        # outermost frame first
        body = "\n".join(self._source_string(i, 0) for i in reversed(frames))
        return f"{color}Traceback for '{color('bright blue')}{self.txid}{color}':\n" + body

    def error(self, pad: int = 3) -> None:
        """Print the source code that caused the revert, or an empty line if unavailable.

        Args:
            pad: Number of unrelated lines of code to include before and after
        """
        text = self._error_string(pad)
        print(text if text else "")

    @trace_inspection
    def _error_string(self, pad: int = 3) -> str:
        """Returns the source code that caused the transaction to revert.

        Args:
            pad: Number of unrelated lines of code to include before and after

        Returns: source code string, empty when the status is 1 or when no
                 source location could be found
        """
        if self.status == 1:
            return ""

        # if RPC returned a program counter, try to find source without querying trace
        revert_pc = self._revert_pc
        if revert_pc:
            located = build._get_error_source_from_pc(revert_pc)
            highlight, linenos, path, fn_name = located
            if highlight:
                return _format_source(highlight, linenos, path, revert_pc, -1, fn_name)
            # the lookup found nothing, forget the program counter
            self._revert_pc = None

        trace = self.trace
        last_to_first = range(len(trace) - 1, -1, -1)
        try:
            # raises StopIteration when no step reverted; the index itself is unused
            next(i for i in last_to_first if trace[i]["op"] in {"REVERT", "INVALID"})
            # iterate backward through the trace until a step has a source offset
            source_idx = next(i for i in last_to_first if trace[i]["source"])
            return self._source_string(source_idx, pad)
        except StopIteration:
            return ""

    def source(self, idx: int, pad: int = 3) -> None:
        """Print the source code for a trace step, or an empty line if unavailable.

        Args:
            idx: Stack trace step index
            pad: Number of unrelated lines of code to include before and after
        """
        text = self._source_string(idx, pad)
        print(text if text else "")

    @trace_inspection
    def _source_string(self, idx: int, pad: int) -> str:
        """Returns the highlighted source code for a given stack trace step.

        Args:
            idx: Stack trace step index
            pad: Number of unrelated lines of code to include before and after

        Returns: source code string, empty when the step has no source
                 location or the source could not be highlighted
        """
        step = self.trace[idx]
        location = step.get("source", None)
        if not location:
            return ""

        contract = state._find_contract(step["address"])
        highlighted, linenos = highlight_source(
            contract._sources.get(location["filename"]), location["offset"], pad
        )
        if not highlighted:
            return ""

        # the displayed step number is the position of this step in the trace
        step_number = self.trace.index(step)
        return _format_source(
            highlighted, linenos, location["filename"], step["pc"], step_number, step["fn"]
        )


def _format_source(source: str, linenos: Tuple, path: Path, pc: int, idx: int, fn_name: str) -> str:
    """Build the colored header for a source excerpt and append the excerpt to it.

    Args:
        source: Highlighted source code, appended directly after the header
        linenos: First and last line number of the excerpt
        path: Path of the source file
        pc: Program counter of the trace step
        idx: Index of the trace step
        fn_name: Name of the function being executed

    Returns: formatted string
    """
    dark = f"{color('dark white')}"
    blue = f"{color('bright blue')}"
    first_line, last_line = linenos[0], linenos[1]

    # follows the word "line" in the header; the leading "s" turns it into
    # "lines" when the excerpt spans more than one line
    line_label = f" {blue}{first_line}"
    if last_line > first_line:
        line_label = f"s{line_label}{dark}-{blue}{last_line}"

    step_info = f"{dark}Trace step {blue}{idx}{dark}, program counter {blue}{pc}{dark}:\n"
    file_info = f"  {dark}File {color('bright magenta')}\"{path}\"{dark}, line{line_label}"
    fn_info = f"{dark}, in {color('bright cyan')}{fn_name}{dark}:"
    return "".join((step_info, file_info, fn_info, f"{source}"))


def _step_compare(a: Dict, b: Dict) -> bool:
    return a["depth"] == b["depth"] and a["jumpDepth"] == b["jumpDepth"]


def _step_internal(
    step: Dict,
    last_step: Dict,
    start: Union[str, int],
    stop: Union[str, int],
    gas: Tuple[int, int],
    subcall: Dict = None,
) -> str:
    """Build the colored call trace label for a single call.

    Args:
        step: First trace step of the call
        last_step: Final trace step of the call
        start: Value shown before the colon in `start:stop`
        stop: Value shown after the colon in `start:stop`
        gas: Two gas amounts; shown once when equal, otherwise as `first / second`
        subcall: Optional subcall data, its `op` is shown in brackets

    Returns: formatted label
    """
    dark = f"{color('dark white')}"
    reset = f"{color}"

    # red when the call ends by reverting in the frame it started in, cyan for
    # the entry point of a contract, uncolored for internal functions
    reverted = last_step["op"] in {"REVERT", "INVALID"} and _step_compare(step, last_step)
    if reverted:
        name_color = color("bright red")
    elif step["jumpDepth"]:
        name_color = color()
    else:
        name_color = color("bright cyan")

    pieces = [f"{dark}{name_color}{step['fn']}  {dark}"]

    if subcall:
        pieces.append(f"[{reset}{subcall['op']}{dark}]  ")

    pieces.append(f"{start}:{stop}{reset}")

    if gas:
        amount = f"{gas[0]} gas" if gas[0] == gas[1] else f"{gas[0]} / {gas[1]} gas"
        pieces.append(f"  {dark}[{color('bright yellow')}{amount}{dark}]{reset}")

    if last_step["op"] == "SELFDESTRUCT":
        pieces.append(f"  {dark}[{color('bright red')}SELFDESTRUCT{dark}]{reset}")

    return "".join(pieces)


def _convert_0x_to_empty_bytes(value: Any) -> Any:
    # black cannot parse `0x` without any trailing zeros, so we temporarily
    # replace it with an empty bytestring
    final = []
    for item in value:
        if isinstance(item, (list, tuple)):
            final.append(_convert_0x_to_empty_bytes(item))
        elif str(item) == "0x":
            final.append(b"")
        else:
            final.append(item)
    return type(value)(final)


def _format(value: Any) -> str:
    """Return a display string for a value, formatting lists and tuples with `black`."""
    if not isinstance(value, (list, tuple)):
        return str(value)

    # `0x` items are swapped for `b""` while formatting, then restored
    parseable = _convert_0x_to_empty_bytes(value)
    mode = black.FileMode(line_length=60)
    formatted = black.format_str(str(parseable), mode=mode)
    return str(formatted.replace('b""', "0x"))


def _step_external(
    step: Dict,
    last_step: Dict,
    start: Union[str, int],
    stop: Union[str, int],
    gas: Tuple[int, int],
    subcall: Dict,
    expand: bool,
) -> str:
    """Build the call trace entry for a call into another contract.

    Args:
        step: First trace step of the call
        last_step: Final trace step of the call
        start: Value shown before the colon in `start:stop`
        stop: Value shown after the colon in `start:stop`
        gas: Two gas amounts, passed through to `_step_internal`
        subcall: Data about the call; its value, inputs, return data and
                 revert message are included when expanding
        expand: If `False`, only the single line label is returned

    Returns: the label, or a tree holding the label and the call details
    """
    key = _step_internal(step, last_step, start, stop, gas, subcall)
    if not expand:
        return key

    details: List = [key, f"address: {step['address']}"]

    if "value" in subcall:
        details.append(f"value: {subcall['value']}")

    # inputs: decoded arguments when available, otherwise the raw calldata
    if "inputs" in subcall:
        inputs = subcall["inputs"]
        if inputs:
            arguments = [f"{k}: {_format(v)}" for k, v in inputs.items()]
            details.append(["input arguments:", *arguments])
        else:
            details.append("input arguments: None")
    else:
        details.append(f"calldata: {subcall.get('calldata')}")

    # outputs: decoded values when available, otherwise the raw return data
    if "return_value" in subcall:
        returned = subcall["return_value"]
        if isinstance(returned, tuple) and len(returned) > 1:
            details.append(["return values:", *(_format(item) for item in returned)])
        else:
            single = returned[0] if isinstance(returned, tuple) else returned
            details.append(f"return value: {_format(single)}")
    elif "returndata" in subcall:
        details.append(f"returndata: {subcall['returndata']}")

    if "revert_msg" in subcall:
        details.append(f"revert reason: {color('bright red')}{subcall['revert_msg']}{color}")

    return build_tree([details], multiline_pad=0).rstrip()


def _get_memory(step: Dict, idx: int) -> HexBytes:
    """Read a slice of memory described by two stack items of a trace step.

    Args:
        step: Trace step holding `stack` and `memory`
        idx: Stack index of the offset; the length is read from `idx - 1`

    Returns: the bytes at `offset` to `offset + length`, zero-padded to `length`
    """
    stack = step["stack"]
    offset = int(stack[idx], 16)
    length = int(stack[idx - 1], 16)

    memory = HexBytes("".join(step["memory"]))
    chunk = memory[offset : offset + length]
    # append zero-bytes if allocated memory ends before `length` bytes
    padding = b"\x00" * (length - len(chunk))
    return HexBytes(chunk + padding)


def _get_last_map(address: EthAddress, sig: str) -> Dict:
    """Build the per-depth lookup data used while expanding a trace.

    Args:
        address: Address of the contract being called
        sig: Signature of the call, used to look up the called method

    Returns: dict with the address, a jump depth of 0, the contract and its
             name, the initial list of internal calls and the pc map. For a
             known contract the called function object and the source path
             map are included, and coverage is enabled if the contract
             belongs to a `Project`.
    """
    contract = state._find_contract(address)
    last_map = {"address": EthAddress(address), "jumpDepth": 0, "name": None, "coverage": False}

    if not contract:
        # no data is available for this address
        last_map.update(contract=None, internal_calls=[f"<UnknownContract>.{sig}"], pc_map=None)
        return last_map

    full_fn_name = (
        f"{contract._name}.{contract.get_method(sig)}"
        if contract.get_method(sig)
        else contract._name
    )
    last_map.update(
        {
            "contract": contract,
            "function": contract.get_method_object(sig),
            "name": contract._name,
            "internal_calls": [full_fn_name],
            "path_map": contract._build.get("allSourcePaths"),
            "pc_map": contract._build.get("pcMap"),
        }
    )

    # only evaluate coverage for contracts that are part of a `Project`
    if isinstance(contract._project, project_main.Project):
        last_map["coverage"] = True
        # the set of active branches is only tracked for Solidity builds
        if contract._build.get("language") == "Solidity":
            last_map["active_branches"] = set()

    return last_map


def _is_call_to_precompile(subcall: dict) -> bool:
    precompile_contract = re.compile(r"0x0{38}(?:0[1-9]|1[0-8])")
    return True if precompile_contract.search(str(subcall["to"])) is not None else False