Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
uvloop / handles / udp.pyx
Size: Mime:
@cython.no_gc_clear
cdef class UDPTransport(UVBaseTransport):

    def __cinit__(self):
        self.sock = None
        self.poll = None
        self.buffer = col_deque()
        self._has_handle = 0

    cdef _init(self, Loop loop, object sock, object r_addr):
        self._start_init(loop)
        try:
            # It's important to incref the socket in case it
            # was created outside of uvloop,
            # i.e. `look.create_datagram_endpoint(sock=sock)`.
            socket_inc_io_ref(sock)

            self.sock = sock
            self.address = r_addr
            self.poll = UVPoll.new(loop, sock.fileno())
            self._finish_init()
        except:
            self._abort_init()
            raise

    cdef size_t _get_write_buffer_size(self):
        cdef int size = 0
        for data, addr in self.buffer:
            size += len(data)
        return size

    cdef _fileno(self):
        return self.sock.fileno()

    cdef bint _is_reading(self):
        return self.poll is not None and self.poll.is_reading()

    cdef _start_reading(self):
        self._ensure_alive()

        self.poll.start_reading(
            new_MethodHandle(
                self._loop,
                "UDPTransport._on_read_ready",
                <method_t>self._on_read_ready,
                self))

    cdef _stop_reading(self):
        self._ensure_alive()
        self.poll.stop_reading()

    cdef _on_read_ready(self):
        if self._conn_lost:
            return
        try:
            data, addr = self.sock.recvfrom(UV_STREAM_RECV_BUF_SIZE)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(data, addr)

    cdef _on_write_ready(self):
        while self.buffer:
            data, addr = self.buffer.popleft()
            try:
                if self.address:
                    self.sock.send(data)
                else:
                    self.sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self.buffer.appendleft((data, addr))  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self.buffer:
            self.poll.stop_writing()
            if self._closing:
                self._call_connection_lost(None)

    cdef _new_socket(self):
        return PseudoSocket(self.sock.family, self.sock.type,
                            self.sock.proto, self.sock.fileno())

    @staticmethod
    cdef UDPTransport new(Loop loop, object sock, object r_addr):
        cdef UDPTransport udp
        udp = UDPTransport.__new__(UDPTransport)
        udp._init(loop, sock, r_addr)
        return udp

    cdef _dealloc_impl(self):
        if self._closed == 0:
            self._warn_unclosed()

        # It is unsafe to call `self.poll._close()` here as
        # we might be at the stage where all CPython objects
        # are being freed, and `self.poll` points to some
        # zombie Python object.  So we do nothing.

        UVHandle._dealloc_impl(self)

    cdef _close(self):
        if self.poll is not None:
            self.poll._close()
            self.poll = None

        if self.sock is not None:
            try:
                socket_dec_io_ref(self.sock)
                self.sock.close()
            finally:
                self.sock = None

        if UVLOOP_DEBUG:
            self._loop._debug_handles_closed.update([
                self.__class__.__name__])

        UVSocketHandle._close(<UVSocketHandle>self)

    def sendto(self, data, addr=None):
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError('data argument must be a bytes-like object, '
                            'not {!r}'.format(type(data).__name__))
        if not data:
            return

        if self.address and addr not in (None, self.address):
            raise ValueError(
                'Invalid address: must be None or {}'.format(self.address))

        if addr is not None:
            addrinfo = __static_getaddrinfo_pyaddr(
                addr[0], addr[1],
                uv.AF_UNSPEC, self.sock.type, self.sock.proto, 0)
            if addrinfo is None:
                raise ValueError(
                    'UDP.sendto(): address {!r} requires a DNS lookup'.format(
                        addr))
            if addrinfo[0] != self.sock.family:
                raise ValueError(
                    'UDP.sendto(): {!r} socket family mismatch'.format(
                        addr))

        if self._conn_lost and self._address:
            if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                aio_logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self.buffer:
            # Attempt to send it right away first.
            try:
                if self.address:
                    self.sock.send(data)
                else:
                    self.sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self.poll.start_writing(
                    new_MethodHandle(
                        self._loop,
                        "UDPTransport._on_write_ready",
                        <method_t>self._on_write_ready,
                        self))
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except Exception as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        self.buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()