Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
apache-airflow / utils / state.py
Size: Mime:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from enum import Enum
from typing import Dict, FrozenSet, Tuple

from airflow.settings import STATE_COLORS
from airflow.utils.types import Optional


class TaskInstanceState(str, Enum):
    """
    Enum that represents all possible states that a Task Instance can be in.

    Note that None is also allowed, so always use this in a type hint with Optional.
    """

    # Set by the scheduler
    # None - Task is created but should not run yet
    REMOVED = "removed"  # Task vanished from DAG before it ran
    SCHEDULED = "scheduled"  # Task should run and will be handed to executor soon

    # Set by the task instance itself
    QUEUED = "queued"  # Executor has enqueued the task
    RUNNING = "running"  # Task is executing
    SUCCESS = "success"  # Task completed
    SHUTDOWN = "shutdown"  # External request to shut down (e.g. marked failed when running)
    RESTARTING = "restarting"  # External request to restart (e.g. cleared when running)
    FAILED = "failed"  # Task errored out
    UP_FOR_RETRY = "up_for_retry"  # Task failed but has retries left
    UP_FOR_RESCHEDULE = "up_for_reschedule"  # A waiting `reschedule` sensor
    UPSTREAM_FAILED = "upstream_failed"  # One or more upstream deps failed
    SKIPPED = "skipped"  # Skipped by branching or some other mechanism
    SENSING = "sensing"  # Smart sensor offloaded to the sensor DAG
    DEFERRED = "deferred"  # Deferrable operator waiting on a trigger

    def __str__(self) -> str:  # pylint: disable=invalid-str-returned
        return self.value


class DagRunState(str, Enum):
    """
    Enum that represents all possible states that a DagRun can be in.

    These are "shared" with TaskInstanceState in some parts of the code,
    so please ensure that their values always match the ones with the
    same name in TaskInstanceState.
    """

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

    def __str__(self) -> str:
        return self.value


class State:
    """
    Static class with task instance state constants and color methods to
    avoid hardcoding.
    """

    # Backwards-compat constants for code that does not yet use the enum
    # These first three are shared by DagState and TaskState
    SUCCESS = TaskInstanceState.SUCCESS
    RUNNING = TaskInstanceState.RUNNING
    FAILED = TaskInstanceState.FAILED

    # These are TaskState only
    NONE = None
    REMOVED = TaskInstanceState.REMOVED
    SCHEDULED = TaskInstanceState.SCHEDULED
    QUEUED = TaskInstanceState.QUEUED
    SHUTDOWN = TaskInstanceState.SHUTDOWN
    RESTARTING = TaskInstanceState.RESTARTING
    UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
    UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
    UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
    SKIPPED = TaskInstanceState.SKIPPED
    SENSING = TaskInstanceState.SENSING
    DEFERRED = TaskInstanceState.DEFERRED

    task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)

    dag_states: Tuple[DagRunState, ...] = (
        DagRunState.QUEUED,
        DagRunState.SUCCESS,
        DagRunState.RUNNING,
        DagRunState.FAILED,
    )

    state_color: Dict[Optional[TaskInstanceState], str] = {
        None: 'lightblue',
        TaskInstanceState.QUEUED: 'gray',
        TaskInstanceState.RUNNING: 'lime',
        TaskInstanceState.SUCCESS: 'green',
        TaskInstanceState.SHUTDOWN: 'blue',
        TaskInstanceState.RESTARTING: 'violet',
        TaskInstanceState.FAILED: 'red',
        TaskInstanceState.UP_FOR_RETRY: 'gold',
        TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
        TaskInstanceState.UPSTREAM_FAILED: 'orange',
        TaskInstanceState.SKIPPED: 'pink',
        TaskInstanceState.REMOVED: 'lightgrey',
        TaskInstanceState.SCHEDULED: 'tan',
        TaskInstanceState.DEFERRED: 'mediumpurple',
    }
    state_color[TaskInstanceState.SENSING] = state_color[TaskInstanceState.DEFERRED]
    state_color.update(STATE_COLORS)  # type: ignore

    @classmethod
    def color(cls, state):
        """Returns color for a state."""
        return cls.state_color.get(state, 'white')

    @classmethod
    def color_fg(cls, state):
        """Black&white colors for a state."""
        color = cls.color(state)
        if color in ['green', 'red']:
            return 'white'
        return 'black'

    running: FrozenSet[TaskInstanceState] = frozenset(
        [TaskInstanceState.RUNNING, TaskInstanceState.SENSING, TaskInstanceState.DEFERRED]
    )
    """
    A list of states indicating that a task is being executed.
    """

    finished: FrozenSet[TaskInstanceState] = frozenset(
        [
            TaskInstanceState.SUCCESS,
            TaskInstanceState.FAILED,
            TaskInstanceState.SKIPPED,
            TaskInstanceState.UPSTREAM_FAILED,
        ]
    )
    """
    A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
    further action.

    Note that the attempt could have resulted in failure or have been
    interrupted; or perhaps never run at all (skip, or upstream_failed) in any
    case, it is no longer running.
    """

    unfinished: FrozenSet[Optional[TaskInstanceState]] = frozenset(
        [
            None,
            TaskInstanceState.SCHEDULED,
            TaskInstanceState.QUEUED,
            TaskInstanceState.RUNNING,
            TaskInstanceState.SENSING,
            TaskInstanceState.SHUTDOWN,
            TaskInstanceState.RESTARTING,
            TaskInstanceState.UP_FOR_RETRY,
            TaskInstanceState.UP_FOR_RESCHEDULE,
            TaskInstanceState.DEFERRED,
        ]
    )
    """
    A list of states indicating that a task either has not completed
    a run or has not even started.
    """

    failed_states: FrozenSet[TaskInstanceState] = frozenset(
        [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
    )
    """
    A list of states indicating that a task or dag is a failed state.
    """

    success_states: FrozenSet[TaskInstanceState] = frozenset(
        [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
    )
    """
    A list of states indicating that a task or dag is a success state.
    """

    terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
    """
    A list of states indicating that a task has been terminated.
    """


class PokeState:
    """Static class with poke states constants used in smart operator."""

    LANDED = 'landed'
    NOT_LANDED = 'not_landed'
    POKE_EXCEPTION = 'poke_exception'