Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / uvloop   python

Repository URL to install this package:

Version: 0.9.1 

/ handles / basetransport.pyx

@cython.no_gc_clear
cdef class UVBaseTransport(UVSocketHandle):

    def __cinit__(self):
        # Flow control
        self._high_water = FLOW_CONTROL_HIGH_WATER
        self._low_water = FLOW_CONTROL_LOW_WATER

        self._protocol = None
        self._protocol_connected = 0
        self._protocol_paused = 0
        self._protocol_data_received = None

        self._server = None
        self._waiter = None
        self._extra_info = None

        self._conn_lost = 0

        self._closing = 0

    cdef size_t _get_write_buffer_size(self):
        return 0

    cdef inline _schedule_call_connection_made(self):
        self._loop._call_soon_handle(
            new_MethodHandle(self._loop,
                             "UVTransport._call_connection_made",
                             <method_t>self._call_connection_made,
                             self))

    cdef inline _schedule_call_connection_lost(self, exc):
        self._loop._call_soon_handle(
            new_MethodHandle1(self._loop,
                              "UVTransport._call_connection_lost",
                              <method1_t>self._call_connection_lost,
                              self, exc))

    cdef _fatal_error(self, exc, throw, reason=None):
        # Overload UVHandle._fatal_error

        self._force_close(exc)

        if not isinstance(exc, (BrokenPipeError,
                                ConnectionResetError,
                                ConnectionAbortedError)):

            if throw or self._loop is None:
                raise exc

            msg = 'Fatal error on transport {}'.format(
                    self.__class__.__name__)
            if reason is not None:
                msg = '{} ({})'.format(msg, reason)

            self._loop.call_exception_handler({
                'message': msg,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })

    cdef inline _set_write_buffer_limits(self, int high=-1, int low=-1):
        if high == -1:
            if low == -1:
                high = FLOW_CONTROL_HIGH_WATER
            else:
                high = FLOW_CONTROL_LOW_WATER

        if low == -1:
            low = high // 4

        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))

        self._high_water = high
        self._low_water = low

        self._maybe_pause_protocol()

    cdef inline _maybe_pause_protocol(self):
        cdef:
            size_t size = self._get_write_buffer_size()

        if size <= self._high_water:
            return

        if not self._protocol_paused:
            self._protocol_paused = 1
            try:
                self._protocol.pause_writing()
            except Exception as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    cdef inline _maybe_resume_protocol(self):
        cdef:
            size_t size = self._get_write_buffer_size()

        if self._protocol_paused and size <= self._low_water:
            self._protocol_paused = 0
            try:
                self._protocol.resume_writing()
            except Exception as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    cdef _wakeup_waiter(self):
        if self._waiter is not None:
            if not self._waiter.cancelled():
                if not self._is_alive():
                    self._waiter.set_exception(
                        RuntimeError(
                            'closed Transport handle and unset waiter'))
                else:
                    self._waiter.set_result(True)
            self._waiter = None

    cdef _call_connection_made(self):
        cdef Py_ssize_t _loop_ready_len
        if self._protocol is None:
            raise RuntimeError(
                'protocol is not set, cannot call connection_made()')

        # We use `_is_alive()` and not `_closing`, because we call
        # `transport._close()` in `loop.create_connection()` if an
        # exception happens during `await waiter`.
        if not self._is_alive():
            # A connection waiter can be cancelled between
            # 'await loop.create_connection()' and
            # `_schedule_call_connection_made` and
            # the actual `_call_connection_made`.
            self._wakeup_waiter()
            return

        _loop_ready_len = self._loop._ready_len

        # Set _protocol_connected to 1 before calling "connection_made":
        # if transport is aborted or closed, "connection_lost" will
        # still be scheduled.
        self._protocol_connected = 1

        try:
            self._protocol.connection_made(self)
        except:
            self._wakeup_waiter()
            raise

        if not self._is_alive():
            # This might happen when "transport.abort()" is called
            # from "Protocol.connection_made".
            self._wakeup_waiter()
            return

        if _loop_ready_len == self._loop._ready_len:
            # No new calls were scheduled by 'protocol.connection_made',
            # so it's safe to start reading right now.
            self._start_reading()
        else:
            # In asyncio we'd just call start_reading() right after we
            # call protocol.connection_made().  However, that breaks
            # SSLProtocol in uvloop, which does some initialization
            # with loop.call_soon in its connection_made.  It appears,
            # that uvloop can call protocol.data_received() *before* it
            # calls the handlers that connection_made set up.
            # That's why we're using another call_soon here.
            self._loop._call_soon_handle(
                new_MethodHandle(self._loop,
                                 "UVTransport._start_reading",
                                 <method_t>self._start_reading,
                                 self))

        self._wakeup_waiter()

    cdef _call_connection_lost(self, exc):
        if self._waiter is not None:
            # This shouldn't ever happen!
            self._loop.call_exception_handler({
                'message': 'waiter is not None in {}._call_connection_lost'.
                    format(self.__class__.__name__)
            })
            if not self._waiter.done():
                self._waiter.set_exception(exc)
            self._waiter = None

        if self._closed:
            # The handle is closed -- likely, _call_connection_lost
            # was already called before.
            return

        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._protocol = None
            self._protocol_data_received = None

            self._close()

            server = self._server
            if server is not None:
                (<Server>server)._detach()
                self._server = None

    cdef inline _set_server(self, Server server):
        self._server = server
        (<Server>server)._attach()

    cdef inline _set_waiter(self, object waiter):
        if waiter is not None and not isfuture(waiter):
            raise TypeError(
                'invalid waiter object {!r}, expected asyncio.Future'.
                    format(waiter))

        self._waiter = waiter

    cdef inline _set_protocol(self, object protocol):
        self._protocol = protocol
        # Store a reference to the bound method directly
        try:
            self._protocol_data_received = protocol.data_received
        except AttributeError:
            pass

    cdef inline _init_protocol(self):
        self._loop._track_transport(self)
        if self._protocol is None:
            raise RuntimeError('invalid _init_protocol call')
        self._schedule_call_connection_made()

    cdef inline _add_extra_info(self, str name, object obj):
        if self._extra_info is None:
            self._extra_info = {}
        self._extra_info[name] = obj

    cdef bint _is_reading(self):
        raise NotImplementedError

    cdef _start_reading(self):
        raise NotImplementedError

    cdef _stop_reading(self):
        raise NotImplementedError

    # === Public API ===

    property _paused:
        # Used by SSLProto.  Might be removed in the future.
        def __get__(self):
            return bool(not self._is_reading())

    def get_protocol(self):
        return self._protocol

    def set_protocol(self, protocol):
        self._set_protocol(protocol)

    def _force_close(self, exc):
        # Used by SSLProto.  Might be removed in the future.
        if self._conn_lost or self._closed:
            return
        if not self._closing:
            self._closing = 1
            self._stop_reading()
        self._conn_lost += 1
        self._schedule_call_connection_lost(exc)

    def abort(self):
        self._force_close(None)

    def close(self):
        if self._closing or self._closed:
            return

        self._closing = 1
        self._stop_reading()

        if not self._get_write_buffer_size():
            # The write buffer is empty
            self._conn_lost += 1
            self._schedule_call_connection_lost(None)

    def is_closing(self):
        return self._closing

    def get_write_buffer_size(self):
        return self._get_write_buffer_size()

    def set_write_buffer_limits(self, high=None, low=None):
        self._ensure_alive()

        if high is None:
            high = -1
        if low is None:
            low = -1

        self._set_write_buffer_limits(high, low)

    def get_write_buffer_limits(self):
        return (self._low_water, self._high_water)

    def get_extra_info(self, name, default=None):
        if self._extra_info is not None and name in self._extra_info:
            return self._extra_info[name]
        if name == 'socket':
            return self._get_socket()
        if name == 'sockname':
            return self._get_socket().getsockname()
        if name == 'peername':
            try:
                return self._get_socket().getpeername()
            except socket_error:
                return default
        return default