Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / uvloop   python

Repository URL to install this package:

Version: 0.9.1 

/ loop.pyx

# cython: language_level=3, embedsignature=True

import asyncio
cimport cython

from .includes.debug cimport UVLOOP_DEBUG
from .includes cimport uv
from .includes cimport system
from .includes.python cimport PyMem_RawMalloc, PyMem_RawFree, \
                              PyMem_RawCalloc, PyMem_RawRealloc, \
                              PyUnicode_EncodeFSDefault, \
                              PyErr_SetInterrupt, \
                              PyOS_AfterFork, \
                              _PyImport_AcquireLock, \
                              _PyImport_ReleaseLock, \
                              _Py_RestoreSignals

from libc.stdint cimport uint64_t
from libc.string cimport memset, strerror, memcpy
from libc cimport errno

from cpython cimport PyObject
from cpython cimport PyErr_CheckSignals, PyErr_Occurred
from cpython cimport PyThread_get_thread_ident
from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF
from cpython cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE, \
                     Py_buffer, PyBytes_AsString, PyBytes_CheckExact, \
                     Py_SIZE, PyBytes_AS_STRING

from cpython cimport PyErr_CheckSignals

from . import _noop


include "includes/consts.pxi"
include "includes/stdlib.pxi"

include "errors.pyx"


cdef _is_sock_stream(sock_type):
    # Return True when `sock_type` denotes a stream socket.
    if SOCK_NONBLOCK != -1:
        # Linux's socket.type is a bitmask that can include extra info
        # about socket (like SOCK_NONBLOCK bit), therefore a plain
        # `sock_type == socket.SOCK_STREAM` comparison is not enough; only
        # the low four bits are compared.  See
        # https://github.com/torvalds/linux/blob/v4.13/include/linux/net.h#L77
        # for more details.
        return (sock_type & 0xF) == uv.SOCK_STREAM

    # NOTE(review): SOCK_NONBLOCK == -1 presumably marks platforms without
    # that flag (the constant is defined outside this chunk) -- confirm.
    return sock_type == uv.SOCK_STREAM


cdef _is_sock_dgram(sock_type):
    # Return True when `sock_type` denotes a datagram socket.
    if SOCK_NONBLOCK != -1:
        # The socket type may carry extra flag bits (such as the
        # SOCK_NONBLOCK bit on Linux), so only the low four bits are
        # compared against SOCK_DGRAM.
        return (sock_type & 0xF) == uv.SOCK_DGRAM

    return sock_type == uv.SOCK_DGRAM


cdef isfuture(obj):
    # Decide whether `obj` is a future.  `aio_isfuture` is used whenever it
    # is available; otherwise fall back to an isinstance check against
    # `aio_Future`.
    if aio_isfuture is not None:
        return aio_isfuture(obj)
    return isinstance(obj, aio_Future)


cdef inline socket_inc_io_ref(sock):
    # Increment the `_io_refs` counter of `sock`.  Objects that are not
    # `socket_socket` instances are left untouched.
    if not isinstance(sock, socket_socket):
        return
    sock._io_refs += 1


cdef inline socket_dec_io_ref(sock):
    # Counterpart of `socket_inc_io_ref`: ask the socket to drop one I/O
    # reference via `_decref_socketios()`.  Objects that are not
    # `socket_socket` instances are left untouched.
    if not isinstance(sock, socket_socket):
        return
    sock._decref_socketios()


@cython.no_gc_clear
cdef class Loop:
    def __cinit__(self):
        # Allocate the libuv loop structure, initialize it, and set every
        # field of the Loop object to its starting value.
        #
        # Raises MemoryError if the uv_loop_t buffer cannot be allocated,
        # and the exception produced by `convert_error` if `uv_loop_init`
        # reports a failure.
        cdef int err

        # Install PyMem* memory allocators if they aren't installed yet.
        __install_pymem()

        # Install pthread_atfork handlers
        __install_atfork()

        self.uvloop = <uv.uv_loop_t*> \
                            PyMem_RawMalloc(sizeof(uv.uv_loop_t))
        if self.uvloop is NULL:
            raise MemoryError()

        self.slow_callback_duration = 0.1

        self._closed = 0
        self._debug = 0
        self._thread_is_main = 0
        self._thread_id = 0
        self._running = 0
        self._stopping = 0

        self._transports = weakref_WeakValueDictionary()

        # Used to keep a reference (and hence keep the fileobj alive)
        # for as long as its registered by add_reader or add_writer.
        # This is how the selector module and hence asyncio behaves.
        self._fd_to_reader_fileobj = {}
        self._fd_to_writer_fileobj = {}

        self._timers = set()
        self._polls = {}

        self._recv_buffer_in_use = 0

        err = uv.uv_loop_init(self.uvloop)
        if err < 0:
            # The loop never became usable, so release its memory right
            # here: `__dealloc__` returns before `PyMem_RawFree` whenever
            # `_closed == 0`, which would otherwise leak this buffer.
            # Resetting the pointer to NULL rules out any later reuse.
            PyMem_RawFree(self.uvloop)
            self.uvloop = NULL
            raise convert_error(err)
        # Back-pointer from the libuv loop to this Python object.
        self.uvloop.data = <void*> self

        self._init_debug_fields()

        self.active_process_handler = None

        self._last_error = None

        self._task_factory = None
        self._exception_handler = None
        self._default_executor = None

        self._queued_streams = set()
        self._ready = col_deque()
        self._ready_len = 0

        # The handles below are created only after `uv_loop_init` succeeded.
        self.handler_async = UVAsync.new(
            self, <method_t>self._on_wake, self)

        self.handler_idle = UVIdle.new(
            self,
            new_MethodHandle(
                self, "loop._on_idle", <method_t>self._on_idle, self))

        # Needed to call `UVStream._exec_write` for writes scheduled
        # during `Protocol.data_received`.
        self.handler_check__exec_writes = UVCheck.new(
            self,
            new_MethodHandle(
                self, "loop._exec_queued_writes",
                <method_t>self._exec_queued_writes, self))

        # Signal handling state; the socket pair is created lazily by
        # `_setup_signals`.
        self._ssock = self._csock = None
        self._signal_handlers = {}
        self._listening_signals = False

        self._coroutine_wrapper_set = False

        if hasattr(sys, 'get_asyncgen_hooks'):
            # Python >= 3.6
            # A weak set of all asynchronous generators that are
            # being iterated by the loop.
            self._asyncgens = weakref_WeakSet()
        else:
            self._asyncgens = None

        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False

        self._servers = set()

    def __init__(self):
        # Debug mode follows the PYTHONASYNCIODEBUG environment variable,
        # unless `sys_ignore_environment` is set, in which case the
        # environment is not consulted at all.
        if sys_ignore_environment:
            debug_enabled = False
        else:
            debug_enabled = bool(os_environ.get('PYTHONASYNCIODEBUG'))
        self.set_debug(debug_enabled)

    def __dealloc__(self):
        # Release the uv_loop_t buffer, but only for a loop that has been
        # closed.  A running loop raises; a loop that is still open is
        # reported through the asyncio logger and its buffer is not freed.
        if self._running == 1:
            raise RuntimeError('deallocating a running event loop!')

        if self._closed != 0:
            PyMem_RawFree(self.uvloop)
            self.uvloop = NULL
        else:
            aio_logger.error("deallocating an open event loop")

    cdef _init_debug_fields(self):
        # Reset all debug/statistics fields to their initial values.
        self._debug_cc = bool(UVLOOP_DEBUG)

        # The per-handle counters are Counter objects only when
        # UVLOOP_DEBUG is set; otherwise they stay None.
        if not UVLOOP_DEBUG:
            self._debug_handles_current = None
            self._debug_handles_closed = None
            self._debug_handles_total = None
        else:
            self._debug_handles_current = col_Counter()
            self._debug_handles_closed = col_Counter()
            self._debug_handles_total = col_Counter()

        # libuv handle allocation statistics.
        self._debug_uv_handles_total = self._debug_uv_handles_freed = 0

        # Stream read statistics.
        self._debug_stream_read_cb_total = \
            self._debug_stream_read_eof_total = \
            self._debug_stream_read_errors_total = \
            self._debug_stream_read_cb_errors_total = \
            self._debug_stream_read_eof_cb_errors_total = 0

        # Stream shutdown/listen statistics.
        self._debug_stream_shutdown_errors_total = \
            self._debug_stream_listen_errors_total = 0

        # Stream write statistics.
        self._debug_stream_write_tries = \
            self._debug_stream_write_errors_total = \
            self._debug_stream_write_ctx_total = \
            self._debug_stream_write_ctx_cnt = \
            self._debug_stream_write_cb_errors_total = 0

        # Callback handle statistics.
        self._debug_cb_handles_total = self._debug_cb_handles_count = 0

        # Timer handle statistics.
        self._debug_cb_timer_handles_total = \
            self._debug_cb_timer_handles_count = 0

        # Poll statistics.
        self._poll_read_events_total = \
            self._poll_read_cb_errors_total = \
            self._poll_write_events_total = \
            self._poll_write_cb_errors_total = 0

        self._sock_try_write_total = 0

        self._debug_exception_handler_cnt = 0

    cdef _setup_signals(self):
        # Create the non-blocking socket pair used for signal wakeups and
        # register its write end with `signal_set_wakeup_fd`.  Does nothing
        # when signals are already being listened for.
        if self._listening_signals:
            return

        self._ssock, self._csock = socket_socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)

        try:
            wakeup_fd = self._csock.fileno()
            signal_set_wakeup_fd(wakeup_fd)
        except (OSError, ValueError):
            # Not the main thread: undo the socket pair and leave signal
            # listening disabled.
            self._ssock.close()
            self._csock.close()
            self._ssock = self._csock = None
        else:
            self._listening_signals = True

    cdef _recv_signals_start(self):
        # Start watching the read end of the signal socket pair, creating
        # the pair first if it does not exist yet.
        if self._ssock is None:
            self._setup_signals()

        if self._ssock is None:
            # Not the main thread.
            return

        reader = new_MethodHandle(
            self,
            "Loop._read_from_self",
            <method_t>self._read_from_self,
            self)
        self._add_reader(self._ssock, reader)

    cdef _recv_signals_stop(self):
        # Stop watching the read end of the signal socket pair, if the
        # pair exists.
        if self._ssock is not None:
            self._remove_reader(self._ssock)

    cdef _shutdown_signals(self):
        # Remove every registered signal handler, reset the wakeup fd and
        # close the signal socket pair.  Does nothing when signals are not
        # being listened for.
        if not self._listening_signals:
            return

        # Iterate over a snapshot: removing a handler mutates the mapping.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)

        # `remove_signal_handler` will call `_shutdown_signals` when
        # removing last signal handler, in which case the teardown below
        # has already happened and the flag is cleared.
        if self._listening_signals:
            try:
                signal_set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                aio_logger.info('set_wakeup_fd(-1) failed: %s', exc)

            self._remove_reader(self._ssock)
            self._ssock.close()
            self._csock.close()
            self._ssock = None
            self._csock = None

            self._listening_signals = False

    cdef _read_from_self(self):
        # Drain the read end of the signal socket pair, handing every
        # received chunk to `_process_self_data`.  The loop ends on an
        # empty read or when the socket would block.
        while True:
            try:
                chunk = self._ssock.recv(4096)
                if chunk:
                    self._process_self_data(chunk)
                else:
                    break
            except InterruptedError:
                # Interrupted: simply try again.
                continue
            except BlockingIOError:
                # Nothing more to read right now.
                break

    cdef _process_self_data(self, data):
        # Dispatch each non-zero item of `data` as a signal number.
        for signum in data:
            # Null bytes are written by _write_to_self() and are ignored.
            if signum:
                self._handle_signal(signum)

    cdef _handle_signal(self, sig):
        # Run the handler registered for signal `sig`, if there is one.
        cdef Handle sig_handle

        # A missing entry yields None, same as a failed lookup would.
        sig_handle = <Handle>self._signal_handlers.get(sig)

        if sig_handle is not None:
            if sig_handle._cancelled:
                self.remove_signal_handler(sig)  # Remove it properly.
            else:
                # Queue the handler and wake the loop up.
                self._call_soon_handle(sig_handle)
                self.handler_async.send()
            return

        # Some signal that we aren't listening through
        # add_signal_handler.  Invoke CPython eval loop
        # to let it being processed.
        PyErr_CheckSignals()
        _noop.noop()

    cdef _on_wake(self):
        # Start the idle handler when there is queued work or a stop has
        # been requested, unless the idle handler is already running.
        if self.handler_idle.running:
            return
        if self._ready_len > 0 or self._stopping:
            self.handler_idle.start()

    cdef _on_idle(self):
        cdef:
            int i, ntodo
            object popleft = self._ready.popleft
            Handle handler

        ntodo = len(self._ready)
        if self._debug:
Loading ...