Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / python-miio   python

Repository URL to install this package:

Version: 0.4.4 

/ click_common.py

"""Click commons.

This file contains common functions for cli tools.
"""
import ast
import sys
if sys.version_info < (3, 5):
    print("To use this script you need python 3.5 or newer, got %s" %
          (sys.version_info,))
    sys.exit(1)
import click
import ipaddress
import miio
import logging
import json
import re
from typing import Union
from functools import wraps
from functools import partial
from .exceptions import DeviceError


_LOGGER = logging.getLogger(__name__)


def validate_ip(ctx, param, value):
    if value is None:
        return None
    try:
        ipaddress.ip_address(value)
        return value
    except ValueError as ex:
        raise click.BadParameter("Invalid IP: %s" % ex)


def validate_token(ctx, param, value):
    if value is None:
        return None
    token_len = len(value)
    if token_len != 32:
        raise click.BadParameter("Token length != 32 chars: %s" % token_len)
    return value


class ExceptionHandlerGroup(click.Group):
    """Add a simple group for catching the miio-related exceptions.

    This simplifies catching the exceptions from different click commands.

    Idea from https://stackoverflow.com/a/44347763
    """
    def __call__(self, *args, **kwargs):
        try:
            return self.main(*args, **kwargs)
        except miio.DeviceException as ex:
            _LOGGER.debug("Exception: %s", ex, exc_info=True)
            click.echo(click.style("Error: %s" % ex, fg='red', bold=True))


class EnumType(click.Choice):
    def __init__(self, enumcls, casesensitive=True):
        choices = enumcls.__members__

        if not casesensitive:
            choices = (_.lower() for _ in choices)

        self._enumcls = enumcls
        self._casesensitive = casesensitive

        super().__init__(list(sorted(set(choices))))

    def convert(self, value, param, ctx):
        if not self._casesensitive:
            value = value.lower()

        value = super().convert(value, param, ctx)

        if not self._casesensitive:
            return next(_ for _ in self._enumcls if _.name.lower() == value.lower())
        else:
            return next(_ for _ in self._enumcls if _.name == value)

    def get_metavar(self, param):
        word = self._enumcls.__name__

        # Stolen from jpvanhal/inflection
        word = re.sub(r"([A-Z]+)([A-Z][a-z])", r'\1_\2', word)
        word = re.sub(r"([a-z\d])([A-Z])", r'\1_\2', word)
        word = word.replace("-", "_").lower().split("_")

        if word[-1] == "enum":
            word.pop()

        return ("_".join(word)).upper()


class LiteralParamType(click.ParamType):
    name = 'literal'

    def convert(self, value, param, ctx):
        try:
            return ast.literal_eval(value)
        except ValueError:
            self.fail('%s is not a valid literal' % value, param, ctx)


class GlobalContextObject:
    def __init__(self, debug: int=0, output: callable=None):
        self.debug = debug
        self.output = output


class DeviceGroupMeta(type):

    device_classes = set()

    def __new__(mcs, name, bases, namespace) -> type:
        commands = {}

        def _get_commands_for_namespace(namespace):
            commands = {}
            for key, val in namespace.items():
                if not callable(val):
                    continue
                device_group_command = getattr(val, '_device_group_command', None)
                if device_group_command is None:
                    continue
                commands[device_group_command.command_name] = device_group_command

            return commands

        # 1. Go through base classes for commands
        for base in bases:
            commands.update(getattr(base, '_device_group_commands', {}))

        # 2. Add commands from the current class
        commands.update(_get_commands_for_namespace(namespace))

        namespace['_device_group_commands'] = commands
        if 'get_device_group' not in namespace:

            def get_device_group(dcls):
                return DeviceGroup(dcls)

            namespace['get_device_group'] = classmethod(get_device_group)

        cls = super().__new__(mcs, name, bases, namespace)
        mcs.device_classes.add(cls)
        return cls


class DeviceGroup(click.MultiCommand):

    class Command:
        def __init__(self, name, decorators, *, default_output=None, **kwargs):
            self.name = name
            self.decorators = list(decorators)
            self.decorators.reverse()
            self.default_output = default_output

            self.kwargs = kwargs

        def __call__(self, func):
            self.func = func
            func._device_group_command = self
            self.kwargs.setdefault('help', self.func.__doc__)
            return func

        @property
        def command_name(self):
            return self.name or self.func.__name__.lower()

        def wrap(self, ctx, func):
            gco = ctx.find_object(GlobalContextObject)
            if gco is not None and gco.output is not None:
                output = gco.output
            elif self.default_output:
                output = self.default_output
            else:
                output = format_output(
                    "Running command {0}".format(self.command_name)
                )

            func = output(func)
            for decorator in self.decorators:
                func = decorator(func)
            return click.command(self.command_name, **self.kwargs)(func)

        def call(self, owner, *args, **kwargs):
            method = getattr(owner, self.func.__name__)
            return method(*args, **kwargs)

    DEFAULT_PARAMS = [
        click.Option(['--ip'], required=True, callback=validate_ip),
        click.Option(['--token'], required=True, callback=validate_token),
    ]

    def __init__(self, device_class, name=None, invoke_without_command=False,
                 no_args_is_help=None, subcommand_metavar=None, chain=False,
                 result_callback=None, result_callback_pass_device=True,
                 **attrs):

        self.commands = getattr(device_class, '_device_group_commands', None)
        if self.commands is None:
            raise RuntimeError(
                "Class {} doesn't use DeviceGroupMeta meta class."
                " It can't be used with DeviceGroup."
            )

        self.device_class = device_class
        self.device_pass = click.make_pass_decorator(device_class)

        attrs.setdefault('params', self.DEFAULT_PARAMS)
        attrs.setdefault('callback', click.pass_context(self.group_callback))
        if result_callback_pass_device and callable(result_callback):
            result_callback = self.device_pass(result_callback)

        super().__init__(name or device_class.__name__.lower(),
                         invoke_without_command, no_args_is_help,
                         subcommand_metavar, chain, result_callback, **attrs)

    def group_callback(self, ctx, *args, **kwargs):
        gco = ctx.find_object(GlobalContextObject)
        if gco:
            kwargs['debug'] = gco.debug
        ctx.obj = self.device_class(*args, **kwargs)

    def command_callback(self, miio_command, miio_device, *args, **kwargs):
        return miio_command.call(miio_device, *args, **kwargs)

    def get_command(self, ctx, cmd_name):
        if cmd_name not in self.commands:
            ctx.fail('Unknown command (%s)' % cmd_name)

        cmd = self.commands[cmd_name]
        return self.commands[cmd_name].wrap(ctx, self.device_pass(partial(
            self.command_callback, cmd
        )))

    def list_commands(self, ctx):
        return sorted(self.commands.keys())


def command(*decorators, name=None, default_output=None, **kwargs):
    return DeviceGroup.Command(
        name, decorators, default_output=default_output, **kwargs
    )


def format_output(msg_fmt: Union[str, callable]= "",
                  result_msg_fmt: Union[str, callable]="{result}"):
    def decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            if msg_fmt:
                if callable(msg_fmt):
                    msg = msg_fmt(**kwargs)
                else:
                    msg = msg_fmt.format(**kwargs)
                if msg:
                    click.echo(msg.strip())
            kwargs['result'] = func(*args, **kwargs)
            if result_msg_fmt:
                if callable(result_msg_fmt):
                    result_msg = result_msg_fmt(**kwargs)
                else:
                    result_msg = result_msg_fmt.format(**kwargs)
                if result_msg:
                    click.echo(result_msg.strip())
        return wrap
    return decorator


def json_output(pretty=False):
    indent = 2 if pretty else None

    def decorator(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except DeviceError as ex:
                click.echo(json.dumps(ex.args[0], indent=indent))
                return

            get_json_data_func = getattr(result, '__json__', None)
            if get_json_data_func is not None:
                result = get_json_data_func()
            click.echo(json.dumps(result, indent=indent))

        return wrap
    return decorator