Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pyledctrl / src / pyledctrl / executor.py
Size: Mime:
"""Executor for abstract syntax trees generated by the LedCtrl compiler."""

from decimal import Decimal, ROUND_DOWN, ROUND_UP
from itertools import chain, count, groupby
from numbers import Number
from operator import attrgetter
from typing import ClassVar, Iterable, NamedTuple, Optional, TypeVar, Union

from .compiler.ast import (
    Duration,
    EndCommand,
    FadeToBlackCommand,
    FadeToColorCommand,
    FadeToGrayCommand,
    FadeToWhiteCommand,
    LoopBlock,
    SetBlackCommand,
    SetColorCommand,
    SetGrayCommand,
    SetWhiteCommand,
    SleepCommand,
    StatementSequence,
    WaitUntilCommand,
)
from .utils import consecutive_pairs, last

__all__ = ("Color", "ExecutorState", "Executor")


_Color = NamedTuple("Color", [("red", int), ("green", int), ("blue", int)])
_Color.__new__.__defaults__ = (0, 0, 0)


C = TypeVar("C", bound="Color")


class Color(_Color):
    """Color of LEDs on a LED strip."""

    BLACK: ClassVar["Color"]
    WHITE: ClassVar["Color"]

    @classmethod
    def black(cls):
        """Returns an instance of the color black."""
        return cls.BLACK

    @classmethod
    def gray(cls, value: int):
        """Returns an instance of the color gray with the given value."""
        return cls(red=value, green=value, blue=value)

    @classmethod
    def white(cls):
        """Returns an instance of the color white."""
        return cls.WHITE

    def mix_with(self: C, other: C, ratio: float = 0.5, integral: bool = False) -> C:
        """Mixes the RGB components of this color with some other color,
        and returns a new color.

        Parameters:
            other: the other color to mix with this color
            ratio: the mixing ratio
            integral: whether the new RGB values should be rounded to the
                nearest integer

        Returns:
            the new, mixed color
        """
        if ratio <= 0:
            return self
        elif ratio >= 1:
            return other

        red = self.red * (1 - ratio) + other.red * ratio
        green = self.green * (1 - ratio) + other.green * ratio
        blue = self.blue * (1 - ratio) + other.blue * ratio

        if integral:
            red, green, blue = round(red), round(green), round(blue)

        return self._replace(red=red, green=green, blue=blue)

    def update_from(self, obj):
        """Updates the color from another object that has ``red``, ``green``
        and ``blue`` properties containing numeric abstract syntax tree nodes,
        and returns the updated color.
        """
        return self._replace(
            red=obj.red.value, green=obj.green.value, blue=obj.blue.value
        )


Color.BLACK = Color.gray(0)
Color.WHITE = Color.gray(255)


class ExecutorState:
    """Mutable state of an executor object."""

    def __init__(
        self, timestamp: float = 0, color: Optional[Color] = None, is_fade: bool = False
    ):
        """Constructor.

        Parameters:
            timestamp: the initial timestamp
            color: the initial color (if any), defaults to black
        """
        self.timestamp = timestamp
        self.color = Color.black() if color is None else color
        self.is_fade = is_fade

    def advance_time_by(self, duration: Union[Duration, Decimal, float]):
        """Increases the timestamp of the state with the given duration.

        Parameters:
            duration: the duration to increase the timestamp with; may be a
                number (which is treated as seconds), or a Duration AST node
        """
        if not isinstance(duration, Number):
            # We assume that 'duration' is a numeric AST node and its value
            # gives the duration in units specified by Duration.FPS
            assert isinstance(duration, Duration)
            duration = duration.value / Duration.FPS
        else:
            # We assume that 'duration' specifies the duration in seconds
            pass
        self.timestamp += duration  # type: ignore

    def copy(self):
        """Returns an independent copy of this state object."""
        return self.__class__(
            timestamp=self.timestamp, color=self.color, is_fade=self.is_fade
        )


class StopExecution(Exception):
    """Exception raised by an executor to stop execution."""

    pass


def do_nothing(*args, **kwds) -> Iterable[ExecutorState]:
    """Fake opcode handler for the executor in case we encounter an opcode that
    we don't need to react to.
    """
    return iter(())


class Executor:
    """Executor for abstract syntax trees generated by the LedCtrl compiler.

    The executor manages the state of a virtual LED strip with red, green and
    blue channels, and is able to execute the command represented by any
    abstract tree node on the virtual LED strip. The executor method yields
    state objects for each interesting point in the execution where a color
    change occurs.
    """

    state: ExecutorState
    """The current state of the executor."""

    def __init__(self):
        """Constructor.

        Creates a virtual LED strip set to black color at timestamp zero.
        """
        self.state = ExecutorState()

    def execute(self, node) -> Iterable[ExecutorState]:
        """Executes the command(s) in the given abstract syntax tree node
        and updates the state accordingly, yielding the state after every
        timestamp change.

        Note that the state is copied before it is yielded back to the caller,
        so it is safe to mutate the state object outside the executor; it will
        not affect the executor itself.
        """
        try:
            for state in self._execute(node):
                yield state.copy()
        except StopExecution:
            pass

    def _execute(self, node):
        class_name = node.__class__.__name__
        method = getattr(self, "_execute_{0}".format(class_name), None)
        if method is None:
            raise RuntimeError("cannot execute {0}".format(class_name))

        for state in method(node):
            yield state

    def _execute_EndCommand(self, node: EndCommand) -> Iterable[ExecutorState]:
        raise StopExecution()

    def _execute_FadeToBlackCommand(
        self, node: FadeToBlackCommand
    ) -> Iterable[ExecutorState]:
        for state in self._fade_to(Color.black(), node.duration):
            yield state

    def _execute_FadeToColorCommand(
        self, node: FadeToColorCommand
    ) -> Iterable[ExecutorState]:
        for state in self._fade_to(
            Color(
                red=node.color.red.value,
                green=node.color.green.value,
                blue=node.color.blue.value,
            ),
            node.duration,
        ):
            yield state

    def _execute_FadeToGrayCommand(
        self, node: FadeToGrayCommand
    ) -> Iterable[ExecutorState]:
        for state in self._fade_to(Color.gray(node.value.value), node.duration):
            yield state

    def _execute_FadeToWhiteCommand(
        self, node: FadeToWhiteCommand
    ) -> Iterable[ExecutorState]:
        for state in self._fade_to(Color.white(), node.duration):
            yield state

    def _execute_LoopBlock(self, node: LoopBlock) -> Iterable[ExecutorState]:
        num_iterations = node.iterations.value
        if num_iterations > 0:
            iterator = range(num_iterations)
        else:
            iterator = count()
        for _i in iterator:
            for state in self._execute(node.body):
                yield state

    def _execute_SetBlackCommand(
        self, node: SetBlackCommand
    ) -> Iterable[ExecutorState]:
        self.state.color = Color.black()
        self.state.is_fade = False
        yield self.state
        self.state.advance_time_by(node.duration)

    def _execute_SetColorCommand(
        self, node: SetColorCommand
    ) -> Iterable[ExecutorState]:
        self.state.color = self.state.color.update_from(node.color)
        self.state.is_fade = False
        yield self.state
        self.state.advance_time_by(node.duration)

    def _execute_SetGrayCommand(self, node: SetGrayCommand) -> Iterable[ExecutorState]:
        self.state.color = Color.gray(node.value.value)
        self.state.is_fade = False
        yield self.state
        self.state.advance_time_by(node.duration)

    def _execute_SetWhiteCommand(
        self, node: SetWhiteCommand
    ) -> Iterable[ExecutorState]:
        self.state.color = Color.white()
        self.state.is_fade = False
        yield self.state
        self.state.advance_time_by(node.duration)

    def _execute_StatementSequence(
        self, node: StatementSequence
    ) -> Iterable[ExecutorState]:
        for statement in node.statements:
            for state in self._execute(statement):
                yield state

    def _execute_SleepCommand(self, node: SleepCommand) -> Iterable[ExecutorState]:
        self.state.advance_time_by(node.duration)
        self.state.is_fade = False
        yield self.state

    def _execute_WaitUntilCommand(
        self, node: WaitUntilCommand
    ) -> Iterable[ExecutorState]:
        new_timestamp = node.timestamp.value
        self.state.timestamp = max(self.state.timestamp, new_timestamp)
        self.state.is_fade = False
        yield self.state

    _execute_NopCommand = do_nothing
    _execute_SetPyroCommand = do_nothing
    _execute_SetPyroAllCommand = do_nothing

    def _fade_to(self, color: Color, duration: Duration) -> Iterable[ExecutorState]:
        if not self.state.is_fade:
            yield self.state
            self.state.is_fade = True

        self.state.advance_time_by(duration)
        self.state.color = color
        yield self.state


def _frames_between(
    start: float, end: float, fps: Decimal = Duration.FPS
) -> Iterable[Decimal]:
    """Returns an iterator yielding the timestamps of all 'whole' frames that
    fall between the given start and end times, _excluding_ the endpoints.

    Parameters:
        start: the start time, in seconds
        end: the end time, in seconds
        fps: number of frames per second

    Yields:
        timestamps between the given start and end time that correspond to
        whole frames
    """
    start_index = int((start * fps).to_integral_value(rounding=ROUND_DOWN))  # type: ignore
    end_index = int((end * fps).to_integral_value(rounding=ROUND_UP))  # type: ignore
    return (index / fps for index in range(start_index + 1, end_index))


def remove_duplicates(events):
    """Given a stream of timestamped events in ascending order, removes
    duplicate events that refer to the same time instant, except the last one.
    """
    for _, grouped_events in groupby(events, attrgetter("timestamp")):
        yield last(grouped_events)


def unroll(
    events: Iterable[ExecutorState], fps: Decimal = Duration.FPS
) -> Iterable[ExecutorState]:
    return remove_duplicates(_unroll(events, fps))


def _unroll(
    events: Iterable[ExecutorState], fps: Union[Decimal, float]
) -> Iterable[ExecutorState]:
    start = ExecutorState()
    fps = Decimal(fps)

    for prev_event, event in consecutive_pairs(chain([start], events)):
        if event.is_fade:
            event.is_fade = False
            start, end = prev_event.timestamp, event.timestamp
            length = end - start
            for timestamp in _frames_between(start, end, fps=fps):
                extra_event = event.copy()
                ratio = (timestamp - start) / length  # type: ignore
                extra_event.color = prev_event.color.mix_with(
                    event.color, ratio=ratio, integral=True
                )
                extra_event.timestamp = timestamp  # type: ignore
                yield extra_event
        yield event