# Why Gemfury? Push, build, and install RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages
#
# Repository URL to install this package:
#
# Details
# flockwave-async / src / flockwave / concurrency / scheduler.py
# Size: Mime:
"""Simple scheduler for starting asynchronous tasks in the future."""

from dataclasses import dataclass, field
from datetime import datetime
from functools import partial
from heapq import heappop, heappush
from math import inf
from time import time as posix_time
from typing import (
    Any,
    Awaitable,
    Callable,
    Generic,
    TypeVar,
    cast,
)
from warnings import warn

from outcome import Error, Outcome, acapture
from trio import (
    TASK_STATUS_IGNORED,
    CancelScope,
    Event,
    Nursery,
    open_nursery,
    sleep_forever,
)
from trio import (
    current_time as trio_time,
)

__all__ = ("Job", "JobCancelled", "LateSubmissionError", "Scheduler")

T = TypeVar("T")


class JobCancelled(RuntimeError):
    """Raised when the result of a job is requested but the job has been
    cancelled before it could produce one.
    """


class LateSubmissionError(RuntimeError):
    """Raised when a job is scheduled to a point in time that has already
    passed while late submissions are disallowed.
    """


@dataclass
class Job(Generic[T]):
    """A single job in the scheduler.

    A job wraps an async function together with the bookkeeping needed to
    run it at most once, to cancel it, and to hand its result (or error) to
    anyone waiting for it.
    """

    func: Callable[[], Awaitable[T]]
    """The async function of the job; it is called without arguments."""

    allow_late_start: bool = field(default=False)
    """Whether the job is allowed to start even if the submission time has
    passed.
    """

    outcome: Outcome | None = None
    """The result of the job; ``None`` while the job has not finished."""

    _cancel_scope: CancelScope = field(default_factory=CancelScope)
    """The cancel scope of the job that can be used to cancel it when it is
    already running.
    """

    _running: bool = False
    """Whether the job is running."""

    _completed: Event = field(default_factory=Event)
    """Event that is triggered when the job has finished."""

    @property
    def completed(self) -> bool:
        """Returns whether the job has finished."""
        return self._completed.is_set()

    @property
    def running(self) -> bool:
        """Returns whether the job is running."""
        return self._running

    @property
    def _cancellation_recorded(self) -> bool:
        """Returns whether the stored outcome is the ``JobCancelled`` marker
        written by ``_record_cancellation()``.
        """
        outcome = self.outcome
        return isinstance(outcome, Error) and isinstance(outcome.error, JobCancelled)

    def _cancel(self) -> None:
        """Cancels the job if it has not completed yet. No-op if the job has
        already finished.
        """
        if self.outcome is not None:
            # Job was already executed, do nothing
            return

        if self._running:
            self._cancel_scope.cancel()

        # The cancellation is recorded right away, i.e. possibly before the
        # task running the job has actually unwound; waiters are woken up
        # immediately.
        self._record_cancellation()

    async def _run(self) -> None:
        """Executes the job and stores its result when done.

        Returns without calling the function if the job was cancelled before
        it got the chance to start. An outcome recorded by a cancellation
        that arrives while the function is running is never overwritten.

        Raises:
            RuntimeError: if the job has already been executed or is
                currently running
        """
        if self.outcome is not None:
            if self._cancellation_recorded:
                # Cancelled after the job was handed over for execution but
                # before this task started; raising here would propagate
                # into the nursery that runs the jobs.
                return
            raise RuntimeError("The job was already executed")
        elif self._running:
            raise RuntimeError("The job is already running")

        self._running = True
        with self._cancel_scope:
            outcome = await acapture(self.func)
            if self.outcome is None:
                self._set_outcome(outcome)
            # Otherwise _cancel() has already stored JobCancelled and woken
            # up the waiters; keep that outcome so wait() reports the
            # cancellation consistently instead of whatever was captured
            # while the function was unwinding.

    def _record_cancellation(self) -> None:
        """Stores a ``JobCancelled`` error as the outcome of the job."""
        self._set_outcome(Error(JobCancelled()))

    def _set_outcome(self, outcome: Outcome) -> None:
        """Stores the outcome, marks the job as completed and not running."""
        self.outcome = outcome
        self._completed.set()
        self._running = False

    async def wait(self) -> T:
        """Waits for the result of the job. Returns the result of the job
        when completed successfully, or raises an exception if the job
        terminated with an exception.

        Raises:
            JobCancelled: if the job was cancelled
        """
        await self._completed.wait()
        assert self.outcome is not None
        object.__setattr__(
            self.outcome, "_unwrapped", False
        )  # to allow calling wait() twice
        return cast(T, self.outcome.unwrap())


@dataclass(order=True)
class SchedulerItem(Generic[T]):
    """Entry of the scheduler heap that pairs a job with its start time.

    Entries are ordered by ``scheduled_time`` only; the job is excluded from
    comparisons.
    """

    scheduled_time: float
    """Time at which the job should be started."""

    job: Job[T] | None = field(compare=False)
    """The job to start, or ``None`` once the entry has been invalidated."""

    def _invalidate(self) -> None:
        """Detaches the job from this entry. Used when the job gets a new
        start time, so that this stale entry no longer refers to it.
        """
        self.job = None


class Scheduler(Generic[T]):
    """Simple scheduler for starting asynchronous tasks in the future.

    Pending jobs are kept in a heap ordered by their start time. ``run()``
    sleeps until the earliest start time and then starts every job that is
    due as a child task of its own nursery.
    """

    _cancel_scope: CancelScope
    """The cancel scope that stores the next wakeup time of the scheduler."""

    _heap: list[SchedulerItem[T]]
    """The list of jobs in the scheduler, along with their timestamps."""

    _jobs_to_items: dict[int, SchedulerItem[T]]
    """Dictionary mapping the IDs of jobs to their currently active scheduler
    items.
    """

    allow_late_start: bool = True
    """Whether the scheduler allows late submissions (i.e. jobs that are
    scheduled to a time that is earlier than the current time when they are
    submitted). Setting this property to ``False`` will make the scheduler throw
    a ``LateSubmissionError`` when someone tries to (re)schedule a job to a
    timestamp that is earlier than the current time.
    """

    def __init__(self, allow_late_start: bool = True, **kwds):
        """Constructor.

        Parameters:
            allow_late_start: default for whether jobs may be scheduled to
                a time that is already in the past
            **kwds: only the deprecated ``allow_late_submissions`` keyword is
                accepted; it overrides ``allow_late_start``

        Raises:
            TypeError: if an unknown keyword argument is passed
        """
        if kwds:
            if "allow_late_submissions" in kwds:
                # stacklevel=2 attributes the warning to the code that
                # constructs the scheduler instead of to this module.
                warn(
                    "allow_late_submissions=... is deprecated, use allow_late_start=...",
                    DeprecationWarning,
                    stacklevel=2,
                )
                allow_late_start = kwds.pop("allow_late_submissions")
            if kwds:
                key, _ = kwds.popitem()
                raise TypeError(f"unexpected keyword argument: {key!r}")

        self.allow_late_start = bool(allow_late_start)
        self._cancel_scope = CancelScope()
        self._heap = []
        self._jobs_to_items = {}

    def cancel(self, job: Job[T]) -> None:
        """Cancels an already scheduled job.

        A pending job is removed from the schedule; in every case the
        cancellation is forwarded to the job itself.
        """
        item = self._jobs_to_items.pop(id(job), None)
        if item is not None:
            # The entry stays in the heap but no longer refers to the job, so
            # it is skipped when its time comes.
            item._invalidate()
        job._cancel()

    def schedule_at(
        self,
        scheduled_time: float | datetime,
        func: Callable[..., Awaitable[T]],
        *args,
        allow_late_start: bool | None = None,
    ) -> Job[T]:
        """Schedules the given function to be called at the given time.

        Extra positional arguments are forwarded to the function.

        Note that this function will _not_ work correctly if the system clock
        is adjusted after the job has been scheduled; the delay to the job start
        will still be based on the old timestamp.

        Parameters:
            scheduled_time: the time when the function must be called, either
                as a POSIX timestamp or as a datetime object
            func: the function to call
            allow_late_start: whether the function can be scheduled even if the
                scheduled start time is already earlier than the current time.
                ``None`` means to use the default behaviour from the scheduler.

        Returns:
            the scheduled job

        Raises:
            LateSubmissionError: if the scheduled time is in the past and late
                starts are not allowed
        """
        if allow_late_start is None:
            allow_late_start = self.allow_late_start
        timestamp = self._validate_timestamp(
            scheduled_time, allow_late_start=allow_late_start
        )
        job = Job(
            cast(Any, partial(func, *args) if args else func),
            allow_late_start=bool(allow_late_start),
        )
        return self._schedule(timestamp, job)

    def schedule_after(
        self,
        delay: float,
        func: Callable[..., Awaitable[T]],
        *args,
        allow_late_start: bool | None = None,
    ) -> Job[T]:
        """Schedules the given function to be called after a given number of
        seconds.

        Extra positional arguments are forwarded to the function.

        Parameters:
            delay: the number of seconds that must pass before the function is
                called
            func: the function to call
            allow_late_start: whether the function can be scheduled even if the
                scheduled start time is already earlier than the current time
                (i.e. the delay is negative). ``None`` means to use the default
                behaviour from the scheduler.

        Returns:
            the scheduled job

        Raises:
            LateSubmissionError: if the delay is negative and late starts are
                not allowed
        """
        if allow_late_start is None:
            allow_late_start = self.allow_late_start
        self._validate_delay(delay, allow_late_start=allow_late_start)
        job = Job(
            cast(Any, partial(func, *args) if args else func),
            allow_late_start=bool(allow_late_start),
        )
        return self._schedule(trio_time() + delay, job)

    def reschedule_to(self, scheduled_time: float | datetime, job: Job[T]) -> None:
        """Reschedules a pending job to a new start time.

        Parameters:
            scheduled_time: the new scheduled start time of the job, either
                as a POSIX timestamp or as a datetime object
            job: the job to reschedule

        Raises:
            LateSubmissionError: if the new time is in the past and the job
                does not allow late starts
            RuntimeError: if the job is not pending in this scheduler any
                more, e.g. because it has been started or cancelled
        """
        # Validate first so a rejected timestamp leaves the schedule untouched
        timestamp = self._validate_timestamp(
            scheduled_time, allow_late_start=job.allow_late_start
        )
        item = self._pop_scheduler_item_for_job(job)
        self._reschedule(timestamp, item)

    def reschedule_after(self, delay: float, job: Job[T]) -> None:
        """Reschedules a pending job so it is executed after a given number
        of seconds.

        Parameters:
            delay: the number of seconds that must pass before the function is
                called
            job: the job to reschedule

        Raises:
            LateSubmissionError: if the delay is negative and the job does not
                allow late starts
            RuntimeError: if the job is not pending in this scheduler any
                more, e.g. because it has been started or cancelled
        """
        # Validate first so a rejected delay leaves the schedule untouched
        self._validate_delay(delay, allow_late_start=job.allow_late_start)
        item = self._pop_scheduler_item_for_job(job)
        self._reschedule(trio_time() + delay, item)

    async def run(self, task_status=TASK_STATUS_IGNORED) -> None:
        """Runs the scheduler.

        Loops forever: starts the jobs that are due, then sleeps until the
        start time of the earliest remaining entry. ``task_status.started()``
        is called once the nursery that hosts the jobs is open.
        """
        async with open_nursery() as nursery:
            task_status.started()
            while True:
                self._start_expired_jobs_in(nursery)

                # Sleep until the earliest entry is due. _schedule() moves the
                # deadline of this scope earlier when a sooner job arrives.
                next_deadline = self._heap[0].scheduled_time if self._heap else inf
                self._cancel_scope = CancelScope(deadline=next_deadline)
                with self._cancel_scope:
                    await sleep_forever()

    def _pop_scheduler_item_for_job(self, job: Job[T]) -> SchedulerItem[T]:
        """Returns the scheduler item corresponding to the given job and removes
        it from the internal jobs-to-items hash.

        Raises:
            RuntimeError: if the job has no active item in the scheduler
                (its execution has started already or it was removed from the
                scheduler).
        """
        item = self._jobs_to_items.pop(id(job), None)
        if item is None:
            raise RuntimeError("Job execution has started already")
        return item

    def _schedule(self, timestamp: float, job: Job[T]) -> Job[T]:
        """Schedules a job to be called at the given Trio timestamp (which
        is not based on POSIX time). It is assumed that the job is not
        scheduled yet.

        Returns:
            the job that was passed in
        """
        item = SchedulerItem(timestamp, job)

        heappush(self._heap, item)
        self._jobs_to_items[id(job)] = item

        # Bring the wakeup time forward if the new job is due earlier than
        # whatever the scheduler is currently waiting for.
        if timestamp < self._cancel_scope.deadline:
            self._cancel_scope.deadline = timestamp

        return job

    def _reschedule(self, timestamp: float, item: SchedulerItem[T]) -> None:
        """Moves the job of the given item to a new Trio timestamp by
        invalidating the old item and scheduling the job again.
        """
        job = item.job
        assert job is not None

        item._invalidate()
        self._schedule(timestamp, job)

    def _start_expired_jobs_in(self, nursery: Nursery) -> None:
        """Pops every heap entry whose time has come and starts the jobs of
        the still valid entries in the given nursery.
        """
        now = trio_time()
        while self._heap and self._heap[0].scheduled_time <= now:
            item = heappop(self._heap)
            job = item.job
            # Invalidated entries (cancelled or rescheduled jobs) have no job
            if job is not None:
                del self._jobs_to_items[id(job)]
                nursery.start_soon(job._run)

    def _validate_delay(self, delay: float, *, allow_late_start: bool) -> None:
        """Validates a delay measured in seconds.

        Raises:
            LateSubmissionError: when the delay is negative and
                ``allow_late_start`` is set to ``False``.
        """
        if delay < 0 and not allow_late_start:
            # Rounded only to keep the error message readable
            delay = round(float(delay), 2)
            raise LateSubmissionError(
                f"Tried to schedule job to {-delay} seconds in the past"
            )

    def _validate_timestamp(
        self, timestamp: float | datetime, *, allow_late_start: bool
    ) -> float:
        """Validates a "local" (POSIX) timestamp and returns the equivalent
        Trio timestamp to be used in the scheduler.

        Raises:
            LateSubmissionError: when the timestamp is in the past and
                ``allow_late_start`` is set to ``False``.
        """
        # TODO(ntamas): this solution ignores the case when the system clock is
        # adjusted while the job is already scheduled.
        delay = (
            float(timestamp)
            if isinstance(timestamp, (float, int))
            else timestamp.timestamp()
        ) - posix_time()
        self._validate_delay(delay, allow_late_start=allow_late_start)
        return trio_time() + delay