Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
prefect / orion / models / task_run_states.py
Size: Mime:
"""
Functions for interacting with task run state ORM objects.
Intended for internal use by the Orion API.
"""

from uuid import UUID

import sqlalchemy as sa
from sqlalchemy import delete, select

from prefect.orion.database.dependencies import inject_db
from prefect.orion.database.interface import OrionDBInterface


@inject_db
async def read_task_run_state(
    session: sa.orm.Session, task_run_state_id: UUID, db: OrionDBInterface
):
    """
    Reads a task run state by id.

    Args:
        session: A database session
        task_run_state_id: a task run state id

    Returns:
        db.TaskRunState: the task state
    """

    return await session.get(db.TaskRunState, task_run_state_id)


@inject_db
async def read_task_run_states(
    session: sa.orm.Session, task_run_id: UUID, db: OrionDBInterface
):
    """
    Reads task runs states for a task run.

    Args:
        session: A database session
        task_run_id: the task run id

    Returns:
        List[db.TaskRunState]: the task run states
    """

    query = (
        select(db.TaskRunState)
        .filter_by(task_run_id=task_run_id)
        .order_by(db.TaskRunState.timestamp)
    )
    result = await session.execute(query)
    return result.scalars().unique().all()


@inject_db
async def delete_task_run_state(
    session: sa.orm.Session, task_run_state_id: UUID, db: OrionDBInterface
) -> bool:
    """
    Delete a task run state by id.

    Args:
        session: A database session
        task_run_state_id: a task run state id

    Returns:
        bool: whether or not the task run state was deleted
    """

    result = await session.execute(
        delete(db.TaskRunState).where(db.TaskRunState.id == task_run_state_id)
    )
    return result.rowcount > 0