Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
apache-airflow / jobs / backfill_job.py
Size: Mime:
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import time
from collections import OrderedDict
from typing import Optional, Set

import pendulum
from sqlalchemy.orm.session import Session, make_transient
from tabulate import tabulate

from airflow import models
from airflow.exceptions import (
    AirflowException,
    BackfillUnfinished,
    DagConcurrencyLimitReached,
    NoAvailablePoolSlot,
    PoolNotFound,
    TaskConcurrencyLimitReached,
)
from airflow.executors import executor_constants
from airflow.jobs.base_job import BaseJob
from airflow.models import DAG, DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
from airflow.timetables.base import DagRunInfo
from airflow.utils import helpers, timezone
from airflow.utils.configuration import conf as airflow_conf, tmp_configuration_copy
from airflow.utils.session import provide_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType


class BackfillJob(BaseJob):
    """
    A backfill job consists of a dag or subdag for a specific time range. It
    triggers a set of task instance runs, in the right order and lasts for
    as long as it takes for the set of task instance to be completed.
    """

    STATES_COUNT_AS_RUNNING = (State.RUNNING, State.QUEUED)

    __mapper_args__ = {'polymorphic_identity': 'BackfillJob'}

    class _DagRunTaskStatus:
        """
        Internal status of the backfill job. This class is intended to be instantiated
        only within a BackfillJob instance and will track the execution of tasks,
        e.g. running, skipped, succeeded, failed, etc. Information about the dag runs
        related to the backfill job are also being tracked in this structure,
        .e.g finished runs, etc. Any other status related information related to the
        execution of dag runs / tasks can be included in this structure since it makes
        it easier to pass it around.

        :param to_run: Tasks to run in the backfill
        :type to_run: dict[tuple[TaskInstanceKey], airflow.models.TaskInstance]
        :param running: Maps running task instance key to task instance object
        :type running: dict[tuple[TaskInstanceKey], airflow.models.TaskInstance]
        :param skipped: Tasks that have been skipped
        :type skipped: set[tuple[TaskInstanceKey]]
        :param succeeded: Tasks that have succeeded so far
        :type succeeded: set[tuple[TaskInstanceKey]]
        :param failed: Tasks that have failed
        :type failed: set[tuple[TaskInstanceKey]]
        :param not_ready: Tasks not ready for execution
        :type not_ready: set[tuple[TaskInstanceKey]]
        :param deadlocked: Deadlocked tasks
        :type deadlocked: set[airflow.models.TaskInstance]
        :param active_runs: Active dag runs at a certain point in time
        :type active_runs: list[DagRun]
        :param executed_dag_run_dates: Datetime objects for the executed dag runs
        :type executed_dag_run_dates: set[datetime.datetime]
        :param finished_runs: Number of finished runs so far
        :type finished_runs: int
        :param total_runs: Number of total dag runs able to run
        :type total_runs: int
        """

        # TODO(edgarRd): AIRFLOW-1444: Add consistency check on counts
        def __init__(
            self,
            to_run=None,
            running=None,
            skipped=None,
            succeeded=None,
            failed=None,
            not_ready=None,
            deadlocked=None,
            active_runs=None,
            executed_dag_run_dates=None,
            finished_runs=0,
            total_runs=0,
        ):
            self.to_run = to_run or OrderedDict()
            self.running = running or {}
            self.skipped = skipped or set()
            self.succeeded = succeeded or set()
            self.failed = failed or set()
            self.not_ready = not_ready or set()
            self.deadlocked = deadlocked or set()
            self.active_runs = active_runs or []
            self.executed_dag_run_dates = executed_dag_run_dates or set()
            self.finished_runs = finished_runs
            self.total_runs = total_runs

    def __init__(
        self,
        dag,
        start_date=None,
        end_date=None,
        mark_success=False,
        donot_pickle=False,
        ignore_first_depends_on_past=False,
        ignore_task_deps=False,
        pool=None,
        delay_on_limit_secs=1.0,
        verbose=False,
        conf=None,
        rerun_failed_tasks=False,
        run_backwards=False,
        run_at_least_once=False,
        *args,
        **kwargs,
    ):
        """
        :param dag: DAG object.
        :type dag: airflow.models.DAG
        :param start_date: start date for the backfill date range.
        :type start_date: datetime.datetime
        :param end_date: end date for the backfill date range.
        :type end_date: datetime.datetime
        :param mark_success: flag whether to mark the task auto success.
        :type mark_success: bool
        :param donot_pickle: whether pickle
        :type donot_pickle: bool
        :param ignore_first_depends_on_past: whether to ignore depend on past
        :type ignore_first_depends_on_past: bool
        :param ignore_task_deps: whether to ignore the task dependency
        :type ignore_task_deps: bool
        :param pool: pool to backfill
        :type pool: str
        :param delay_on_limit_secs:
        :param verbose:
        :type verbose: flag to whether display verbose message to backfill console
        :param conf: a dictionary which user could pass k-v pairs for backfill
        :type conf: dictionary
        :param rerun_failed_tasks: flag to whether to
                                   auto rerun the failed task in backfill
        :type rerun_failed_tasks: bool
        :param run_backwards: Whether to process the dates from most to least recent
        :type run_backwards bool
        :param run_at_least_once: If true, always run the DAG at least once even
            if no logical run exists within the time range.
        :type: bool
        :param args:
        :param kwargs:
        """
        self.dag = dag
        self.dag_id = dag.dag_id
        self.bf_start_date = start_date
        self.bf_end_date = end_date
        self.mark_success = mark_success
        self.donot_pickle = donot_pickle
        self.ignore_first_depends_on_past = ignore_first_depends_on_past
        self.ignore_task_deps = ignore_task_deps
        self.pool = pool
        self.delay_on_limit_secs = delay_on_limit_secs
        self.verbose = verbose
        self.conf = conf
        self.rerun_failed_tasks = rerun_failed_tasks
        self.run_backwards = run_backwards
        self.run_at_least_once = run_at_least_once
        super().__init__(*args, **kwargs)

    @provide_session
    def _update_counters(self, ti_status, session=None):
        """
        Updates the counters per state of the tasks that were running. Can re-add
        to tasks to run in case required.

        :param ti_status: the internal status of the backfill job tasks
        :type ti_status: BackfillJob._DagRunTaskStatus
        """
        tis_to_be_scheduled = []
        refreshed_tis = []
        TI = TaskInstance

        filter_for_tis = TI.filter_for_tis(list(ti_status.running.values()))
        if filter_for_tis is not None:
            refreshed_tis = session.query(TI).filter(filter_for_tis).all()

        for ti in refreshed_tis:
            # Here we remake the key by subtracting 1 to match in memory information
            reduced_key = ti.key.reduced
            if ti.state == State.SUCCESS:
                ti_status.succeeded.add(reduced_key)
                self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                ti_status.running.pop(reduced_key)
                continue
            if ti.state == State.SKIPPED:
                ti_status.skipped.add(reduced_key)
                self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                ti_status.running.pop(reduced_key)
                continue
            if ti.state == State.FAILED:
                self.log.error("Task instance %s failed", ti)
                ti_status.failed.add(reduced_key)
                ti_status.running.pop(reduced_key)
                continue
            # special case: if the task needs to run again put it back
            if ti.state == State.UP_FOR_RETRY:
                self.log.warning("Task instance %s is up for retry", ti)
                ti_status.running.pop(reduced_key)
                ti_status.to_run[ti.key] = ti
            # special case: if the task needs to be rescheduled put it back
            elif ti.state == State.UP_FOR_RESCHEDULE:
                self.log.warning("Task instance %s is up for reschedule", ti)
                # During handling of reschedule state in ti._handle_reschedule, try number is reduced
                # by one, so we should not use reduced_key to avoid key error
                ti_status.running.pop(ti.key)
                ti_status.to_run[ti.key] = ti
            # special case: The state of the task can be set to NONE by the task itself
            # when it reaches concurrency limits. It could also happen when the state
            # is changed externally, e.g. by clearing tasks from the ui. We need to cover
            # for that as otherwise those tasks would fall outside of the scope of
            # the backfill suddenly.
            elif ti.state == State.NONE:
                self.log.warning(
                    "FIXME: task instance %s state was set to none externally or "
                    "reaching concurrency limits. Re-adding task to queue.",
                    ti,
                )
                tis_to_be_scheduled.append(ti)
                ti_status.running.pop(reduced_key)
                ti_status.to_run[ti.key] = ti

        # Batch schedule of task instances
        if tis_to_be_scheduled:
            filter_for_tis = TI.filter_for_tis(tis_to_be_scheduled)
            session.query(TI).filter(filter_for_tis).update(
                values={TI.state: State.SCHEDULED}, synchronize_session=False
            )

    def _manage_executor_state(self, running):
        """
        Checks if the executor agrees with the state of task instances
        that are running

        :param running: dict of key, task to verify
        """
        executor = self.executor

        # TODO: query all instead of refresh from db
        for key, value in list(executor.get_event_buffer().items()):
            state, info = value
            if key not in running:
                self.log.warning("%s state %s not in running=%s", key, state, running.values())
                continue

            ti = running[key]
            ti.refresh_from_db()

            self.log.debug("Executor state: %s task %s", state, ti)

            if state in (State.FAILED, State.SUCCESS) and ti.state in self.STATES_COUNT_AS_RUNNING:
                msg = (
                    "Executor reports task instance {} finished ({}) "
                    "although the task says its {}. Was the task "
                    "killed externally? Info: {}".format(ti, state, ti.state, info)
                )
                self.log.error(msg)
                ti.handle_failure_with_callback(error=msg)

    @provide_session
    def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = None):
        """
        Returns a dag run for the given run date, which will be matched to an existing
        dag run if available or create a new dag run otherwise. If the max_active_runs
        limit is reached, this function will return None.

        :param dagrun_info: Schedule information for the dag run
        :param dag: DAG
        :param session: the database session object
        :return: a DagRun in state RUNNING or None
        """
        run_date = dagrun_info.logical_date

        # consider max_active_runs but ignore when running subdags
        respect_dag_max_active_limit = bool(dag.timetable.can_run and not dag.is_subdag)

        current_active_dag_count = dag.get_num_active_runs(external_trigger=False)

        # check if we are scheduling on top of a already existing dag_run
        # we could find a "scheduled" run instead of a "backfill"
        runs = DagRun.find(dag_id=dag.dag_id, execution_date=run_date, session=session)
        run: Optional[DagRun]
        if runs:
            run = runs[0]
            if run.state == State.RUNNING:
                respect_dag_max_active_limit = False
        else:
            run = None

        # enforce max_active_runs limit for dag, special cases already
        # handled by respect_dag_max_active_limit
        if respect_dag_max_active_limit and current_active_dag_count >= dag.max_active_runs:
            return None

        run = run or dag.create_dagrun(
            execution_date=run_date,
            data_interval=dagrun_info.data_interval,
            start_date=timezone.utcnow(),
            state=State.RUNNING,
            external_trigger=False,
            session=session,
            conf=self.conf,
            run_type=DagRunType.BACKFILL_JOB,
            creating_job_id=self.id,
        )

        # set required transient field
        run.dag = dag

        # explicitly mark as backfill and running
        run.state = State.RUNNING
        run.run_type = DagRunType.BACKFILL_JOB
        run.verify_integrity(session=session)
        return run

    @provide_session
    def _task_instances_for_dag_run(self, dag_run, session=None):
        """
        Returns a map of task instance key to task instance object for the tasks to
        run in the given dag run.

        :param dag_run: the dag run to get the tasks from
        :type dag_run: airflow.models.DagRun
        :param session: the database session object
        :type session: sqlalchemy.orm.session.Session
        """
        tasks_to_run = {}

        if dag_run is None:
            return tasks_to_run

        # check if we have orphaned tasks
        self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)

        # for some reason if we don't refresh the reference to run is lost
        dag_run.refresh_from_db()
        make_transient(dag_run)

        try:
            for ti in dag_run.get_task_instances():
                # all tasks part of the backfill are scheduled to run
                if ti.state == State.NONE:
                    ti.set_state(State.SCHEDULED, session=session)
                if ti.state != State.REMOVED:
                    tasks_to_run[ti.key] = ti
            session.commit()
        except Exception:
            session.rollback()
            raise

        return tasks_to_run

    def _log_progress(self, ti_status):
        self.log.info(
            '[backfill progress] | finished run %s of %s | tasks waiting: %s | succeeded: %s | '
            'running: %s | failed: %s | skipped: %s | deadlocked: %s | not ready: %s',
            ti_status.finished_runs,
            ti_status.total_runs,
            len(ti_status.to_run),
            len(ti_status.succeeded),
            len(ti_status.running),
            len(ti_status.failed),
            len(ti_status.skipped),
            len(ti_status.deadlocked),
            len(ti_status.not_ready),
        )

        self.log.debug("Finished dag run loop iteration. Remaining tasks %s", ti_status.to_run.values())

    @provide_session
    def _process_backfill_task_instances(
        self,
        ti_status,
        executor,
        pickle_id,
        start_date=None,
        session=None,
    ):
        """
        Process a set of task instances from a set of dag runs. Special handling is done
        to account for different task instance states that could be present when running
        them in a backfill process.

        :param ti_status: the internal status of the job
        :type ti_status: BackfillJob._DagRunTaskStatus
        :param executor: the executor to run the task instances
        :type executor: BaseExecutor
        :param pickle_id: the pickle_id if dag is pickled, None otherwise
        :type pickle_id: int
        :param start_date: the start date of the backfill job
        :type start_date: datetime.datetime
        :param session: the current session object
        :type session: sqlalchemy.orm.session.Session
        :return: the list of execution_dates for the finished dag runs
        :rtype: list
        """
        executed_run_dates = []

        is_unit_test = airflow_conf.getboolean('core', 'unit_test_mode')

        while (len(ti_status.to_run) > 0 or len(ti_status.running) > 0) and len(ti_status.deadlocked) == 0:
            self.log.debug("*** Clearing out not_ready list ***")
            ti_status.not_ready.clear()

            # we need to execute the tasks bottom to top
            # or leaf to root, as otherwise tasks might be
            # determined deadlocked while they are actually
            # waiting for their upstream to finish
            @provide_session
            def _per_task_process(key, ti: TaskInstance, session=None):
                ti.refresh_from_db(lock_for_update=True, session=session)

                task = self.dag.get_task(ti.task_id, include_subdags=True)
                ti.task = task

                self.log.debug("Task instance to run %s state %s", ti, ti.state)

                # The task was already marked successful or skipped by a
                # different Job. Don't rerun it.
                if ti.state == State.SUCCESS:
                    ti_status.succeeded.add(key)
                    self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                    ti_status.to_run.pop(key)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    return
                elif ti.state == State.SKIPPED:
                    ti_status.skipped.add(key)
                    self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                    ti_status.to_run.pop(key)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    return

                # guard against externally modified tasks instances or
                # in case max concurrency has been reached at task runtime
                elif ti.state == State.NONE:
                    self.log.warning(
                        "FIXME: Task instance %s state was set to None externally. This should not happen", ti
                    )
                    ti.set_state(State.SCHEDULED, session=session)
                if self.rerun_failed_tasks:
                    # Rerun failed tasks or upstreamed failed tasks
                    if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
                        self.log.error("Task instance %s with state %s", ti, ti.state)
                        if key in ti_status.running:
                            ti_status.running.pop(key)
                        # Reset the failed task in backfill to scheduled state
                        ti.set_state(State.SCHEDULED, session=session)
                else:
                    # Default behaviour which works for subdag.
                    if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
                        self.log.error("Task instance %s with state %s", ti, ti.state)
                        ti_status.failed.add(key)
                        ti_status.to_run.pop(key)
                        if key in ti_status.running:
                            ti_status.running.pop(key)
                        return

                if self.ignore_first_depends_on_past:
                    dagrun = ti.get_dagrun(session=session)
                    ignore_depends_on_past = dagrun.execution_date == (start_date or ti.start_date)
                else:
                    ignore_depends_on_past = False

                backfill_context = DepContext(
                    deps=BACKFILL_QUEUED_DEPS,
                    ignore_depends_on_past=ignore_depends_on_past,
                    ignore_task_deps=self.ignore_task_deps,
                    flag_upstream_failed=True,
                )

                # Is the task runnable? -- then run it
                # the dependency checker can change states of tis
                if ti.are_dependencies_met(
                    dep_context=backfill_context, session=session, verbose=self.verbose
                ):
                    if executor.has_task(ti):
                        self.log.debug("Task Instance %s already in executor waiting for queue to clear", ti)
                    else:
                        self.log.debug('Sending %s to executor', ti)
                        # Skip scheduled state, we are executing immediately
                        ti.state = State.QUEUED
                        ti.queued_by_job_id = self.id
                        ti.queued_dttm = timezone.utcnow()
                        session.merge(ti)

                        cfg_path = None
                        if self.executor_class in (
                            executor_constants.LOCAL_EXECUTOR,
                            executor_constants.SEQUENTIAL_EXECUTOR,
                        ):
                            cfg_path = tmp_configuration_copy()

                        executor.queue_task_instance(
                            ti,
                            mark_success=self.mark_success,
                            pickle_id=pickle_id,
                            ignore_task_deps=self.ignore_task_deps,
                            ignore_depends_on_past=ignore_depends_on_past,
                            pool=self.pool,
                            cfg_path=cfg_path,
                        )
                        ti_status.running[key] = ti
                        ti_status.to_run.pop(key)
                    session.commit()
                    return

                if ti.state == State.UPSTREAM_FAILED:
                    self.log.error("Task instance %s upstream failed", ti)
                    ti_status.failed.add(key)
                    ti_status.to_run.pop(key)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    return

                # special case
                if ti.state == State.UP_FOR_RETRY:
                    self.log.debug("Task instance %s retry period not expired yet", ti)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    ti_status.to_run[key] = ti
                    return

                # special case
                if ti.state == State.UP_FOR_RESCHEDULE:
                    self.log.debug("Task instance %s reschedule period not expired yet", ti)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    ti_status.to_run[key] = ti
                    return

                # all remaining tasks
                self.log.debug('Adding %s to not_ready', ti)
                ti_status.not_ready.add(key)

            try:
                for task in self.dag.topological_sort(include_subdag_tasks=True):
                    for key, ti in list(ti_status.to_run.items()):
                        if task.task_id != ti.task_id:
                            continue

                        pool = session.query(models.Pool).filter(models.Pool.pool == task.pool).first()
                        if not pool:
                            raise PoolNotFound(f'Unknown pool: {task.pool}')

                        open_slots = pool.open_slots(session=session)
                        if open_slots <= 0:
                            raise NoAvailablePoolSlot(
                                "Not scheduling since there are "
                                "{} open slots in pool {}".format(open_slots, task.pool)
                            )

                        num_running_task_instances_in_dag = DAG.get_num_task_instances(
                            self.dag_id,
                            states=self.STATES_COUNT_AS_RUNNING,
                            session=session,
                        )

                        if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
                            raise DagConcurrencyLimitReached(
                                "Not scheduling since DAG max_active_tasks limit is reached."
                            )

                        if task.max_active_tis_per_dag:
                            num_running_task_instances_in_task = DAG.get_num_task_instances(
                                dag_id=self.dag_id,
                                task_ids=[task.task_id],
                                states=self.STATES_COUNT_AS_RUNNING,
                                session=session,
                            )

                            if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
                                raise TaskConcurrencyLimitReached(
                                    "Not scheduling since Task concurrency limit is reached."
                                )

                        _per_task_process(key, ti)
            except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
                self.log.debug(e)

            self.heartbeat(only_if_necessary=is_unit_test)
            # execute the tasks in the queue
            executor.heartbeat()

            # If the set of tasks that aren't ready ever equals the set of
            # tasks to run and there are no running tasks then the backfill
            # is deadlocked
            if (
                ti_status.not_ready
                and ti_status.not_ready == set(ti_status.to_run)
                and len(ti_status.running) == 0
            ):
                self.log.warning("Deadlock discovered for ti_status.to_run=%s", ti_status.to_run.values())
                ti_status.deadlocked.update(ti_status.to_run.values())
                ti_status.to_run.clear()

            # check executor state
            self._manage_executor_state(ti_status.running)

            # update the task counters
            self._update_counters(ti_status=ti_status)

            # update dag run state
            _dag_runs = ti_status.active_runs[:]
            for run in _dag_runs:
                run.update_state(session=session)
                if run.state in State.finished:
                    ti_status.finished_runs += 1
                    ti_status.active_runs.remove(run)
                    executed_run_dates.append(run.execution_date)

            self._log_progress(ti_status)

        # return updated status
        return executed_run_dates

    @provide_session
    def _collect_errors(self, ti_status, session=None):
        def tabulate_ti_keys_set(set_ti_keys: Set[TaskInstanceKey]) -> str:
            # Sorting by execution date first
            sorted_ti_keys = sorted(
                set_ti_keys,
                key=lambda ti_key: (ti_key.run_id, ti_key.dag_id, ti_key.task_id, ti_key.try_number),
            )
            return tabulate(sorted_ti_keys, headers=["DAG ID", "Task ID", "Run ID", "Try number"])

        def tabulate_tis_set(set_tis: Set[TaskInstance]) -> str:
            # Sorting by execution date first
            sorted_tis = sorted(set_tis, key=lambda ti: (ti.run_id, ti.dag_id, ti.task_id, ti.try_number))
            tis_values = ((ti.dag_id, ti.task_id, ti.run_id, ti.try_number) for ti in sorted_tis)
            return tabulate(tis_values, headers=["DAG ID", "Task ID", "Run ID", "Try number"])

        err = ''
        if ti_status.failed:
            err += "Some task instances failed:\n"
            err += tabulate_ti_keys_set(ti_status.failed)
        if ti_status.deadlocked:
            err += 'BackfillJob is deadlocked.'
            deadlocked_depends_on_past = any(
                t.are_dependencies_met(
                    dep_context=DepContext(ignore_depends_on_past=False),
                    session=session,
                    verbose=self.verbose,
                )
                != t.are_dependencies_met(
                    dep_context=DepContext(ignore_depends_on_past=True), session=session, verbose=self.verbose
                )
                for t in ti_status.deadlocked
            )
            if deadlocked_depends_on_past:
                err += (
                    'Some of the deadlocked tasks were unable to run because '
                    'of "depends_on_past" relationships. Try running the '
                    'backfill with the option '
                    '"ignore_first_depends_on_past=True" or passing "-I" at '
                    'the command line.'
                )
            err += '\nThese tasks have succeeded:\n'
            err += tabulate_ti_keys_set(ti_status.succeeded)
            err += '\n\nThese tasks are running:\n'
            err += tabulate_ti_keys_set(ti_status.running)
            err += '\n\nThese tasks have failed:\n'
            err += tabulate_ti_keys_set(ti_status.failed)
            err += '\n\nThese tasks are skipped:\n'
            err += tabulate_ti_keys_set(ti_status.skipped)
            err += '\n\nThese tasks are deadlocked:\n'
            err += tabulate_tis_set(ti_status.deadlocked)

        return err

    @provide_session
    def _execute_dagruns(self, dagrun_infos, ti_status, executor, pickle_id, start_date, session=None):
        """
        Computes the dag runs and their respective task instances for
        the given run dates and executes the task instances.
        Returns a list of execution dates of the dag runs that were executed.

        :param dagrun_infos: Schedule information for dag runs
        :type dagrun_infos: list[DagRunInfo]
        :param ti_status: internal BackfillJob status structure to tis track progress
        :type ti_status: BackfillJob._DagRunTaskStatus
        :param executor: the executor to use, it must be previously started
        :type executor: BaseExecutor
        :param pickle_id: numeric id of the pickled dag, None if not pickled
        :type pickle_id: int
        :param start_date: backfill start date
        :type start_date: datetime.datetime
        :param session: the current session object
        :type session: sqlalchemy.orm.session.Session
        """
        for dagrun_info in dagrun_infos:
            for dag in [self.dag] + self.dag.subdags:
                dag_run = self._get_dag_run(dagrun_info, dag, session=session)
                tis_map = self._task_instances_for_dag_run(dag_run, session=session)
                if dag_run is None:
                    continue

                ti_status.active_runs.append(dag_run)
                ti_status.to_run.update(tis_map or {})

        processed_dag_run_dates = self._process_backfill_task_instances(
            ti_status=ti_status,
            executor=executor,
            pickle_id=pickle_id,
            start_date=start_date,
            session=session,
        )

        ti_status.executed_dag_run_dates.update(processed_dag_run_dates)

    @provide_session
    def _set_unfinished_dag_runs_to_failed(self, dag_runs, session=None):
        """
        Go through the dag_runs and update the state based on the task_instance state.
        Then set DAG runs that are not finished to failed.

        :param dag_runs: DAG runs
        :param session: session
        :return: None
        """
        for dag_run in dag_runs:
            dag_run.update_state()
            if dag_run.state not in State.finished:
                dag_run.set_state(State.FAILED)
            session.merge(dag_run)

    @provide_session
    def _execute(self, session=None):
        """
        Initializes all components required to run a dag for a specified date range and
        calls helper method to execute the tasks.
        """
        ti_status = BackfillJob._DagRunTaskStatus()

        start_date = self.bf_start_date

        # Get DagRun schedule between the start/end dates, which will turn into dag runs.
        dagrun_start_date = timezone.coerce_datetime(start_date)
        if self.bf_end_date is None:
            dagrun_end_date = pendulum.now(timezone.utc)
        else:
            dagrun_end_date = pendulum.instance(self.bf_end_date)
        dagrun_infos = list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date))
        if self.run_backwards:
            tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past]
            if tasks_that_depend_on_past:
                raise AirflowException(
                    'You cannot backfill backwards because one or more tasks depend_on_past: {}'.format(
                        ",".join(tasks_that_depend_on_past)
                    )
                )
            dagrun_infos = dagrun_infos[::-1]

        if not dagrun_infos:
            if not self.run_at_least_once:
                self.log.info("No run dates were found for the given dates and dag interval.")
                return
            dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)]

        # picklin'
        pickle_id = None

        if not self.donot_pickle and self.executor_class not in (
            executor_constants.LOCAL_EXECUTOR,
            executor_constants.SEQUENTIAL_EXECUTOR,
            executor_constants.DASK_EXECUTOR,
        ):
            pickle = DagPickle(self.dag)
            session.add(pickle)
            session.commit()
            pickle_id = pickle.id

        executor = self.executor
        executor.job_id = "backfill"
        executor.start()

        ti_status.total_runs = len(dagrun_infos)  # total dag runs in backfill

        try:
            remaining_dates = ti_status.total_runs
            while remaining_dates > 0:
                dagrun_infos_to_process = [
                    dagrun_info
                    for dagrun_info in dagrun_infos
                    if dagrun_info.logical_date not in ti_status.executed_dag_run_dates
                ]
                self._execute_dagruns(
                    dagrun_infos=dagrun_infos_to_process,
                    ti_status=ti_status,
                    executor=executor,
                    pickle_id=pickle_id,
                    start_date=start_date,
                    session=session,
                )

                remaining_dates = ti_status.total_runs - len(ti_status.executed_dag_run_dates)
                err = self._collect_errors(ti_status=ti_status, session=session)
                if err:
                    raise BackfillUnfinished(err, ti_status)

                if remaining_dates > 0:
                    self.log.info(
                        "max_active_runs limit for dag %s has been reached "
                        " - waiting for other dag runs to finish",
                        self.dag_id,
                    )
                    time.sleep(self.delay_on_limit_secs)
        except (KeyboardInterrupt, SystemExit):
            self.log.warning("Backfill terminated by user.")

            # TODO: we will need to terminate running task instances and set the
            # state to failed.
            self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
        finally:
            session.commit()
            executor.end()

        self.log.info("Backfill done. Exiting.")

    @provide_session
    def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
        """
        This function checks if there are any tasks in the dagrun (or all) that
        have a schedule or queued states but are not known by the executor. If
        it finds those it will reset the state to None so they will get picked
        up again.  The batch option is for performance reasons as the queries
        are made in sequence.

        :param filter_by_dag_run: the dag_run we want to process, None if all
        :type filter_by_dag_run: airflow.models.DagRun
        :return: the number of TIs reset
        :rtype: int
        """
        queued_tis = self.executor.queued_tasks
        # also consider running as the state might not have changed in the db yet
        running_tis = self.executor.running

        # Can't use an update here since it doesn't support joins.
        resettable_states = [State.SCHEDULED, State.QUEUED]
        if filter_by_dag_run is None:
            resettable_tis = (
                session.query(TaskInstance)
                .join(TaskInstance.dag_run)
                .filter(
                    DagRun.state == State.RUNNING,
                    DagRun.run_type != DagRunType.BACKFILL_JOB,
                    TaskInstance.state.in_(resettable_states),
                )
            ).all()
        else:
            resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states, session=session)

        tis_to_reset = [ti for ti in resettable_tis if ti.key not in queued_tis and ti.key not in running_tis]
        if not tis_to_reset:
            return 0

        def query(result, items):
            if not items:
                return result

            filter_for_tis = TaskInstance.filter_for_tis(items)
            reset_tis = (
                session.query(TaskInstance)
                .filter(filter_for_tis, TaskInstance.state.in_(resettable_states))
                .with_for_update()
                .all()
            )

            for ti in reset_tis:
                ti.state = State.NONE
                session.merge(ti)

            return result + reset_tis

        reset_tis = helpers.reduce_in_chunks(query, tis_to_reset, [], self.max_tis_per_query)

        task_instance_str = '\n\t'.join(repr(x) for x in reset_tis)
        session.flush()

        self.log.info("Reset the following %s TaskInstances:\n\t%s", len(reset_tis), task_instance_str)
        return len(reset_tis)