Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / rook   python

Repository URL to install this package:

Version: 0.1.176 

/ com_ws / selectable_event.py

import socket

import rook.com_ws.socketpair_compat  # do not remove - adds socket.socketpair on Windows lgtm[py/unused-import]
from rook.com_ws import poll_select
from threading import Event


class SelectableEvent(object):
    """
    An event flag that can also be waited on through a file descriptor.

    A ``threading.Event`` holds the flag itself, and a socketpair is used to
    signal when the event is ready: one byte is written to the write end the
    first time the event is set, so the read end becomes readable. This way,
    the object can be used in a select (see ``fileno``).

    Instances own two sockets; release them with ``close()`` or by using the
    instance as a context manager.
    """

    def __init__(self):
        self._event = Event()
        self._readsocket, self._writesocket = socket.socketpair()
        # Waiter watching only the read end of the pair for readability.
        self._readwaiter = poll_select.Waiter([self._readsocket], [], [])

    def close(self):
        """
        Close both ends of the socketpair.

        Safe to call more than once, and safe on an instance whose
        ``__init__`` failed before the sockets were created (the finalizer
        still runs in that case, so the attributes may be missing).
        """
        for attr in ('_readsocket', '_writesocket'):
            sock = getattr(self, attr, None)
            if sock is not None:
                sock.close()

    def __enter__(self):
        """Return the event itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the sockets on leaving the ``with`` block; never suppresses."""
        self.close()

    def __del__(self):
        # Last-resort cleanup; close() tolerates a partially built instance.
        self.close()

    def fileno(self):
        """Return the file descriptor of the read end of the socketpair."""
        return self._readsocket.fileno()

    def is_set(self):
        """Return True if the event has been set."""
        return self._event.is_set()

    def set(self):
        """
        Set the event and make the read socket readable.

        Does nothing if the event is already set, so a single byte is written
        per transition to the set state.
        """
        if self.is_set():
            return
        self._event.set()
        self._writesocket.send(b'1')

    def wait(self, timeout=None):
        """
        Wait until the event is set or ``timeout`` elapses.

        :param timeout: passed through to the underlying waiter's ``wait``;
            ``None`` is the default.
        :return: True if the event is set, False otherwise.
        """
        # Fast path: no need to touch the socket if the flag is already up.
        if self._event.is_set():
            return True
        ready, _, _ = self._readwaiter.wait(timeout)
        # Readability alone is not enough; the flag must be set as well.
        return len(ready) != 0 and self._event.is_set()