Repository URL to install this package:
|
Version:
1.0.0 ▾
|
frutils
/
frutils_cli.py
|
|---|
# -*- coding: utf-8 -*-
"""Commandline-line related utility methods that are used across the frkl-suite (https://frkl.io) of tools.
To use this module, the 'Click' and 'cursor' packages need to be available.
"""
import logging
import subprocess # nosec
import sys
import click
import cursor
import logzero
from plumbum import local
from six import string_types
from .frutils import parse_host_string, parse_ssh_config_string, readable
log = logging.getLogger("frutils")
class CursorOff(object):
"""A context-manager wrapper to hide the cursor on long(-ish) running computations for a cli-app."""
def __enter__(self):
"""Hide cursor."""
cursor.hide()
def __exit__(self, *args):
"""Show cursor again."""
cursor.show()
class HostType(click.ParamType):
name = "host_type"
def convert(self, value, param, ctx):
try:
details = parse_host_string(value)
return details
except (Exception):
self.fail("%s is not a valid host string" % value, param, ctx)
class HostTypePlus(HostType):
name = "host_type_plus"
def list_hosts(self):
try:
status_out = (
subprocess.check_output( # nosec
["vagrant", "status", "--machine-readable"]
)
.decode("utf-8")
.rstrip()
)
except (Exception):
return None
hosts = {}
for line in status_out.split("\n"):
tokens = line.split(",")
# timestamp = tokens[0]
hostname = tokens[1]
data_type = tokens[2]
data = tokens[3:]
if data_type == "state":
if data[0] == "running":
hosts[hostname] = True
else:
hosts[hostname] = False
return hosts
def convert(self, value, param, ctx):
if value == "vagrant" or value.startswith("vagrant:"):
return self.process_vagrant(value)
elif "find-pi" in value:
return self.process_find_pi(value)
else:
return super(HostTypePlus, self).convert(value, param, ctx)
def process_find_pi(self, value):
if value == "find-pi":
host = "default"
user = "pi"
else:
if "@" in value:
user, host = value.split("@")
else:
click.echo(
"Can't parse Raspberry pi host-value, '{}', please use '<user>@find-pi".format(
value
)
)
sys.exit(1)
log.debug("Raspberry Pi user: '{}', host alias: '{}'".format(user, host))
nmap_available = check_local_executable("arp")
if not nmap_available:
click.echo(
"\n'arp' is not installed on this machine, can't discover Rasperry Pis on this network. Either install it manually, or use the 'pkg-arp-installed' frecklecutable:\n\nfrecklecute pkg-arp-installed\n"
)
sys.exit(1)
arp = local["arp"]
awk = local["awk"]
chain = arp["-n"] | awk["/b8:27:eb/ {print $1}"]
result = chain()
addresses = result.strip().split("\n")
if len(addresses) == 0:
click.echo(
"No Raspberry Pi found in this network, you'll have to specify the ip address manually..."
)
sys.exit()
if len(addresses) > 1:
click.echo("\nMore than one IP addresses for Raspbery pi's found:")
for a in addresses:
click.echo(" - {}".format(a))
click.echo()
click.echo("Using first one: {}".format(addresses[0]))
else:
click.echo(
"\nFound exactly one IP belonging to a Raspberry Pi: {}".format(
addresses[0]
)
)
address = addresses[0]
host_details = {"host": address, "user": user, "connection_type": "ssh"}
return host_details
def process_vagrant(self, value):
if value == "vagrant":
host = "default"
else:
host = value.split(":")[1]
hosts = self.list_hosts()
if hosts is None:
self.fail("Not in a Vagrant folder, or Vagrant not installed.")
return
if host not in hosts.keys():
self.fail(
"No Vagrant host '{}' available. Are you in the right folder?".format(
host
)
)
return
if not hosts[host]:
self.fail("Vagrant host '{}' not running.".format(host))
config = subprocess.check_output(["vagrant", "ssh-config", host]).decode(
"utf-8"
) # nosec
host_details = parse_ssh_config_string(config)
host_details["connection_type"] = "ssh"
host_details["is_vagrant"] = True
return host_details
def configure_logger(*logger_names, **kwargs):
verbosity = kwargs.get("verbosity", logging.WARN)
logformat = kwargs.get("logformat", None)
dateformat = kwargs.get("dateformat", None)
color = kwargs.get("color", True)
if verbosity is None:
verbosity = logging.WARN
if isinstance(verbosity, string_types):
verbosity = logging._nameToLevel[verbosity]
if logformat is None:
logformat = "%(color)s[%(levelname)1.1s %(name)s]%(end_color)s %(message)s"
if dateformat is None:
dateformat = "%H:%M:%S"
formatter = logzero.LogFormatter(color=color, fmt=logformat, datefmt=dateformat)
logzero.loglevel(verbosity)
for logger_name in logger_names:
logzero.setup_logger(logger_name, level=verbosity, formatter=formatter)
DEFAULT_LOG_NAMES = [
"freckles",
"ting",
"frutils",
"frkl",
"nsbl",
"tempting",
"shellting",
]
def logzero_option(*logger_names, **kwargs):
if not logger_names:
logger_names = DEFAULT_LOG_NAMES
def option(f):
def _configure_logging(ctx, param, value):
kwargs["verbosity"] = value.upper()
configure_logger(*logger_names, **kwargs)
default_verbosity = kwargs.get("default_verbosity", "WARNING")
return click.option(
"--verbosity",
callback=_configure_logging,
default=default_verbosity,
metavar="LEVEL",
expose_value=False,
help="The log level: CRITICAL, ERROR, WARNING, INFO, DEBUG",
type=click.Choice(["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]),
required=False,
is_eager=True,
)(f)
return option
def output_item(item_name, item, format, pager):
"""Print a 'title'-'item' formatted text."""
click.secho("{}:".format(item_name), bold=True)
click.echo("")
output(item, output_type=format, pager=pager)
click.echo("")
def output(
python_object,
output_type="raw",
pager=False,
safe=False,
indent=0,
nl=True,
ignore_aliases=False,
sort_keys=False,
yaml_representers=None,
):
"""Print a Python object in a certain format."""
output_string = readable(
python_object,
out=output_type,
safe=safe,
indent=indent,
ignore_aliases=ignore_aliases,
sort_keys=sort_keys,
yaml_representers=yaml_representers,
)
if pager:
click.echo_via_pager(output_string)
else:
click.echo(output_string, nl=nl)
# def create_flag_var(help_string, flag_names):
# """Create a dict to be used with lucify to create a flag option for a cli interface.
#
# Args:
# help_string (str): the user-facing explanation what this flag does
# flag_names (list): the cli option/flag names (e.g. ["--help", "-h"])
#
# Returns:
# dict: configuration to create a flag option
#
# """
# flag_dict = {
# "type": bool,
# "required": False,
# "default": False,
# "doc": {"help": help_string},
# "click": {"option": {"is_flag": True, "param_decls": flag_names}},
# }
#
# return flag_dict
#
# def create_output_type_var():
# """Create a dict to be used with lucify to create an option to specify the output format for a command in a cli interface."""
# return {
# "type": str,
# "required": False,
# "doc": {"help": "the output type"},
# "click": {
# "option": {
# "param_decls": ["--format", "-f"],
# "type": click.Choice(["yaml", "json", "raw"]),
# }
# },
# }
#
# def create_pager_var():
# """Create a dict to be used with lucify to create an option to use a pager in a cli interface command."""
# return {
# "type": bool,
# "required": False,
# "default": False,
# "doc": {"help": "whether to use a pager for display"},
# "click": {
# "option": {"param_decls": ["--pager", "-p"], "type": bool, "is_flag": True}
# },
# }
# # cli util methods
# def clean_user_input(user_input, vars_desc):
# """Clean up and re-map user input according to the description used to create the cli options/arguments.
#
# Args:
# user_input (dict): the dictionary with the arg-name/value mapping.
# vars_desc (dict): the dictionary used to create the cli options/arguments (with luci/lucify)
#
# Returns:
# dict: the final user input key/values
#
# """
# result = OrderedDict()
# for key in vars_desc.keys():
#
# if user_input.get(key, None) is not None:
# add_key_to_dict(result, key, user_input[key])
#
# return result
# def convert_args_to_dict(args):
# """Ensure the input to create cli options is a dictionary.
#
# If that is not the case, the input list or string will be converted to a (properly formatted) dict.
#
# Args:
# args (list, str, dict): the input to create the cli options
#
# Returns:
# OrderedDict: the formatted dict
#
# """
# if isinstance(args, string_types):
# args = [args]
#
# result = OrderedDict()
#
# for arg in args:
#
# if isinstance(arg, (dict, OrderedDict, CommentedMap)):
# dict_merge(result, arg, copy_dct=False)
# elif isinstance(arg, string_types):
# arg_dict = {arg: create_default_arg(arg)}
# dict_merge(result, arg_dict, copy_dct=False)
# else:
# raise Exception("Can't parse arg(s): {}".format(str(args)))
#
# return result
# def create_parameter(arg_name=None, arg_details=None, type_map=None):
# """Creates a :class:`~Parameter` object.
#
# Args:
# arg_name (str): the parameter name
# arg_details (dict): the parameter details
#
# Returns:
# Parameter: the parameter object
# """
# if arg_name is not None:
# return Parameter(arg_name, arg_details, type_map=type_map)
# else:
# raise Exception("No arg_name provided")
# def create_parameters(args, default_enabled=True, default_vars=None, type_map=None):
# """Parses an list/string/dictionary and returns an expanded dict of arguments.
#
# Args:
# args: the argument(s)
# default_enabled (bool): whether to enable/disable options by default (not implemented yet)
# default_vars (dict): (extra) default vars
# type_map (dict): type mapping of click to cerberus types
#
# Returns:
# list: a list of options/arguments
# """
#
# if default_vars is None:
# default_vars = {}
#
# default_vars["omit"] = OMIT_VALUE
#
# parameters = []
# parameters_optional = []
#
# if isinstance(args, (dict, CommentedMap, OrderedDict)):
#
# for name, details in args.items():
#
# parameter = create_parameter(name, details, type_map=type_map)
# default = default_vars.get(name, None)
# if default: # TODO: check whether that should instead test for None
# parameter.set_default(default)
# if parameter.scheme.get("required", True):
# parameters.append(parameter)
# else:
# parameters_optional.append(parameter)
#
# elif isinstance(args, string_types):
# parameter = create_parameter(args, None, type_map=type_map)
# default = default_vars.get(args, None)
# if default: # TODO: check whether that should instead test for None
# parameter.set_default(default)
# if parameter.scheme.get("required", True):
# parameters.append(parameter)
# else:
# parameters_optional.append(parameter)
# elif isinstance(args, (list, tuple, CommentedSeq)):
# for arg in args:
# if isinstance(arg, string_types):
# parameter = create_parameter(arg, None, type_map=type_map)
# default = default_vars.get(arg, None)
# if default: # TODO: check whether that should instead test for None
# parameter.set_default(default)
# if parameter.scheme.get("required", True):
# parameters.append(parameter)
# else:
# parameters_optional.append(parameter)
# elif isinstance(arg, (dict, OrderedDict, CommentedMap)):
# for key, value in arg.items():
# parameter = create_parameter(key, value, type_map=type_map)
# default = default_vars.get(key, None)
# if default: # TODO: check whether that should instead test for None
# parameter.set_default(default)
# parameters.append(parameter)
# if parameter.scheme.get("required", True):
# parameters.append(parameter)
# else:
# parameters_optional.append(parameter)
#
# else:
# raise Exception("Invalid type for argument: {}".format(arg))
# else:
# raise Exception("Invalid type for argument: {}".format(args))
#
# result = Parameters(parameters + parameters_optional)
#
# return result
# def parse_to_click_args_list(args, default_enabled=True):
# """Parse a dictionary and create a list of options/arguments the 'click' library can use to render a command-line interface.
#
# Args:
# args (dict): the input dictionary
# default_enabled (bool): whether to enable/disable options by default
#
# Returns:
# list: a list of :class:~click.Option and :class:~click.Argument objects
#
# """
# options_list = []
#
# if not isinstance(args, (dict, OrderedDict, CommentedMap)):
# args = convert_args_to_dict(args)
#
# for opt_name, details in args.items():
#
# if KEY_PARAM_ENABLED_NAME in details.keys():
# cli_enabled = details[KEY_PARAM_ENABLED_NAME]
# else:
# cli_enabled = default_enabled
#
# if not cli_enabled:
# log.debug("Argument '{}' not enabled for cli. Ignoring.".format(opt_name))
# continue
#
# # meta = details.get(KEY_META_GROUP, {})
# doc = details.get(KEY_DOC_GROUP, {})
# cli = details.get(KEY_CLI_GROUP, {})
#
# # arg_type = details.get(KEY_TYPE_NAME, KEY_TYPE_DEFAULT)
# required = details.get(KEY_REQUIRED_NAME, KEY_REQUIRED_DEFAULT)
# default = details.get(KEY_DEFAULT_VALUE_NAME, None)
# help_string = doc.get(KEY_HELP_NAME, None)
#
# alias = details.get(KEY_ALIAS_NAME, opt_name.replace(".", "_"))
# cli_option = cli.get(KEY_CLI_CLICK_OPTIONS_NAME, None)
# cli_argument = cli.get(KEY_CLI_CLICK_ARGUMENT_NAME, None)
#
# if cli_option and cli_argument:
# raise Exception(
# "Both '{}' and '{}' specified for argument '{}'. This is not possible, please remove one key.".format(
# KEY_CLI_CLICK_OPTIONS_NAME, KEY_CLI_CLICK_ARGUMENT_NAME, opt_name
# )
# )
#
# param_is_option = True
# if cli_option:
# cli_parameters = cli_option
# elif cli_argument:
# cli_parameters = cli_argument
# param_is_option = False
# else:
# cli_parameters = {}
#
# default_parameters = {KEY_REQUIRED_NAME: required, KEY_HELP_NAME: help_string}
# if param_is_option:
# default_parameters[KEY_CLI_CLICK_PARAM_NAME] = ["--{}".format(alias)]
# else:
# default_parameters[KEY_CLI_CLICK_PARAM_NAME] = [alias]
# default_parameters[KEY_DEFAULT_VALUE_NAME] = default
#
# # pprint.pprint(default_parameters)
# # pprint.pprint(cli_parameters)
# # print("--------")
# param_details = dict_merge(default_parameters, cli_parameters, copy_dct=True)
#
# opt_type = param_details.get("type", None)
# if isinstance(opt_type, (list, tuple, CommentedSeq)):
# log.debug(
# "Found list in 'click.*.type' value, converting to click.Choice: {}".format(
# opt_type
# )
# )
# opt_type = click.Choice(opt_type)
# param_details["type"] = opt_type
# elif isinstance(opt_type, string_types):
# opt_type_converted = locate(opt_type)
# if not opt_type_converted:
# raise Exception("No type found for: {}".format(opt_type))
#
# if inspect.isclass(opt_type_converted):
#
# if issubclass(opt_type_converted, click.ParamType):
# param_details["type"] = opt_type_converted()
# else:
# param_details["type"] = opt_type_converted
#
# if param_is_option:
# option_names = param_details.pop(KEY_CLI_CLICK_PARAM_NAME)
# o = click.Option(option_names, **param_details)
# else:
# param_details.pop(KEY_HELP_NAME)
# o = click.Argument(**param_details)
#
# # hope this doesn't interfere with anything internal in click
# o.name = opt_name
#
# options_list.append(o)
#
# return options_list
def check_local_executable(executable_name, msg_if_missing=None, exit_if_missing=False):
"""
Checks whether an executable is available on the local machine.
"""
try:
local[executable_name]
return True
except (Exception) as e:
log.debug("failed to load 'sshpass': {}".format(e))
if msg_if_missing:
click.echo(msg_if_missing)
if exit_if_missing is True or (isinstance(exit, int) and exit_if_missing > 0):
if exit_if_missing is True:
sys.exit(1)
else:
sys.exit(exit_if_missing)
return False
def output_to_terminal(line, nl=True, no_output=False):
if no_output:
return
click.echo(line.encode("utf-8"), nl=nl)