Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / python-miio   python

Repository URL to install this package:

Version: 0.4.4 

/ chuangmi_ir.py

import base64
import re

import click
from construct import (
    Struct, Const, Rebuild, this, len_, Adapter, Computed,
    Int16ul, Int32ul, Int16ub, Array, BitStruct, BitsInteger,
)

from .click_common import command, format_output
from .device import Device, DeviceException


class ChuangmiIrException(DeviceException):
    pass


class ChuangmiIr(Device):
    """Main class representing Chuangmi IR Remote Controller."""

    PRONTO_RE = re.compile(r'^([\da-f]{4}\s?){3,}([\da-f]{4})$', re.IGNORECASE)

    @command(
        click.argument("key", type=int),
        default_output=format_output("Learning command into storage key {key}")
    )
    def learn(self, key: int=1):
        """Learn an infrared command.

        :param int key: Storage slot, must be between 1 and 1000000"""

        if key < 1 or key > 1000000:
            raise ChuangmiIrException("Invalid storage slot.")
        return self.send("miIO.ir_learn", {'key': str(key)})

    @command(
        click.argument("key", type=int),
        default_output=format_output("Reading infrared command from storage key {key}")
    )
    def read(self, key: int=1):
        """Read a learned command.

        Positive response (chuangmi.ir.v2):
        {'key': '1', 'code': 'Z6WPAasBAAA3BQAA4AwJAEA....AAABAAEBAQAAAQAA=='}

        Negative response (chuangmi.ir.v2):
        {'error': {'code': -5002, 'message': 'no code for this key'}, 'id': 5}

        Negative response (chuangmi.ir.v2):
        {'error': {'code': -5003, 'message': 'learn timeout'}, 'id': 17}

        :param int key: Slot to read from"""

        if key < 1 or key > 1000000:
            raise ChuangmiIrException("Invalid storage slot.")
        return self.send("miIO.ir_read", {'key': str(key)})

    def play_raw(self, command: str, frequency: int=38400):
        """Play a captured command.

        :param str command: Command to execute
        :param int frequency: Execution frequency"""
        return self.send("miIO.ir_play",
                         {'freq': frequency, 'code': command})

    def play_pronto(self, pronto: str, repeats: int=1):
        """Play a Pronto Hex encoded IR command.
        Supports only raw Pronto format, starting with 0000.

        :param str pronto: Pronto Hex string.
        :param int repeats: Number of extra signal repeats."""
        return self.play_raw(*self.pronto_to_raw(pronto, repeats))

    @classmethod
    def pronto_to_raw(cls, pronto: str, repeats: int=1):
        """Play a Pronto Hex encoded IR command.
        Supports only raw Pronto format, starting with 0000.

        :param str pronto: Pronto Hex string.
        :param int repeats: Number of extra signal repeats."""
        if repeats < 0:
            raise ChuangmiIrException('Invalid repeats value')

        try:
            pronto_data = Pronto.parse(bytearray.fromhex(pronto))
        except Exception as ex:
            raise ChuangmiIrException("Invalid Pronto command") from ex

        if len(pronto_data.intro) == 0:
            repeats += 1

        times = set()
        for pair in pronto_data.intro + pronto_data.repeat * (1 if repeats else 0):
            times.add(pair.pulse)
            times.add(pair.gap)

        times = sorted(times)
        times_map = {t: idx for idx, t in enumerate(times)}
        edge_pairs = []
        for pair in pronto_data.intro + pronto_data.repeat * repeats:
            edge_pairs.append({
                'pulse': times_map[pair.pulse],
                'gap': times_map[pair.gap],
            })

        signal_code = base64.b64encode(ChuangmiIrSignal.build({
            'times_index': times + [0] * (16 - len(times)),
            'edge_pairs': edge_pairs,
        })).decode()

        return signal_code, int(round(pronto_data.frequency))

    @command(
        click.argument("command", type=str),
        default_output=format_output("Playing the supplied command")
    )
    def play(self, command: str):
        """Plays a command in one of the supported formats."""
        if ":" not in command:
            if self.PRONTO_RE.match(command):
                command_type = 'pronto'
            else:
                command_type = 'raw'
            command_args = []
        else:
            command_type, command, *command_args = command.split(":")

        if command_type == "raw":
            play_method = self.play_raw
            arg_types = [int]
        elif command_type == "pronto":
            play_method = self.play_pronto
            arg_types = [int]
        else:
            raise ChuangmiIrException("Invalid command type")

        if len(command_args) > len(arg_types):
            raise ChuangmiIrException("Invalid command arguments count")

        try:
            command_args = [t(v) for v, t in zip(command_args, arg_types)]
        except Exception as ex:
            raise ChuangmiIrException('Invalid command arguments') from ex

        return play_method(command, *command_args)


class ProntoPulseAdapter(Adapter):
    def _decode(self, obj, context, path):
        return int(obj * context._.modulation_period)

    def _encode(self, obj, context, path):
        raise RuntimeError('Not implemented')


ChuangmiIrSignal = Struct(
    Const(0xa567, Int16ul),
    'edge_count' / Rebuild(Int16ul, len_(this.edge_pairs) * 2 - 1),
    'times_index' / Array(16, Int32ul),
    'edge_pairs' / Array((this.edge_count + 1) // 2, BitStruct(
        'gap' / BitsInteger(4),
        'pulse' / BitsInteger(4),
    ))
)

ProntoBurstPair = Struct(
    'pulse' / ProntoPulseAdapter(Int16ub),
    'gap' / ProntoPulseAdapter(Int16ub),
)

Pronto = Struct(
    Const(0, Int16ub),
    '_ticks' / Int16ub,
    'modulation_period' / Computed(this._ticks * 0.241246),
    'frequency' / Computed(1000000 / this.modulation_period),
    'intro_len' / Int16ub,
    'repeat_len' / Int16ub,
    'intro' / Array(this.intro_len, ProntoBurstPair),
    'repeat' / Array(this.repeat_len, ProntoBurstPair),
)