Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
eth-brownie / brownie / project / compiler / vyper.py
Size: Mime:
#!/usr/bin/python3

import logging
from collections import deque
from hashlib import sha1
from typing import Dict, List, Optional, Tuple, Union

import vvm
import vyper
from requests.exceptions import ConnectionError
from semantic_version import Version
from vyper.cli import vyper_json
from vyper.exceptions import VyperException

from brownie.exceptions import CompilerError, IncompatibleVyperVersion
from brownie.project import sources
from brownie.project.compiler.utils import expand_source_map
from brownie.project.sources import is_inside_offset

vvm_logger = logging.getLogger("vvm")
vvm_logger.setLevel(10)
sh = logging.StreamHandler()
sh.setLevel(10)
sh.setFormatter(logging.Formatter("%(message)s"))
vvm_logger.addHandler(sh)

AVAILABLE_VYPER_VERSIONS = None
_active_version = Version(vyper.__version__)


def get_version() -> Version:
    return _active_version


def set_vyper_version(version: Union[str, Version]) -> str:
    """Sets the vyper version. If not available it will be installed."""
    global _active_version
    if isinstance(version, str):
        version = Version(version)
    if version != Version(vyper.__version__):
        try:
            vvm.set_vyper_version(version, silent=True)
        except vvm.exceptions.VyperNotInstalled:
            install_vyper(version)
            vvm.set_vyper_version(version, silent=True)
    _active_version = version
    return str(_active_version)


def get_abi(contract_source: str, name: str) -> Dict:
    """
    Given a contract source and name, returns a dict of {name: abi}

    This function is deprecated in favor of `brownie.project.compiler.get_abi`
    """
    input_json = {
        "language": "Vyper",
        "sources": {name: {"content": contract_source}},
        "settings": {"outputSelection": {"*": {"*": ["abi"]}}},
    }
    if _active_version == Version(vyper.__version__):
        try:
            compiled = vyper_json.compile_json(input_json)
        except VyperException as exc:
            raise exc.with_traceback(None)
    else:
        try:
            compiled = vvm.compile_standard(input_json, vyper_version=_active_version)
        except vvm.exceptions.VyperError as exc:
            raise CompilerError(exc, "vyper")

    return {name: compiled["contracts"][name][name]["abi"]}


def _get_vyper_version_list() -> Tuple[List, List]:
    global AVAILABLE_VYPER_VERSIONS
    installed_versions = vvm.get_installed_vyper_versions()
    lib_version = Version(vyper.__version__)
    if lib_version not in installed_versions:
        installed_versions.append(lib_version)
    if AVAILABLE_VYPER_VERSIONS is None:
        try:
            AVAILABLE_VYPER_VERSIONS = vvm.get_installable_vyper_versions()
        except ConnectionError:
            if not installed_versions:
                raise ConnectionError("Vyper not installed and cannot connect to GitHub")
            AVAILABLE_VYPER_VERSIONS = installed_versions
    return AVAILABLE_VYPER_VERSIONS, installed_versions


def install_vyper(*versions: str) -> None:
    """Installs vyper versions."""
    for version in versions:
        vvm.install_vyper(version, show_progress=True)


def find_vyper_versions(
    contract_sources: Dict[str, str],
    install_needed: bool = False,
    install_latest: bool = False,
    silent: bool = True,
) -> Dict:

    """
    Analyzes contract pragmas and determines which vyper version(s) to use.

    Args:
        contract_sources: a dictionary in the form of {'path': "source code"}
        install_needed: if True, will install when no installed version matches
                        the contract pragma
        install_latest: if True, will install when a newer version is available
                        than the installed one
        silent: set to False to enable verbose reporting

    Returns: dictionary of {'version': ['path', 'path', ..]}
    """

    available_versions, installed_versions = _get_vyper_version_list()

    pragma_specs: Dict = {}
    to_install = set()
    new_versions = set()

    for path, source in contract_sources.items():
        pragma_specs[path] = sources.get_vyper_pragma_spec(source, path)
        version = pragma_specs[path].select(installed_versions)

        if not version and not (install_needed or install_latest):
            raise IncompatibleVyperVersion(
                f"No installed vyper version matching '{pragma_specs[path]}' in '{path}'"
            )

        # if no installed version of vyper matches the pragma, find the latest available version
        latest = pragma_specs[path].select(available_versions)

        if not version and not latest:
            raise IncompatibleVyperVersion(
                f"No installable vyper version matching '{pragma_specs[path]}' in '{path}'"
            )

        if not version or (install_latest and latest > version):
            to_install.add(latest)
        elif latest and latest > version:
            new_versions.add(str(version))

    # install new versions if needed
    if to_install:
        install_vyper(*to_install)
        _, installed_versions = _get_vyper_version_list()
    elif new_versions and not silent:
        print(
            f"New compatible vyper version{'s' if len(new_versions) > 1 else ''}"
            f" available: {', '.join(new_versions)}"
        )

    # organize source paths by latest available vyper version
    compiler_versions: Dict = {}
    for path, spec in pragma_specs.items():
        version = spec.select(installed_versions)
        compiler_versions.setdefault(str(version), []).append(path)

    return compiler_versions


def find_best_vyper_version(
    contract_sources: Dict[str, str],
    install_needed: bool = False,
    install_latest: bool = False,
    silent: bool = True,
) -> str:

    """
    Analyze contract pragma and find the best compatible version across multiple sources.

    Args:
        contract_sources: a dictionary in the form of {'path': "source code"}
        install_needed: if True, will install when no installed version matches
                        the contract pragma
        install_latest: if True, will install when a newer version is available
                        than the installed one
        silent: set to False to enable verbose reporting

    Returns: version string
    """

    available_versions, installed_versions = _get_vyper_version_list()

    for path, source in contract_sources.items():

        pragma_spec = sources.get_vyper_pragma_spec(source, path)
        installed_versions = [i for i in installed_versions if i in pragma_spec]
        available_versions = [i for i in available_versions if i in pragma_spec]

    if not available_versions:
        raise IncompatibleVyperVersion("No installable vyper version compatible across all sources")

    if not installed_versions and not (install_needed or install_latest):
        raise IncompatibleVyperVersion("No installed vyper version compatible across all sources")

    if max(available_versions) > max(installed_versions, default=Version("0.0.0")):
        if install_latest or (install_needed and not installed_versions):
            install_vyper(max(available_versions))
            return str(max(available_versions))
        if not silent:
            print(f"New compatible vyper version available: {max(available_versions)}")

    return str(max(installed_versions))


def compile_from_input_json(
    input_json: Dict, silent: bool = True, allow_paths: Optional[str] = None
) -> Dict:

    """
    Compiles contracts from a standard input json.

    Args:
        input_json: vyper input json
        silent: verbose reporting
        allow_paths: compiler allowed filesystem import path

    Returns: standard compiler output json
    """

    version = get_version()
    if not silent:
        print("Compiling contracts...")
        print(f"  Vyper version: {version}")
    if version < Version("0.1.0-beta.17"):
        outputs = input_json["settings"]["outputSelection"]["*"]["*"]
        outputs.remove("userdoc")
        outputs.remove("devdoc")
    if version == Version(vyper.__version__):
        try:
            return vyper_json.compile_json(input_json, root_path=allow_paths)
        except VyperException as exc:
            raise exc.with_traceback(None)
    else:
        try:
            return vvm.compile_standard(input_json, base_path=allow_paths, vyper_version=version)
        except vvm.exceptions.VyperError as exc:
            raise CompilerError(exc, "vyper")


def _get_unique_build_json(
    output_evm: Dict, path_str: str, contract_name: str, ast_json: Union[Dict, List], offset: Tuple
) -> Dict:

    ast: List
    if isinstance(ast_json, dict):
        ast = ast_json["body"]
    else:
        ast = ast_json

    pc_map, statement_map, branch_map = _generate_coverage_data(
        output_evm["deployedBytecode"]["sourceMap"],
        output_evm["deployedBytecode"]["opcodes"],
        contract_name,
        ast,
    )
    return {
        "allSourcePaths": {"0": path_str},
        "bytecode": output_evm["bytecode"]["object"],
        "bytecodeSha1": sha1(output_evm["bytecode"]["object"].encode()).hexdigest(),
        "coverageMap": {"statements": statement_map, "branches": branch_map},
        "dependencies": _get_dependencies(ast),
        "offset": offset,
        "pcMap": pc_map,
        "type": "contract",
    }


def _get_dependencies(ast_json: List) -> List:
    import_nodes = [i for i in ast_json if i["ast_type"] == "Import"]
    import_nodes += [
        i for i in ast_json if i["ast_type"] == "ImportFrom" if i["module"] != "vyper.interfaces"
    ]
    return sorted(set([i["name"].split(".")[-1] for i in import_nodes]))


def _is_revert_jump(pc_list: List, revert_pc: int) -> bool:
    return pc_list[-1]["op"] == "JUMPI" and int(pc_list[-2].get("value", "0"), 16) == revert_pc


def _generate_coverage_data(
    source_map_str: str, opcodes_str: str, contract_name: str, ast_json: List
) -> Tuple:
    if not opcodes_str:
        return {}, {}, {}

    source_map = deque(expand_source_map(source_map_str))
    opcodes = deque(opcodes_str.split(" "))

    fn_nodes = [i for i in ast_json if i["ast_type"] == "FunctionDef"]
    fn_offsets = dict((i["name"], _convert_src(i["src"])) for i in fn_nodes)
    stmt_nodes = set(_convert_src(i["src"]) for i in _get_statement_nodes(fn_nodes))

    statement_map: Dict = {}
    branch_map: Dict = {}

    pc_list: List = []
    count, pc = 0, 0

    revert_pc = -1
    if opcodes[-5] == "JUMPDEST" and opcodes[-1] == "REVERT":
        # starting in vyper 0.2.14, reverts without a reason string are optimized
        # with a jump to the end of the bytecode. if the bytecode ends with this
        # pattern, we set `revert_pc` as the program counter of the jumpdest so
        # we can identify these optimizer reverts within traces.
        revert_pc = len(opcodes) + sum(int(i[4:]) - 1 for i in opcodes if i.startswith("PUSH")) - 5

    while opcodes and source_map:

        # format of source is [start, stop, contract_id, jump code]
        source = source_map.popleft()
        pc_list.append({"op": opcodes.popleft(), "pc": pc})

        if source[3] != "-":
            pc_list[-1]["jump"] = source[3]

        pc += 1
        if opcodes and opcodes[0][:2] == "0x":
            pc_list[-1]["value"] = opcodes.popleft()
            pc += int(pc_list[-1]["op"][4:])

        # set source offset (-1 means none)
        if source[0] == -1:
            if (
                len(pc_list) > 6
                and pc_list[-7]["op"] == "CALLVALUE"
                and pc_list[-1]["op"] == "REVERT"
            ) or (
                len(pc_list) > 2
                and pc_list[-3]["op"] == "CALLVALUE"
                and _is_revert_jump(pc_list[-2:], revert_pc)
            ):
                # special case - initial nonpayable check on vyper >=0.2.5
                pc_list[-1]["dev"] = "Cannot send ether to nonpayable function"
                # hackiness to prevent the source highlight from showing the entire contract
                if pc_list[-1]["op"] == "REVERT":
                    # for REVERT, apply to the previous opcode
                    pc_list[-2].update(path="0", offset=[0, 0])
                else:
                    # for JUMPI we need the mapping on the actual opcode
                    pc_list[-1].update(path="0", offset=[0, 0])
            continue
        offset = (source[0], source[0] + source[1])
        pc_list[-1]["path"] = "0"
        pc_list[-1]["offset"] = offset

        try:
            if "offset" in pc_list[-2] and offset == pc_list[-2]["offset"]:
                pc_list[-1]["fn"] = pc_list[-2]["fn"]
            else:
                # statement coverage
                fn = next(k for k, v in fn_offsets.items() if is_inside_offset(offset, v))
                pc_list[-1]["fn"] = f"{contract_name}.{fn}"
                stmt_offset = next(i for i in stmt_nodes if is_inside_offset(offset, i))
                stmt_nodes.remove(stmt_offset)
                statement_map.setdefault(pc_list[-1]["fn"], {})[count] = stmt_offset
                pc_list[-1]["statement"] = count
                count += 1
        except (KeyError, IndexError, StopIteration):
            pass

        if pc_list[-1]["op"] not in ("JUMPI", "REVERT"):
            continue

        node = _find_node_by_offset(ast_json, offset)
        if node is None:
            continue

        if pc_list[-1]["op"] == "REVERT" or _is_revert_jump(pc_list[-2:], revert_pc):
            # custom revert error strings
            if node["ast_type"] == "FunctionDef":
                if (pc_list[-1]["op"] == "REVERT" and pc_list[-7]["op"] == "CALLVALUE") or (
                    pc_list[-1]["op"] == "JUMPI" and pc_list[-3]["op"] == "CALLVALUE"
                ):
                    pc_list[-1]["dev"] = "Cannot send ether to nonpayable function"
            elif node["ast_type"] == "Subscript":
                pc_list[-1]["dev"] = "Index out of range"
            elif node["ast_type"] in ("AugAssign", "BinOp"):
                if node["op"]["ast_type"] == "Sub":
                    pc_list[-1]["dev"] = "Integer underflow"
                elif node["op"]["ast_type"] == "Div":
                    pc_list[-1]["dev"] = "Division by zero"
                elif node["op"]["ast_type"] == "Mod":
                    pc_list[-1]["dev"] = "Modulo by zero"
                else:
                    pc_list[-1]["dev"] = "Integer overflow"
            continue

        if node["ast_type"] in ("Assert", "If") or (
            node["ast_type"] == "Expr"
            and node["value"]["func"].get("id", None) == "assert_modifiable"
        ):
            # branch coverage
            pc_list[-1]["branch"] = count
            branch_map.setdefault(pc_list[-1]["fn"], {})
            if node["ast_type"] == "If":
                branch_map[pc_list[-1]["fn"]][count] = _convert_src(node["test"]["src"]) + (False,)
            else:
                branch_map[pc_list[-1]["fn"]][count] = offset + (True,)
            count += 1

    pc_list[0]["path"] = "0"
    pc_list[0]["offset"] = [0, _convert_src(ast_json[-1]["src"])[1]]
    if revert_pc != -1:
        pc_list[-1]["optimizer_revert"] = True

    pc_map = dict((i.pop("pc"), i) for i in pc_list)

    return pc_map, {"0": statement_map}, {"0": branch_map}


def _convert_src(src: str) -> Tuple[int, int]:
    if src is None:
        return -1, -1
    src_int = [int(i) for i in src.split(":")[:2]]
    return src_int[0], src_int[0] + src_int[1]


def _find_node_by_offset(ast_json: List, offset: Tuple) -> Optional[Dict]:
    for node in [i for i in ast_json if is_inside_offset(offset, _convert_src(i["src"]))]:
        if _convert_src(node["src"]) == offset:
            return node
        node_list = [i for i in node.values() if isinstance(i, dict) and "ast_type" in i]
        node_list.extend([x for i in node.values() if isinstance(i, list) for x in i])
        if node_list:
            result = _find_node_by_offset(node_list, offset)
        else:
            result = _find_node_by_offset(ast_json[ast_json.index(node) + 1 :], offset)
        if result is not None:
            return result
    return None


def _get_statement_nodes(ast_json: List) -> List:
    stmt_nodes: List = []
    for node in ast_json:
        children = [x for v in node.values() if isinstance(v, list) for x in v]
        if children:
            stmt_nodes += _get_statement_nodes(children)
        else:
            stmt_nodes.append(node)
    return stmt_nodes