Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from functools import partial
from operator import setitem
from typing import Awaitable, Callable, Mapping, Sequence, TypeVar, cast, overload

from anyio import create_task_group

from .race import _wait_and_call

__all__ = ("gather",)

T = TypeVar("T")


@overload
async def gather(funcs: Sequence[Callable[[], Awaitable[T]]]) -> Sequence[T]: ...


@overload
async def gather(
    funcs: Mapping[str, Callable[[], Awaitable[T]]],
) -> Mapping[str, T]: ...


async def gather(
    funcs: Mapping[str, Callable[[], Awaitable[T]]]
    | Sequence[Callable[[], Awaitable[T]]],
) -> Mapping[str, T] | Sequence[T]:
    """Run several zero-argument async callables concurrently and collect
    their results once all of them have finished.

    The shape of the return value follows the shape of the input: a mapping
    of callables yields a mapping with the same keys, a sequence of callables
    yields a sequence whose items sit at the same positions as their
    callables.

    Args:
        funcs: either a mapping from name to callable, or a sequence of
            callables. Each callable takes no arguments and returns an
            awaitable.

    Returns:
        The collected results, keyed or positioned like ``funcs``.

    Raises:
        TypeError: when ``funcs`` is neither a Mapping nor a Sequence
        ExceptionGroup: when an exception happens in one of the functions
    """
    # Reject unsupported containers up front so the dispatch below only has
    # to distinguish the two accepted shapes.
    if not isinstance(funcs, (Mapping, Sequence)):
        raise TypeError("funcs must be a Mapping or a Sequence of callables")

    # Mapping takes precedence over Sequence.
    if isinstance(funcs, Mapping):
        return await _gather_map(funcs)  # ty:ignore[invalid-argument-type]

    return await _gather_seq(funcs)


async def _gather_map(
    funcs: Mapping[str, Callable[[], Awaitable[T]]],
) -> Mapping[str, T]:
    """Run every callable of a mapping in one task group and return the
    results under the same keys.

    Args:
        funcs: mapping from key to a zero-argument callable returning an
            awaitable.

    Returns:
        A dict filled by the spawned tasks, one entry per key of ``funcs``.
    """
    collected: dict[str, T] = {}

    async with create_task_group() as task_group:
        for name, call in funcs.items():
            # The key is bound into the callback here, at spawn time, so each
            # task writes to its own entry. `_wait_and_call` lives in `.race`;
            # presumably it awaits `call()` and hands the result to the
            # callback — confirm there.
            task_group.start_soon(
                _wait_and_call,
                call,
                partial(setitem, collected, name),
            )

    # Reached only after the task group block has exited.
    return collected


async def _gather_seq(
    funcs: Sequence[Callable[[], Awaitable[T]]],
) -> Sequence[T]:
    """Run every callable of a sequence in one task group and return the
    results in input order.

    Args:
        funcs: sequence of zero-argument callables returning an awaitable.

    Returns:
        A list with one slot per callable; slot ``i`` is written by the task
        spawned for ``funcs[i]``.
    """
    # Pre-size the list so tasks can store by index regardless of the order
    # in which they finish.
    slots: list[T | None] = [None] * len(funcs)

    async with create_task_group() as task_group:
        for position, call in enumerate(funcs):
            # `_wait_and_call` lives in `.race`; presumably it awaits
            # `call()` and hands the result to the callback — confirm there.
            task_group.start_soon(
                _wait_and_call,
                call,
                partial(setitem, slots, position),
            )

    # The placeholder type `T | None` is only needed while filling; callers
    # see a plain sequence of results.
    return cast(Sequence[T], slots)