Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
anyio / _core / _exceptions.py
Size: Mime:
from traceback import format_exception
from typing import List


class BrokenResourceError(Exception):
    """
    Raised when trying to use a resource that has been rendered unusable due to external causes
    (e.g. a send stream whose peer has disconnected).
    """


class BrokenWorkerProcess(Exception):
    """
    Raised by :func:`run_sync_in_process` if the worker process terminates abruptly or otherwise
    misbehaves.
    """


class BusyResourceError(Exception):
    """Raised when two tasks are trying to read from or write to the same resource concurrently."""

    def __init__(self, action: str):
        super().__init__(f'Another task is already {action} this resource')


class ClosedResourceError(Exception):
    """Raised when trying to use a resource that has been closed."""


class DelimiterNotFound(Exception):
    """
    Raised during :meth:`~anyio.streams.buffered.BufferedByteReceiveStream.receive_until` if the
    maximum number of bytes has been read without the delimiter being found.
    """

    def __init__(self, max_bytes: int) -> None:
        super().__init__(f'The delimiter was not found among the first {max_bytes} bytes')


class EndOfStream(Exception):
    """Raised when trying to read from a stream that has been closed from the other end."""


class ExceptionGroup(BaseException):
    """
    Raised when multiple exceptions have been raised in a task group.

    :var ~typing.Sequence[BaseException] exceptions: the sequence of exceptions raised together
    """

    SEPARATOR = '----------------------------\n'

    exceptions: List[BaseException]

    def __str__(self) -> str:
        tracebacks = [''.join(format_exception(type(exc), exc, exc.__traceback__))
                      for exc in self.exceptions]
        return f'{len(self.exceptions)} exceptions were raised in the task group:\n' \
               f'{self.SEPARATOR}{self.SEPARATOR.join(tracebacks)}'

    def __repr__(self) -> str:
        exception_reprs = ', '.join(repr(exc) for exc in self.exceptions)
        return f'<{self.__class__.__name__}: {exception_reprs}>'


class IncompleteRead(Exception):
    """
    Raised during :meth:`~anyio.streams.buffered.BufferedByteReceiveStream.receive_exactly` or
    :meth:`~anyio.streams.buffered.BufferedByteReceiveStream.receive_until` if the
    connection is closed before the requested amount of bytes has been read.
    """

    def __init__(self) -> None:
        super().__init__('The stream was closed before the read operation could be completed')


class TypedAttributeLookupError(LookupError):
    """
    Raised by :meth:`~anyio.TypedAttributeProvider.extra` when the given typed attribute is not
    found and no default value has been given.
    """


class WouldBlock(Exception):
    """Raised by ``X_nowait`` functions if ``X()`` would block."""