Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / trio-gpio   python

Repository URL to install this package:

/ gpio.py

from . import libgpiod as gpio

import sys
import trio
import datetime


class Chip:
    """Represents a GPIO chip.

    Arguments:
        num: Chip number. Defaults to zero.

        consumer: A string for display by kernel utilities.
            Defaults to the program's name.

    """
    _chip = None

    def __init__(self, num=0, consumer=sys.argv[0]):
        self._num = num
        self._consumer = consumer

    def __repr__(self):
        return "%s(%s)" % (self.__class__.__name, self._num)

    def __enter__(self):
        self._chip = gpio.lib.gpiod_chip_open_by_number(self._num)
        if self._chip == gpio.ffi.NULL:
            raise OSError("unable to open chip")
        return self

    def __exit__(self, *tb):
        gpio.lib.gpiod_chip_close(self._chip)
        self._chip = None

    def line(self, offset, consumer=None):
        """Get a descriptor for a single GPIO line.

        Arguments:
            offset: GPIO number within this chip. No default.
            consumer: override the chip's consumer, if required.
        """
        if consumer is None:
            consumer = self._consumer
        return Line(self, offset, consumer=consumer)


_FREE = 0
_PRE_IO = 1
_IN_IO = 2
_PRE_EV = 3
_IN_EV = 4
_IN_USE = {_IN_IO, _IN_EV}


class Line:
    """Represents a single GPIO line.

    Create this object by calling :meth:`Chip.line`.
    """
    _line = None
    _direction = None
    _flags = None
    _ev_flags = None
    _state = _FREE

    def __init__(self, chip, offset, consumer=sys.argv[0][:-3]):
        self._chip = chip
        self._offset = offset
        self._consumer = consumer.encode("utf-8")
        self.__consumer = gpio.ffi.new("char[]", self._consumer)

    def __repr__(self):
        return "<%s %s:%d %s=%d>" % (
            self.__class__.__name__, self._chip, self._offset, self._line, self._state
        )

    def open(self, direction=gpio.DIRECTION_INPUT, default=False, flags=0):
        """
        Create a context manager for controlling this line's input or output.

        Arguments:
            direction: input or output. Default: gpio.DIRECTION_INPUT.
            flags: to request pull-up/down resistors or open-collector outputs.

        Example::
            with gpio.Chip(0) as chip:
                line = chip.line(16)
                with line.open(direction=gpio.DIRECTION_INPUT) as wire:
                    print(wire.value)
        """
        if self._state in _IN_USE:
            raise OSError("This line is already in use")
        self._direction = direction
        self._default = default
        self._flags = flags
        self._state = _PRE_IO
        return self

    def __enter__(self):
        """Context management for use with :meth:`open` and :meth:`monitor`."""
        if self._state in _IN_USE:
            raise OSError("This line is already in use")
        if self._state == _FREE:
            raise RuntimeError("You need to call .open() or .event()")
        self._line = gpio.lib.gpiod_chip_get_line(self._chip._chip, self._offset)
        if self._line == gpio.ffi.NULL:
            raise OSError("unable to get line")

        if self._state == _PRE_IO:
            self._enter_io()
        elif self._state == _PRE_EV:
            self._enter_ev()
        else:
            raise RuntimeError("wrong state", self)
        return self

    def _enter_io(self):
        if self._direction == gpio.DIRECTION_INPUT:
            r = gpio.lib.gpiod_line_request_input_flags(self._line, self._consumer, self._flags)
        elif self._direction == gpio.DIRECTION_OUTPUT:
            r = gpio.lib.gpiod_line_request_output_flags(
                self._line, self._consumer, self._flags, self._default
            )
        else:
            self.__exit__()
            raise RuntimeError("Unknown direction", r)
        if r != 0:
            self.__exit__()
            raise OSError("unable to set direction", r)
        self._state = _IN_IO
        return self

    def _enter_ev(self):
        req = gpio.ffi.new("struct gpiod_line_request_config*")
        req.consumer = self.__consumer
        req.request_type = self._type
        req.flags = self._flags

        if gpio.lib.gpiod_line_request(self._line, req, 0) != 0:
            raise OSError("unable to request event monitoring")
        self._state = _IN_EV

    def __exit__(self, *tb):
        if self._line is not None:
            try:
                gpio.lib.gpiod_line_release(self._line)
            finally:
                self._line = None
        self._state = _FREE

    def _is_open(self):
        if self._state not in _IN_USE:
            raise RuntimeError("Line is not open", self)

    @property
    def value(self):
        self._is_open()
        return gpio.lib.gpiod_line_get_value(self._line)

    @value.setter
    def value(self, value):
        self._is_open()
        gpio.lib.gpiod_line_set_value(self._line, value)

    @property
    def direction(self):
        if self._line is None:
            return self._direction
        return gpio.lib.gpiod_line_direction(self._line)

    @property
    def active_state(self):
        self._is_open()
        return gpio.lib.gpiod_line_active_state(self._line)

    @property
    def is_open_drain(self):
        self._is_open()
        return gpio.lib.gpiod_line_is_open_drain(self._line)

    @property
    def is_open_source(self):
        self._is_open()
        return gpio.lib.gpiod_line_is_open_source(self._line)

    @property
    def is_used(self):
        self._is_open()
        return gpio.lib.gpiod_line_is_used(self._line)

    @property
    def offset(self):
        if self._line is None:
            return self._offset
        return gpio.lib.gpiod_line_offset(self._line)

    @property
    def name(self):
        self._is_open()
        n = gpio.lib.gpiod_line_name(self._line)
        if n == gpio.ffi.NULL:
            return None
        return n

    @property
    def consumer(self):
        if self._line is None:
            return self._consumer
        n = gpio.lib.gpiod_line_consumer(self._line)
        if n == gpio.ffi.NULL:
            return None
        return gpio.ffi.string(n).decode("utf-8")

    def monitor(self, type=gpio.REQUEST_EVENT_RISING_EDGE, flags=0):
        """
        Monitor events.

        Arguments:
            type: which edge to monitor
            flags: REQUEST_FLAG_* values (ORed)

        Usage::

            with gpio.Chip(0) as chip:
                line = chip.line(13)
                with line.monitor():
                    async for event in line:
                        print(event)
        """
        if self._state in _IN_USE:
            raise OSError("This line is already in use")
        self._state = _PRE_EV
        self._type = type
        self._flags = flags
        return self

    def _update(self):
        self._is_open()
        if gpio.lib.gpiod_line_update(self._line) == -1:
            raise OSError("unable to update state")

    def __iter__(self):
        raise RuntimeError("You need to use 'async for'")

    def __aenter__(self):
        raise RuntimeError("YOu need to use 'with'")

    def __aexit__(self):
        raise RuntimeError("YOu need to use 'with'")

    def __aiter__(self):
        if self._state != _IN_EV:
            raise RuntimeError("You need to call 'with LINE.monitor() / async for event in LINE'")
        return self

    async def __anext__(self):
        if self._state != _IN_EV:
            raise RuntimeError("wrong state")

        ev = gpio.ffi.new("struct gpiod_line_event*")
        fd = gpio.lib.gpiod_line_event_get_fd(self._line)
        if fd < 0:
            raise OSError("line is closed")
        await trio.hazmat.wait_readable(fd)
        self._is_open()
        r = gpio.lib.gpiod_line_event_read_fd(fd, ev)
        if r != 0:
            raise OSError("unable to read update")
        return Event(ev)

    async def aclose(self):
        """close the iterator."""
        pass


class Event:
    """Store a Pythonic representation of an event
    """

    def __init__(self, ev):
        if ev.event_type == gpio.EVENT_RISING_EDGE:
            self.value = 1
        elif ev.event_type == gpio.EVENT_FALLING_EDGE:
            self.value = 0
        else:
            raise RuntimeError("Unknown event type")
        self._ts_sec = ev.ts.tv_sec
        self._ts_nsec = ev.ts.tv_nsec

    @property
    def timestamp(self):
        """Return a (second,nanosecond) tuple for fast timestamping"""
        return (self._ts_sec, self._ts_nsec)

    @property
    def time(self):
        """Return the event's proper datetime"""
        return datetime.datetime.fromtimestamp(self._ts_sec + self._ts_nsec / 1000000000)

    def __repr__(self):
        return "<%s @%s>" % (self.value, self.time)