Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from collections.abc import Mapping
from functools import partial
from typing import Awaitable, Callable, TypeVar

from anyio import create_task_group

__all__ = ("race",)

T = TypeVar("T")
T2 = TypeVar("T2")


async def race(funcs: Mapping[str, Callable[[], Awaitable[T]]]) -> tuple[str, T]:
    """Run multiple async functions concurrently and wait for at least one of
    them to complete. Return the key corresponding to the function and the
    result of the function as well.

    Raises:
        ExceptionGroup: when an exception happens in one of the functions
    """
    holder: list[tuple[str, T]] = []

    async with create_task_group() as tg:
        cancel = tg.cancel_scope.cancel
        for key, func in funcs.items():
            set_result = partial(_cancel_and_set_result, cancel, holder, key)
            tg.start_soon(_wait_and_call, func, set_result)

    return holder[0]


def _cancel_and_set_result(
    cancel: Callable[[], None], holder: list[tuple[str, T]], key: str, value: T
) -> None:
    holder.append((key, value))
    cancel()


async def _wait_and_call(f1: Callable[[], Awaitable[T]], f2: Callable[[T], T2]) -> T2:
    """Call an async function f1() and wait for its result. Call synchronous
    function `f2()` with the result of `f1()` when `f1()` returns.

    Returns:
        the return value of f2
    """
    result = await f1()
    return f2(result)