Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
frutils / frutils_cli.py
Size: Mime:
# -*- coding: utf-8 -*-

"""Commandline-line related utility methods that are used across the frkl-suite (https://frkl.io) of tools.

To use this module, the 'Click' and 'cursor' packages need to be available.
"""
import logging
import subprocess  # nosec
import sys

import click
import cursor
import logzero
from plumbum import local
from six import string_types

from .frutils import parse_host_string, parse_ssh_config_string, readable

log = logging.getLogger("frutils")


class CursorOff(object):
    """A context-manager wrapper to hide the cursor on long(-ish) running computations for a cli-app."""

    def __enter__(self):
        """Hide cursor."""
        cursor.hide()

    def __exit__(self, *args):
        """Show cursor again."""
        cursor.show()


class HostType(click.ParamType):

    name = "host_type"

    def convert(self, value, param, ctx):

        try:
            details = parse_host_string(value)
            return details
        except (Exception):
            self.fail("%s is not a valid host string" % value, param, ctx)


class HostTypePlus(HostType):

    name = "host_type_plus"

    def list_hosts(self):
        try:
            status_out = (
                subprocess.check_output(  # nosec
                    ["vagrant", "status", "--machine-readable"]
                )
                .decode("utf-8")
                .rstrip()
            )
        except (Exception):
            return None

        hosts = {}
        for line in status_out.split("\n"):
            tokens = line.split(",")
            # timestamp = tokens[0]
            hostname = tokens[1]
            data_type = tokens[2]
            data = tokens[3:]
            if data_type == "state":
                if data[0] == "running":
                    hosts[hostname] = True
                else:
                    hosts[hostname] = False

        return hosts

    def convert(self, value, param, ctx):

        if value == "vagrant" or value.startswith("vagrant:"):

            return self.process_vagrant(value)

        elif "find-pi" in value:

            return self.process_find_pi(value)

        else:
            return super(HostTypePlus, self).convert(value, param, ctx)

    def process_find_pi(self, value):

        if value == "find-pi":
            host = "default"
            user = "pi"
        else:
            if "@" in value:
                user, host = value.split("@")
            else:
                click.echo(
                    "Can't parse Raspberry pi host-value, '{}', please use '<user>@find-pi".format(
                        value
                    )
                )
                sys.exit(1)

        log.debug("Raspberry Pi user: '{}', host alias: '{}'".format(user, host))

        nmap_available = check_local_executable("arp")

        if not nmap_available:

            click.echo(
                "\n'arp' is not installed on this machine, can't discover Rasperry Pis on this network. Either install it manually, or use the 'pkg-arp-installed' frecklecutable:\n\nfrecklecute pkg-arp-installed\n"
            )
            sys.exit(1)

        arp = local["arp"]
        awk = local["awk"]

        chain = arp["-n"] | awk["/b8:27:eb/ {print $1}"]

        result = chain()

        addresses = result.strip().split("\n")

        if len(addresses) == 0:
            click.echo(
                "No Raspberry Pi found in this network, you'll have to specify the ip address manually..."
            )
            sys.exit()
        if len(addresses) > 1:
            click.echo("\nMore than one IP addresses for Raspbery pi's found:")
            for a in addresses:
                click.echo("  - {}".format(a))
            click.echo()
            click.echo("Using first one: {}".format(addresses[0]))
        else:
            click.echo(
                "\nFound exactly one IP belonging to a Raspberry Pi: {}".format(
                    addresses[0]
                )
            )

        address = addresses[0]

        host_details = {"host": address, "user": user, "connection_type": "ssh"}

        return host_details

    def process_vagrant(self, value):

        if value == "vagrant":
            host = "default"
        else:
            host = value.split(":")[1]

        hosts = self.list_hosts()

        if hosts is None:
            self.fail("Not in a Vagrant folder, or Vagrant not installed.")
            return

        if host not in hosts.keys():
            self.fail(
                "No Vagrant host '{}' available. Are you in the right folder?".format(
                    host
                )
            )
            return

        if not hosts[host]:
            self.fail("Vagrant host '{}' not running.".format(host))

        config = subprocess.check_output(["vagrant", "ssh-config", host]).decode(
            "utf-8"
        )  # nosec
        host_details = parse_ssh_config_string(config)
        host_details["connection_type"] = "ssh"
        host_details["is_vagrant"] = True
        return host_details


def configure_logger(*logger_names, **kwargs):

    verbosity = kwargs.get("verbosity", logging.WARN)
    logformat = kwargs.get("logformat", None)
    dateformat = kwargs.get("dateformat", None)
    color = kwargs.get("color", True)

    if verbosity is None:
        verbosity = logging.WARN
    if isinstance(verbosity, string_types):
        verbosity = logging._nameToLevel[verbosity]

    if logformat is None:
        logformat = "%(color)s[%(levelname)1.1s %(name)s]%(end_color)s %(message)s"
    if dateformat is None:
        dateformat = "%H:%M:%S"
    formatter = logzero.LogFormatter(color=color, fmt=logformat, datefmt=dateformat)

    logzero.loglevel(verbosity)
    for logger_name in logger_names:
        logzero.setup_logger(logger_name, level=verbosity, formatter=formatter)


DEFAULT_LOG_NAMES = [
    "freckles",
    "ting",
    "frutils",
    "frkl",
    "nsbl",
    "tempting",
    "shellting",
]


def logzero_option(*logger_names, **kwargs):

    if not logger_names:
        logger_names = DEFAULT_LOG_NAMES

    def option(f):
        def _configure_logging(ctx, param, value):

            kwargs["verbosity"] = value.upper()

            configure_logger(*logger_names, **kwargs)

        default_verbosity = kwargs.get("default_verbosity", "WARNING")
        return click.option(
            "--verbosity",
            callback=_configure_logging,
            default=default_verbosity,
            metavar="LEVEL",
            expose_value=False,
            help="The log level: CRITICAL, ERROR, WARNING, INFO, DEBUG",
            type=click.Choice(["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]),
            required=False,
            is_eager=True,
        )(f)

    return option


def output_item(item_name, item, format, pager):
    """Print a 'title'-'item' formatted text."""
    click.secho("{}:".format(item_name), bold=True)
    click.echo("")
    output(item, output_type=format, pager=pager)
    click.echo("")


def output(
    python_object,
    output_type="raw",
    pager=False,
    safe=False,
    indent=0,
    nl=True,
    ignore_aliases=False,
    sort_keys=False,
    yaml_representers=None,
):
    """Print a Python object in a certain format."""
    output_string = readable(
        python_object,
        out=output_type,
        safe=safe,
        indent=indent,
        ignore_aliases=ignore_aliases,
        sort_keys=sort_keys,
        yaml_representers=yaml_representers,
    )

    if pager:
        click.echo_via_pager(output_string)
    else:
        click.echo(output_string, nl=nl)


# def create_flag_var(help_string, flag_names):
#     """Create a dict to be used with lucify to create a flag option for a cli interface.
#
#     Args:
#       help_string (str): the user-facing explanation what this flag does
#       flag_names (list): the cli option/flag names (e.g. ["--help", "-h"])
#
#     Returns:
#       dict: configuration to create a flag option
#
#     """
#     flag_dict = {
#         "type": bool,
#         "required": False,
#         "default": False,
#         "doc": {"help": help_string},
#         "click": {"option": {"is_flag": True, "param_decls": flag_names}},
#     }
#
#     return flag_dict

#
# def create_output_type_var():
#     """Create a dict to be used with lucify to create an option to specify the output format for a command in a cli interface."""
#     return {
#         "type": str,
#         "required": False,
#         "doc": {"help": "the output type"},
#         "click": {
#             "option": {
#                 "param_decls": ["--format", "-f"],
#                 "type": click.Choice(["yaml", "json", "raw"]),
#             }
#         },
#     }

#
# def create_pager_var():
#     """Create a dict to be used with lucify to create an option to use a pager in a cli interface command."""
#     return {
#         "type": bool,
#         "required": False,
#         "default": False,
#         "doc": {"help": "whether to use a pager for display"},
#         "click": {
#             "option": {"param_decls": ["--pager", "-p"], "type": bool, "is_flag": True}
#         },
#     }


# # cli util methods
# def clean_user_input(user_input, vars_desc):
#     """Clean up and re-map user input according to the description used to create the cli options/arguments.
#
#     Args:
#         user_input (dict): the dictionary with the arg-name/value mapping.
#         vars_desc (dict): the dictionary used to create the cli options/arguments (with luci/lucify)
#
#     Returns:
#         dict: the final user input key/values
#
#     """
#     result = OrderedDict()
#     for key in vars_desc.keys():
#
#         if user_input.get(key, None) is not None:
#             add_key_to_dict(result, key, user_input[key])
#
#     return result


# def convert_args_to_dict(args):
#     """Ensure the input to create cli options is a dictionary.
#
#     If that is not the case, the input list or string will be converted to a (properly formatted) dict.
#
#     Args:
#         args (list, str, dict): the input to create the cli options
#
#     Returns:
#          OrderedDict: the formatted dict
#
#     """
#     if isinstance(args, string_types):
#         args = [args]
#
#     result = OrderedDict()
#
#     for arg in args:
#
#         if isinstance(arg, (dict, OrderedDict, CommentedMap)):
#             dict_merge(result, arg, copy_dct=False)
#         elif isinstance(arg, string_types):
#             arg_dict = {arg: create_default_arg(arg)}
#             dict_merge(result, arg_dict, copy_dct=False)
#         else:
#             raise Exception("Can't parse arg(s): {}".format(str(args)))
#
#     return result


# def create_parameter(arg_name=None, arg_details=None, type_map=None):
#     """Creates a :class:`~Parameter` object.
#
#     Args:
#         arg_name (str): the parameter name
#         arg_details (dict): the parameter details
#
#     Returns:
#         Parameter: the parameter object
#     """
#     if arg_name is not None:
#         return Parameter(arg_name, arg_details, type_map=type_map)
#     else:
#         raise Exception("No arg_name provided")


# def create_parameters(args, default_enabled=True, default_vars=None, type_map=None):
#     """Parses an list/string/dictionary and returns an expanded dict of arguments.
#
#     Args:
#         args: the argument(s)
#         default_enabled (bool): whether to enable/disable options by default (not implemented yet)
#         default_vars (dict): (extra) default vars
#         type_map (dict): type mapping of click to cerberus types
#
#     Returns:
#         list: a list of options/arguments
#     """
#
#     if default_vars is None:
#         default_vars = {}
#
#     default_vars["omit"] = OMIT_VALUE
#
#     parameters = []
#     parameters_optional = []
#
#     if isinstance(args, (dict, CommentedMap, OrderedDict)):
#
#         for name, details in args.items():
#
#             parameter = create_parameter(name, details, type_map=type_map)
#             default = default_vars.get(name, None)
#             if default:  # TODO: check whether that should instead test for None
#                 parameter.set_default(default)
#             if parameter.scheme.get("required", True):
#                 parameters.append(parameter)
#             else:
#                 parameters_optional.append(parameter)
#
#     elif isinstance(args, string_types):
#         parameter = create_parameter(args, None, type_map=type_map)
#         default = default_vars.get(args, None)
#         if default:  # TODO: check whether that should instead test for None
#             parameter.set_default(default)
#         if parameter.scheme.get("required", True):
#             parameters.append(parameter)
#         else:
#             parameters_optional.append(parameter)
#     elif isinstance(args, (list, tuple, CommentedSeq)):
#         for arg in args:
#             if isinstance(arg, string_types):
#                 parameter = create_parameter(arg, None, type_map=type_map)
#                 default = default_vars.get(arg, None)
#                 if default:  # TODO: check whether that should instead test for None
#                     parameter.set_default(default)
#                 if parameter.scheme.get("required", True):
#                     parameters.append(parameter)
#                 else:
#                     parameters_optional.append(parameter)
#             elif isinstance(arg, (dict, OrderedDict, CommentedMap)):
#                 for key, value in arg.items():
#                     parameter = create_parameter(key, value, type_map=type_map)
#                     default = default_vars.get(key, None)
#                     if default:  # TODO: check whether that should instead test for None
#                         parameter.set_default(default)
#                     parameters.append(parameter)
#                     if parameter.scheme.get("required", True):
#                         parameters.append(parameter)
#                     else:
#                         parameters_optional.append(parameter)
#
#             else:
#                 raise Exception("Invalid type for argument: {}".format(arg))
#     else:
#         raise Exception("Invalid type for argument: {}".format(args))
#
#     result = Parameters(parameters + parameters_optional)
#
#     return result


# def parse_to_click_args_list(args, default_enabled=True):
#     """Parse a dictionary and create a list of options/arguments the 'click' library can use to render a command-line interface.
#
#     Args:
#         args (dict): the input dictionary
#         default_enabled (bool): whether to enable/disable options by default
#
#     Returns:
#         list: a list of :class:~click.Option and :class:~click.Argument objects
#
#     """
#     options_list = []
#
#     if not isinstance(args, (dict, OrderedDict, CommentedMap)):
#         args = convert_args_to_dict(args)
#
#     for opt_name, details in args.items():
#
#         if KEY_PARAM_ENABLED_NAME in details.keys():
#             cli_enabled = details[KEY_PARAM_ENABLED_NAME]
#         else:
#             cli_enabled = default_enabled
#
#         if not cli_enabled:
#             log.debug("Argument '{}' not enabled for cli. Ignoring.".format(opt_name))
#             continue
#
#         # meta = details.get(KEY_META_GROUP, {})
#         doc = details.get(KEY_DOC_GROUP, {})
#         cli = details.get(KEY_CLI_GROUP, {})
#
#         # arg_type = details.get(KEY_TYPE_NAME, KEY_TYPE_DEFAULT)
#         required = details.get(KEY_REQUIRED_NAME, KEY_REQUIRED_DEFAULT)
#         default = details.get(KEY_DEFAULT_VALUE_NAME, None)
#         help_string = doc.get(KEY_HELP_NAME, None)
#
#         alias = details.get(KEY_ALIAS_NAME, opt_name.replace(".", "_"))
#         cli_option = cli.get(KEY_CLI_CLICK_OPTIONS_NAME, None)
#         cli_argument = cli.get(KEY_CLI_CLICK_ARGUMENT_NAME, None)
#
#         if cli_option and cli_argument:
#             raise Exception(
#                 "Both '{}' and '{}' specified for argument '{}'. This is not possible, please remove one key.".format(
#                     KEY_CLI_CLICK_OPTIONS_NAME, KEY_CLI_CLICK_ARGUMENT_NAME, opt_name
#                 )
#             )
#
#         param_is_option = True
#         if cli_option:
#             cli_parameters = cli_option
#         elif cli_argument:
#             cli_parameters = cli_argument
#             param_is_option = False
#         else:
#             cli_parameters = {}
#
#         default_parameters = {KEY_REQUIRED_NAME: required, KEY_HELP_NAME: help_string}
#         if param_is_option:
#             default_parameters[KEY_CLI_CLICK_PARAM_NAME] = ["--{}".format(alias)]
#         else:
#             default_parameters[KEY_CLI_CLICK_PARAM_NAME] = [alias]
#         default_parameters[KEY_DEFAULT_VALUE_NAME] = default
#
#         # pprint.pprint(default_parameters)
#         # pprint.pprint(cli_parameters)
#         # print("--------")
#         param_details = dict_merge(default_parameters, cli_parameters, copy_dct=True)
#
#         opt_type = param_details.get("type", None)
#         if isinstance(opt_type, (list, tuple, CommentedSeq)):
#             log.debug(
#                 "Found list in 'click.*.type' value, converting to click.Choice: {}".format(
#                     opt_type
#                 )
#             )
#             opt_type = click.Choice(opt_type)
#             param_details["type"] = opt_type
#         elif isinstance(opt_type, string_types):
#             opt_type_converted = locate(opt_type)
#             if not opt_type_converted:
#                 raise Exception("No type found for: {}".format(opt_type))
#
#             if inspect.isclass(opt_type_converted):
#
#                 if issubclass(opt_type_converted, click.ParamType):
#                     param_details["type"] = opt_type_converted()
#                 else:
#                     param_details["type"] = opt_type_converted
#
#         if param_is_option:
#             option_names = param_details.pop(KEY_CLI_CLICK_PARAM_NAME)
#             o = click.Option(option_names, **param_details)
#         else:
#             param_details.pop(KEY_HELP_NAME)
#             o = click.Argument(**param_details)
#
#         # hope this doesn't interfere with anything internal in click
#         o.name = opt_name
#
#         options_list.append(o)
#
#     return options_list


def check_local_executable(executable_name, msg_if_missing=None, exit_if_missing=False):
    """
    Checks whether an executable is available on the local machine.

    """

    try:
        local[executable_name]
        return True
    except (Exception) as e:
        log.debug("failed to load 'sshpass': {}".format(e))
        if msg_if_missing:
            click.echo(msg_if_missing)

        if exit_if_missing is True or (isinstance(exit, int) and exit_if_missing > 0):
            if exit_if_missing is True:
                sys.exit(1)
            else:
                sys.exit(exit_if_missing)
        return False


def output_to_terminal(line, nl=True, no_output=False):

    if no_output:
        return

    click.echo(line.encode("utf-8"), nl=nl)