# cython: language_level=3, embedsignature=True
import asyncio
cimport cython
from .includes.debug cimport UVLOOP_DEBUG
from .includes cimport uv
from .includes cimport system
from .includes.python cimport PyMem_RawMalloc, PyMem_RawFree, \
PyMem_RawCalloc, PyMem_RawRealloc, \
PyUnicode_EncodeFSDefault, \
PyErr_SetInterrupt, \
PyOS_AfterFork, \
_PyImport_AcquireLock, \
_PyImport_ReleaseLock, \
_Py_RestoreSignals
from libc.stdint cimport uint64_t
from libc.string cimport memset, strerror, memcpy
from libc cimport errno
from cpython cimport PyObject
from cpython cimport PyErr_CheckSignals, PyErr_Occurred
from cpython cimport PyThread_get_thread_ident
from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF
from cpython cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE, \
Py_buffer, PyBytes_AsString, PyBytes_CheckExact, \
Py_SIZE, PyBytes_AS_STRING
from cpython cimport PyErr_CheckSignals
from . import _noop
include "includes/consts.pxi"
include "includes/stdlib.pxi"
include "errors.pyx"
cdef _is_sock_stream(sock_type):
    # Return whether `sock_type` describes a stream socket, i.e. matches
    # uv.SOCK_STREAM.
    if SOCK_NONBLOCK != -1:
        # Linux's socket.type is a bitmask that can carry extra
        # information about the socket (such as the SOCK_NONBLOCK bit),
        # so a plain `sock_type == socket.SOCK_STREAM` comparison is not
        # reliable.  Only the low four bits are compared; see
        # https://github.com/torvalds/linux/blob/v4.13/include/linux/net.h#L77
        # for more details.
        return (sock_type & 0xF) == uv.SOCK_STREAM

    # NOTE(review): -1 is presumably the placeholder used when the platform
    # has no SOCK_NONBLOCK -- confirm where SOCK_NONBLOCK is defined.
    return sock_type == uv.SOCK_STREAM
cdef _is_sock_dgram(sock_type):
    # Return whether `sock_type` describes a datagram socket, i.e. matches
    # uv.SOCK_DGRAM.
    if SOCK_NONBLOCK != -1:
        # The socket type may be a bitmask with extra flag bits set (for
        # example SOCK_NONBLOCK on Linux), so only the low four bits are
        # compared against SOCK_DGRAM.
        return (sock_type & 0xF) == uv.SOCK_DGRAM

    # NOTE(review): -1 is presumably the placeholder used when the platform
    # has no SOCK_NONBLOCK -- confirm where SOCK_NONBLOCK is defined.
    return sock_type == uv.SOCK_DGRAM
cdef isfuture(obj):
    # Tell whether `obj` is a future.  `aio_isfuture` is used whenever it
    # is set; otherwise the check falls back to isinstance() against
    # `aio_Future`.
    if aio_isfuture is not None:
        return aio_isfuture(obj)
    return isinstance(obj, aio_Future)
cdef inline socket_inc_io_ref(sock):
    # Increment the `_io_refs` counter of `sock`.  Objects that are not
    # `socket_socket` instances are left untouched.
    if not isinstance(sock, socket_socket):
        return
    sock._io_refs += 1
cdef inline socket_dec_io_ref(sock):
    # Call `_decref_socketios()` on `sock`.  Objects that are not
    # `socket_socket` instances are left untouched.
    if not isinstance(sock, socket_socket):
        return
    sock._decref_socketios()
@cython.no_gc_clear
cdef class Loop:
def __cinit__(self):
    # Allocate and initialise the libuv loop structure and put every
    # attribute of the loop into its initial state.
    #
    # Raises MemoryError when the uv_loop_t cannot be allocated, and the
    # exception built by convert_error() when uv_loop_init() reports a
    # negative status.
    cdef int rc

    # Install PyMem* memory allocators if they aren't installed yet,
    # followed by the pthread_atfork handlers.
    __install_pymem()
    __install_atfork()

    self.uvloop = <uv.uv_loop_t*>PyMem_RawMalloc(sizeof(uv.uv_loop_t))
    if self.uvloop is NULL:
        raise MemoryError()

    self.slow_callback_duration = 0.1

    # Lifecycle flags and thread bookkeeping all start out cleared.
    self._closed = 0
    self._debug = 0
    self._thread_is_main = 0
    self._thread_id = 0
    self._running = 0
    self._stopping = 0

    self._transports = weakref_WeakValueDictionary()

    # These two maps hold a reference to each file object (and hence
    # keep it alive) for as long as it is registered through add_reader
    # or add_writer.  This mirrors what the selector module, and
    # therefore asyncio, does.
    self._fd_to_reader_fileobj = {}
    self._fd_to_writer_fileobj = {}

    self._timers = set()
    self._polls = {}
    self._recv_buffer_in_use = 0

    rc = uv.uv_loop_init(self.uvloop)
    if rc < 0:
        raise convert_error(rc)
    # Store a pointer back to this object in the libuv loop structure.
    self.uvloop.data = <void*>self

    self._init_debug_fields()

    self.active_process_handler = None

    self._last_error = None
    self._task_factory = None
    self._exception_handler = None
    self._default_executor = None

    self._queued_streams = set()
    self._ready = col_deque()
    self._ready_len = 0

    self.handler_async = UVAsync.new(self, <method_t>self._on_wake, self)

    self.handler_idle = UVIdle.new(
        self,
        new_MethodHandle(
            self,
            "loop._on_idle",
            <method_t>self._on_idle,
            self))

    # Needed to call `UVStream._exec_write` for writes scheduled
    # during `Protocol.data_received`.
    self.handler_check__exec_writes = UVCheck.new(
        self,
        new_MethodHandle(
            self,
            "loop._exec_queued_writes",
            <method_t>self._exec_queued_writes,
            self))

    # Signal handling state; the socket pair is created on demand by
    # _setup_signals().
    self._ssock = None
    self._csock = None
    self._signal_handlers = {}
    self._listening_signals = False

    self._coroutine_wrapper_set = False

    # On Python >= 3.6 (where sys.get_asyncgen_hooks exists) keep a weak
    # set of all asynchronous generators that are being iterated by the
    # loop; otherwise there is nothing to track.
    self._asyncgens = (
        weakref_WeakSet() if hasattr(sys, 'get_asyncgen_hooks') else None)
    # Set to True when `loop.shutdown_asyncgens` is called.
    self._asyncgens_shutdown_called = False

    self._servers = set()
def __init__(self):
    # Debug mode is switched on when the PYTHONASYNCIODEBUG environment
    # variable has a non-empty value, unless `sys_ignore_environment` is
    # set, in which case the environment is not consulted at all.
    if sys_ignore_environment:
        debug_requested = False
    else:
        debug_requested = bool(os_environ.get('PYTHONASYNCIODEBUG'))
    self.set_debug(debug_requested)
def __dealloc__(self):
    # Release the uv_loop_t allocated in __cinit__.
    #
    # A loop that is still running raises RuntimeError.  A loop that was
    # never closed is only reported through the logger and its uv_loop_t
    # is intentionally not freed here.
    if self._running == 1:
        raise RuntimeError('deallocating a running event loop!')

    if self._closed != 0:
        PyMem_RawFree(self.uvloop)
        self.uvloop = NULL
    else:
        aio_logger.error("deallocating an open event loop")
cdef _init_debug_fields(self):
    # Reset every debug/statistics attribute of the loop.
    #
    # The three per-handle Counter objects are only created when
    # UVLOOP_DEBUG is set; otherwise those attributes are None.  All
    # numeric counters start at zero.
    self._debug_cc = bool(UVLOOP_DEBUG)

    self._debug_handles_current = col_Counter() if UVLOOP_DEBUG else None
    self._debug_handles_closed = col_Counter() if UVLOOP_DEBUG else None
    self._debug_handles_total = col_Counter() if UVLOOP_DEBUG else None

    # uv handle counters.
    self._debug_uv_handles_total = 0
    self._debug_uv_handles_freed = 0

    # Stream read counters.
    self._debug_stream_read_cb_total = 0
    self._debug_stream_read_eof_total = 0
    self._debug_stream_read_errors_total = 0
    self._debug_stream_read_cb_errors_total = 0
    self._debug_stream_read_eof_cb_errors_total = 0

    # Stream shutdown / listen error counters.
    self._debug_stream_shutdown_errors_total = 0
    self._debug_stream_listen_errors_total = 0

    # Stream write counters.
    self._debug_stream_write_tries = 0
    self._debug_stream_write_errors_total = 0
    self._debug_stream_write_ctx_total = 0
    self._debug_stream_write_ctx_cnt = 0
    self._debug_stream_write_cb_errors_total = 0

    # Callback handle and timer handle counters.
    self._debug_cb_handles_total = 0
    self._debug_cb_handles_count = 0
    self._debug_cb_timer_handles_total = 0
    self._debug_cb_timer_handles_count = 0

    # Poll event counters.
    self._poll_read_events_total = 0
    self._poll_read_cb_errors_total = 0
    self._poll_write_events_total = 0
    self._poll_write_cb_errors_total = 0

    self._sock_try_write_total = 0

    self._debug_exception_handler_cnt = 0
cdef _setup_signals(self):
    # Create the non-blocking socket pair used for signal wakeups and
    # register `_csock` as the wakeup fd.  Does nothing when the loop is
    # already listening for signals.  If the registration is refused, the
    # pair is closed and dropped again and the loop stays in the
    # "not listening" state.
    if self._listening_signals:
        return

    self._ssock, self._csock = socket_socketpair()
    for sock in (self._ssock, self._csock):
        sock.setblocking(False)

    try:
        signal_set_wakeup_fd(self._csock.fileno())
    except (OSError, ValueError):
        # Not the main thread
        self._ssock.close()
        self._csock.close()
        self._ssock = None
        self._csock = None
    else:
        self._listening_signals = True
cdef _recv_signals_start(self):
    # Register `_ssock` as a reader whose callback is `_read_from_self`,
    # creating the signal socket pair first when it does not exist yet.
    cdef Handle reader

    if self._ssock is None:
        self._setup_signals()
    if self._ssock is None:
        # Not the main thread.
        return

    reader = new_MethodHandle(
        self,
        "Loop._read_from_self",
        <method_t>self._read_from_self,
        self)
    self._add_reader(self._ssock, reader)
cdef _recv_signals_stop(self):
    # Unregister the reader on `_ssock`; a no-op when the signal socket
    # pair does not exist.
    if self._ssock is not None:
        self._remove_reader(self._ssock)
cdef _shutdown_signals(self):
    # Remove every registered signal handler, reset the wakeup fd and
    # close the signal socket pair.  Does nothing when the loop is not
    # listening for signals.
    if not self._listening_signals:
        return

    # Iterate over a snapshot of the keys rather than the dict itself.
    for signum in list(self._signal_handlers):
        self.remove_signal_handler(signum)

    if not self._listening_signals:
        # `remove_signal_handler` will call `_shutdown_signals` when
        # removing last signal handler, so the work is already done.
        return

    try:
        signal_set_wakeup_fd(-1)
    except (ValueError, OSError) as err:
        aio_logger.info('set_wakeup_fd(-1) failed: %s', err)

    self._remove_reader(self._ssock)
    for sock in (self._ssock, self._csock):
        sock.close()
    self._ssock = self._csock = None
    self._listening_signals = False
cdef _read_from_self(self):
    # Drain `_ssock`, handing every chunk read to `_process_self_data`.
    # Reading stops on an empty read or when the socket would block; an
    # interrupted call is simply retried.
    while True:
        try:
            chunk = self._ssock.recv(4096)
            if chunk:
                self._process_self_data(chunk)
            else:
                break
        except InterruptedError:
            continue
        except BlockingIOError:
            break
cdef _process_self_data(self, data):
    # Pass each non-zero item of `data` to `_handle_signal` as a signal
    # number.
    for signum in data:
        # Null bytes are written by _write_to_self() and are ignored.
        if signum:
            self._handle_signal(signum)
cdef _handle_signal(self, sig):
    # Dispatch signal number `sig` to the handle registered for it in
    # `_signal_handlers`, if there is one.
    cdef Handle handle

    handle = <Handle>(self._signal_handlers.get(sig))

    if handle is None:
        # A signal that we aren't listening for through
        # add_signal_handler.  Check for pending signals and make a no-op
        # Python call so that the CPython eval loop gets a chance to
        # process it.
        PyErr_CheckSignals()
        _noop.noop()
        return

    if not handle._cancelled:
        self._call_soon_handle(handle)
        self.handler_async.send()
    else:
        # Remove it properly.
        self.remove_signal_handler(sig)
cdef _on_wake(self):
    # Start the idle handler when there are ready callbacks queued or the
    # loop is stopping, unless the idle handler is already running.
    if self._ready_len > 0 or self._stopping:
        if not self.handler_idle.running:
            self.handler_idle.start()
cdef _on_idle(self):
cdef:
int i, ntodo
object popleft = self._ready.popleft
Handle handler
ntodo = len(self._ready)
if self._debug:
Loading ...