Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
molecule / util.py
Size: Mime:
#  Copyright (c) 2015-2018 Cisco Systems, Inc.
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to
#  deal in the Software without restriction, including without limitation the
#  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
#  sell copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in
#  all copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
#  DEALINGS IN THE SOFTWARE.
"""Molecule Utils Module."""

from __future__ import print_function

import contextlib
import copy
import fnmatch
import logging
import os
import re
import sys
from dataclasses import dataclass
from functools import lru_cache  # noqa
from subprocess import CalledProcessError, CompletedProcess
from typing import Any, Dict, List, MutableMapping, NoReturn, Optional, Union

import jinja2
import yaml
from rich.syntax import Syntax
from subprocess_tee import run

from molecule.console import console
from molecule.constants import MOLECULE_HEADER

# Module-level logger, named after this module's import path.
LOG = logging.getLogger(__name__)


class SafeDumper(yaml.SafeDumper):
    """``yaml.SafeDumper`` variant that never emits indentless blocks."""

    def increase_indent(self, flow=False, indentless=False):
        """
        Increase the indentation level, discarding the ``indentless`` request.

        :param flow: Forwarded unchanged to the parent implementation.
        :param indentless: Accepted for signature compatibility only; the
         parent is always called with ``False``.
        :return: Whatever the parent ``increase_indent`` returns.
        """
        return super().increase_indent(flow=flow, indentless=False)


def print_debug(title: str, data: str) -> None:
    """
    Print a titled block of debug data on the console.

    :param title: Heading placed after the ``DEBUG:`` prefix.
    :param data: Text printed on the lines following the heading.
    :return: None
    """
    message = f"DEBUG: {title}:\n{data}"
    console.print(message)


def print_environment_vars(env: Optional[Dict[str, str]]) -> None:
    """
    Print the ``Ansible`` and ``Molecule`` related environment variables.

    Three debug sections are printed: the variables whose name contains
    ``ANSIBLE_``, those whose name contains ``MOLECULE_``, and a single
    ``KEY=value`` line combining both, sorted by name.  Nothing is printed
    when ``env`` is empty or ``None``.

    :param env: A dict containing the shell's environment as collected by
     ``os.environ``.
    :return: None
    """
    if not env:
        return

    def _matching(marker):
        # Substring match on the variable name, not a prefix match.
        return {name: value for (name, value) in env.items() if marker in name}

    ansible_env = _matching("ANSIBLE_")
    print_debug("ANSIBLE ENVIRONMENT", safe_dump(ansible_env, explicit_start=False))

    molecule_env = _matching("MOLECULE_")
    print_debug("MOLECULE ENVIRONMENT", safe_dump(molecule_env, explicit_start=False))

    # Molecule values win when a name matches both markers.
    combined_env = {**ansible_env, **molecule_env}
    replay = " ".join(
        [f"{name}={value}" for (name, value) in sorted(combined_env.items())]
    )
    print_debug("SHELL REPLAY", replay)
    print()


def do_report() -> None:
    """
    Dump html report atexit.

    The target path is read from the ``MOLECULE_REPORT`` environment
    variable and the console's recorded output is exported to it as HTML.

    :raises KeyError: when ``MOLECULE_REPORT`` is not set.
    :return: None
    """
    report_file = os.environ["MOLECULE_REPORT"]
    LOG.info("Writing %s report.", report_file)
    # Write as UTF-8 explicitly instead of relying on the locale's preferred
    # encoding: console output routinely contains non-ASCII characters and
    # Rich's default HTML template declares a UTF-8 charset.
    with open(report_file, "w", encoding="utf-8") as f:
        # The ``with`` block closes the file; no explicit close() is needed.
        f.write(console.export_html())


def sysexit(code: int = 1) -> NoReturn:
    """
    Terminate the process through ``sys.exit``.

    :param code: Exit status handed to ``sys.exit``; defaults to 1.
    """
    sys.exit(code)


def sysexit_with_message(
    msg: str, code: int = 1, detail: Optional[MutableMapping] = None
) -> None:
    """
    Log a critical message and exit, optionally printing extra detail first.

    :param msg: Message logged at ``CRITICAL`` level.
    :param code: Exit status passed on to ``sysexit``; defaults to 1.
    :param detail: Optional extra information.  A ``dict`` is rendered as
     YAML, anything else through ``str()``.
    """
    if detail:
        # The detail is typically multi-line, so it is printed directly
        # rather than being sent through the logger.
        rendered = safe_dump(detail) if isinstance(detail, dict) else str(detail)
        print(rendered)
    LOG.critical(msg)
    sysexit(code)


def run_command(
    cmd, env=None, debug=False, echo=False, quiet=False, check=False, cwd=None
) -> CompletedProcess:
    """
    Execute the given command and return the completed process.

    :param cmd: Either

        - a string or list of strings (similar to subprocess.run), or
        - a ``BakedCommand`` object, whose ``cmd``, ``env``, ``stdout`` and
          ``stderr`` attributes are used.
    :param env: An optional dict of environment variables.  When ``cmd`` is
     a ``BakedCommand`` it is layered on top of ``cmd.env`` (values given here
     win); when it is empty or ``None``, ``cmd.env`` is used as-is.
    :param debug: An optional bool to toggle debug output.  Also forces
     ``echo``.
    :param echo: Passed to the runner as ``echo``.
    :param quiet: Passed to the runner as ``quiet``.
    :param check: When true, a non-zero return code raises
     ``CalledProcessError``.
    :param cwd: Passed to the runner as ``cwd``.
    :raises RuntimeError: when ``cmd`` is a legacy ``Command`` object.
    :raises CalledProcessError: when ``check`` is true and the command fails.
    :return: the ``CompletedProcess`` produced by the runner.
    """
    args = []
    stdout = None
    stderr = None
    # The command kind is detected by class name, as in the original code.
    cmd_kind = cmd.__class__.__name__
    if cmd_kind == "Command":
        raise RuntimeError(
            "Molecule 3.2.0 dropped use of sh library, update plugin code to use new API. "
            "See https://github.com/ansible-community/molecule/issues/2678"
        )
    elif cmd_kind == "BakedCommand":
        if not env:
            env = cmd.env
        elif cmd.env:
            # Build a new dict: ``dict.update()`` returns None, so chaining it
            # (as the previous code did) silently discarded the whole
            # environment.
            env = {**cmd.env, **env}
        # else: the baked command carries no environment, keep ``env`` as given.
        args = cmd.cmd
        stdout = cmd.stdout
        stderr = cmd.stderr
    else:
        args = cmd

    if debug:
        print_environment_vars(env)

    result = run(
        args,
        env=env,
        stdout=stdout,
        stderr=stderr,
        echo=echo or debug,
        quiet=quiet,
        cwd=cwd,
    )
    if result.returncode != 0 and check:
        raise CalledProcessError(
            returncode=result.returncode,
            cmd=result.args,
            output=result.stdout,
            stderr=result.stderr,
        )
    return result


def os_walk(directory, pattern, excludes=None, followlinks=False):
    """
    Navigate recursively and retrieve files based on pattern.

    :param directory: Root directory of the walk.
    :param pattern: ``fnmatch`` pattern matched against each file's basename.
    :param excludes: Optional collection of directory names that are not
     descended into.  ``None`` (the default) excludes nothing.
    :param followlinks: Passed to ``os.walk``.
    :return: a generator yielding the path of every matching file.
    """
    # ``None`` replaces the former mutable ``[]`` default.
    excluded = () if excludes is None else excludes
    for root, dirs, files in os.walk(directory, topdown=True, followlinks=followlinks):
        # Prune in place so that os.walk (topdown) skips excluded directories.
        dirs[:] = [d for d in dirs if d not in excluded]
        for basename in files:
            if fnmatch.fnmatch(basename, pattern):
                yield os.path.join(root, basename)


def render_template(template, **kwargs):
    """
    Render a jinja2 template string.

    :param template: The template source text.
    :param kwargs: Variables made available to the template.
    :return: the rendered text.
    """
    environment = jinja2.Environment()
    return environment.from_string(template).render(kwargs)


def write_file(filename: str, content: str):
    """
    Write ``content`` to ``filename``, then prepend the Molecule header.

    :param filename: A string containing the target filename.
    :param content: A string containing the data to be written.
    :return: None
    """
    with open_file(filename, "w") as handle:
        handle.write(content)

    # The header is added in a second pass over the freshly written file.
    file_prepender(filename)


def molecule_prepender(content: str):
    """
    Return ``content`` preceded by the molecule identification header.

    :param content: The text to place after the header.
    :return: the header, a blank line, then ``content``.
    """
    return "\n\n".join((MOLECULE_HEADER, content))


def file_prepender(filename: str):
    """
    Prepend an informational header on a file managed by Molecule.

    The file is opened for update, read completely, and rewritten from the
    start with the header in front of its previous content.

    :param filename: A string containing the target filename.
    :return: None
    """
    with open_file(filename, "r+") as handle:
        with_header = molecule_prepender(handle.read())
        # Rewind so the new text overwrites the file from its beginning.
        handle.seek(0)
        handle.write(with_header)


def safe_dump(data: Any, explicit_start=True) -> str:
    """
    Dump the provided data to a YAML document and return it as a string.

    :param data: The object to serialize.
    :param explicit_start: Passed to ``yaml.dump`` as ``explicit_start``.
    :return: str
    """
    dump_options = {
        "Dumper": SafeDumper,
        "default_flow_style": False,
        "explicit_start": explicit_start,
    }
    return yaml.dump(data, **dump_options)


def safe_load(string) -> Dict:
    """
    Parse the provided YAML and return the result, or ``{}`` when it is empty.

    A ``yaml.scanner.ScannerError`` is reported through
    ``sysexit_with_message``.

    :param string: A string to be parsed.
    :return: dict
    """
    try:
        parsed = yaml.safe_load(string)
    except yaml.scanner.ScannerError as e:
        sysexit_with_message(str(e))
        # Only reached if the call above returns instead of exiting.
        return {}
    return parsed or {}


def safe_load_file(filename: str):
    """
    Parse the provided YAML file and return its content.

    :param filename: A string containing an absolute path to the file to parse.
    :return: dict
    """
    with open_file(filename) as stream:
        parsed = safe_load(stream)
    return parsed


@contextlib.contextmanager
def open_file(filename, mode="r"):
    """
    Context manager yielding the opened file and closing it on exit.

    :param filename: A string containing an absolute path to the file to open.
    :param mode: A string describing the way in which the file will be used.
    :return: file type
    """
    stream = open(filename, mode)
    try:
        yield stream
    finally:
        # Runs on both normal exit and when the managed block raises.
        stream.close()


def instance_with_scenario_name(instance_name, scenario_name):
    """
    Format an instance name that includes the scenario.

    :param instance_name: The instance part of the name.
    :param scenario_name: The scenario part, appended after a dash.
    :return: ``<instance_name>-<scenario_name>``
    """
    return f"{instance_name}-{scenario_name}"


def verbose_flag(options):
    """
    Return the computed verbosity flag and strip it from ``options``.

    The keys ``v``, ``vv`` and ``vvv`` are probed in that order.  The first
    one holding a truthy value is deleted from ``options`` (together with a
    truthy ``verbose`` key, if present) and returned as a one-element list
    such as ``["-vv"]``.

    :param options: A dict of options; modified in place.
    :return: list with the flag, or an empty list when none is set.
    """
    for count in range(1, 4):
        key = "v" * count
        if not options.get(key):
            continue
        del options[key]
        if options.get("verbose"):
            del options["verbose"]
        return [f"-{key}"]

    return []


def filter_verbose_permutation(options):
    """
    Return a copy of ``options`` without the keys made only of ``v``.

    :param options: A dict of options; left unmodified.
    :return: a new dict without keys such as ``v``, ``vv`` or ``vvv``.
    """
    return {
        key: value
        for key, value in options.items()
        if re.match("^[v]+$", key) is None
    }


def abs_path(path):
    """
    Return the absolute form of ``path``.

    :param path: A path string, possibly empty or ``None``.
    :return: the absolute path, or ``None`` when ``path`` is falsy.
    """
    return os.path.abspath(path) if path else None


def merge_dicts(a: MutableMapping, b: MutableMapping) -> MutableMapping:
    """
    Recursively merge ``b`` on top of ``a`` and return the result as a new dict.

    This function uses the same algorithm as Ansible's `combine(recursive=True)` filter.

    ``a`` is deep-copied and never modified.  Where both sides hold a dict
    under the same key the two are merged recursively; in every other case
    the value from ``b`` replaces the one from ``a``.

    :param a: the target dictionary
    :param b: the dictionary to import
    :return: dict
    """
    merged = copy.deepcopy(a)

    for key, incoming in b.items():
        both_are_dicts = (
            key in a and isinstance(a[key], dict) and isinstance(incoming, dict)
        )
        merged[key] = merge_dicts(a[key], incoming) if both_are_dicts else incoming

    return merged


def validate_parallel_cmd_args(cmd_args):
    """
    Prevent use of options incompatible with parallel mode.

    Exits through ``sysexit_with_message`` when ``parallel`` is set together
    with ``destroy`` equal to ``"never"``.

    :param cmd_args: A dict of command arguments.
    :return: None
    """
    if not cmd_args.get("parallel"):
        return
    if cmd_args.get("destroy") != "never":
        return

    msg = 'Combining "--parallel" and "--destroy=never" is not supported'
    sysexit_with_message(msg)


def _parallelize_platforms(config, run_uuid):
    """
    Suffix every platform name in ``config`` with ``run_uuid``.

    The platform dicts are modified in place; the returned list holds those
    same objects.

    :param config: A dict with a ``platforms`` list of dicts having a ``name``.
    :param run_uuid: The value appended, after a dash, to each name.
    :return: list of the updated platform dicts.
    """
    platforms = []
    for platform in config["platforms"]:
        platform["name"] = f'{platform["name"]}-{run_uuid}'
        platforms.append(platform)
    return platforms


@lru_cache()
def find_vcs_root(
    location="", dirs=(".git", ".hg", ".svn"), default=None
) -> Optional[str]:
    """
    Return current repository root directory.

    Starting at ``location``, each directory and then its parents are
    checked for one of the marker directories in ``dirs``; the first
    directory containing one is returned.

    Results are memoized with ``lru_cache``, so all arguments must be
    hashable (``dirs`` has to be a tuple, not a list).  Because the cache
    key is the argument values, a call relying on the empty ``location``
    default keeps returning the result computed for the working directory
    of the first such call.

    :param location: Directory to start from; the current working directory
     when empty.
    :param dirs: Names of the directories that mark a repository root.
    :param default: Value returned when no marker is found up to the
     filesystem root.
    :return: absolute path of the repository root, or ``default``.
    """
    location = os.path.abspath(location or os.getcwd())
    while True:
        if any(os.path.isdir(os.path.join(location, d)) for d in dirs):
            return location
        parent = os.path.abspath(os.path.join(location, os.pardir))
        if parent == location:
            # The filesystem root is its own parent: nothing was found.
            return default
        location = parent


def lookup_config_file(filename: str) -> Optional[str]:
    """
    Return config file PATH.

    The file is looked up first under the repository root (as reported by
    ``find_vcs_root``, falling back to ``~``) and then under ``~``.

    :param filename: Name of the config file, relative to the searched roots.
    :return: path of the first existing file, or ``None`` when there is none.
    """
    search_roots = (find_vcs_root(default="~"), "~")
    for root in search_roots:
        candidate = os.path.expanduser(f"{root}/{filename}")
        if os.path.isfile(candidate):
            LOG.info("Found config file %s", candidate)
            return candidate
    return None


def boolean(value: Any, strict=True) -> bool:
    """
    Evaluate any object as boolean matching ansible behavior.

    Strings and bytes are compared case-insensitively and with surrounding
    whitespace removed.

    :param value: The object to evaluate.
    :param strict: When true (default) an unrecognized value raises
     ``TypeError``; when false it evaluates to ``False``.
    :raises TypeError: in strict mode, when ``value`` is not a valid boolean.
    :return: bool
    """
    # Based on https://github.com/ansible/ansible/blob/devel/lib/ansible/module_utils/parsing/convert_bool.py

    BOOLEANS_TRUE = frozenset(("y", "yes", "on", "1", "true", "t", 1, 1.0, True))
    BOOLEANS_FALSE = frozenset(("n", "no", "off", "0", "false", "f", 0, 0.0, False))
    BOOLEANS = BOOLEANS_TRUE.union(BOOLEANS_FALSE)

    if isinstance(value, bool):
        return value

    normalized_value = value
    if isinstance(value, bytes):
        # Decode instead of calling str(): str(b"yes") is "b'yes'", which can
        # never match any of the accepted spellings.
        normalized_value = value.decode("utf-8", errors="replace")
    if isinstance(normalized_value, str):
        normalized_value = normalized_value.lower().strip()

    try:
        if normalized_value in BOOLEANS_TRUE:
            return True
        if normalized_value in BOOLEANS_FALSE:
            return False
    except TypeError:
        # Unhashable values (list, dict, ...) cannot be looked up in a
        # frozenset; treat them like any other unrecognized value.
        pass

    if not strict:
        return False

    raise TypeError(
        "The value '%s' is not a valid boolean.  Valid booleans include: %s"
        % (str(value), ", ".join(repr(i) for i in BOOLEANS))
    )


@dataclass
class BakedCommand:
    """Define a subprocess command to be executed.

    Instances bundle a command line with the settings it should run with.
    ``cmd`` and ``env`` are required; the remaining fields default to
    ``None``.
    """

    # The command line: a single string or a list of argument strings.
    cmd: Union[str, List[str]]
    # Environment variables for the command, or None.
    env: Optional[Dict]
    # Working directory for the command, or None.
    cwd: Optional[str] = None
    # Value handed to the runner as ``stdout`` when the command is executed.
    stdout: Any = None
    # Value handed to the runner as ``stderr`` when the command is executed.
    stderr: Any = None


def dict2args(data: Dict) -> List[str]:
    """
    Convert a dictionary of options to command line arguments.

    Keys are processed in sorted order.  A single-character key gets a ``-``
    prefix, any other key gets ``--``, and underscores in the resulting
    option are replaced by dashes.  A value of ``False`` drops the option
    entirely, ``True`` emits the bare option, and any other value is appended
    as-is right after the option.

    :param data: A dict mapping option names to their values.
    :return: the list of arguments.
    """
    args = []
    # keep sorting in order to achieve a predictable behavior
    for option, value in sorted(data.items()):
        if value is False:
            continue
        dashes = "--" if len(option) != 1 else "-"
        args.append(f"{dashes}{option}".replace("_", "-"))
        # { foo: True } should produce --foo without any values
        if value is not True:
            args.append(value)
    return args


def bool2args(data: bool) -> List[str]:
    """
    Convert a boolean value to command line argument (flag).

    :param data: The boolean value; it does not influence the result.
    :return: always an empty list.
    """
    no_arguments: List[str] = []
    return no_arguments


def print_as_yaml(data: Any) -> None:
    """
    Render a python object as syntax-highlighted yaml on the console.

    :param data: The object to dump and print.
    :return: None
    """
    yaml_text = safe_dump(data)
    console.print(Syntax(yaml_text, "yaml"))