Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
prefect / cli / agent.py
Size: Mime:
"""
Command line interface for working with agent services
"""
from typing import List
from uuid import UUID

import typer

import prefect
from prefect.agent import OrionAgent
from prefect.cli._types import PrefectTyper, SettingsOption
from prefect.cli._utilities import exit_with_error
from prefect.cli.root import app
from prefect.client import get_client
from prefect.exceptions import ObjectNotFound
from prefect.settings import PREFECT_AGENT_QUERY_INTERVAL, PREFECT_API_URL
from prefect.utilities.services import critical_service_loop

agent_app = PrefectTyper(
    name="agent", help="Commands for starting and interacting with agent processes."
)
app.add_typer(agent_app)


ascii_name = r"""
  ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
 | _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
 |  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
 |_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|

"""


@agent_app.command()
async def start(
    # deprecated main argument
    work_queue: str = typer.Argument(
        None,
        show_default=False,
        help="DEPRECATED: A work queue name or ID",
    ),
    work_queues: List[str] = typer.Option(
        None,
        "-q",
        "--work-queue",
        help="One or more work queue names for the agent to pull from.",
    ),
    hide_welcome: bool = typer.Option(False, "--hide-welcome"),
    api: str = SettingsOption(PREFECT_API_URL),
    # deprecated tags
    tags: List[str] = typer.Option(
        None,
        "-t",
        "--tag",
        help="DEPRECATED: One or more optional tags that will be used to create a work queue",
    ),
):
    """
    Start an agent process to poll one or more work queues for flow runs.
    """
    work_queues = work_queues or []

    if work_queue is not None:
        # try to treat the work_queue as a UUID
        try:
            async with get_client() as client:
                q = await client.read_work_queue(UUID(work_queue))
                work_queue = q.name
        # otherwise treat it as a string name
        except (TypeError, ValueError):
            pass
        work_queues.append(work_queue)
        app.console.print(
            "Agents now support multiple work queues. Instead of passing a single argument, provide work queue names "
            f"with the `-q` or `--work-queue` flag: `prefect agent start -q {work_queue}`\n",
            style="blue",
        )

    if not work_queues and not tags:
        exit_with_error("No work queues provided!", style="red")
    elif work_queues and tags:
        exit_with_error(
            "Either `work_queues` or `tags` can be provided, but not both.", style="red"
        )

    if tags:
        work_queue_name = f"Agent queue {'-'.join(sorted(tags))}"
        app.console.print(
            "`tags` are deprecated. For backwards-compatibility with old "
            f"versions of Prefect, this agent will create a work queue named `{work_queue_name}` "
            "that uses legacy tag-based matching.",
            style="red",
        )

        async with get_client() as client:
            try:
                work_queue = await client.read_work_queue_by_name(work_queue_name)
                if work_queue.filter is None:
                    # ensure the work queue has legacy (deprecated) tag-based behavior
                    await client.update_work_queue(filter=dict(tags=tags))
            except ObjectNotFound:
                # if the work queue doesn't already exist, we create it with tags
                # to enable legacy (deprecated) tag-matching behavior
                await client.create_work_queue(name=work_queue_name, tags=tags)

        work_queues = [work_queue_name]

    if not hide_welcome:
        if api:
            app.console.print(
                f"Starting v{prefect.__version__} agent connected to {api}..."
            )
        else:
            app.console.print(
                f"Starting v{prefect.__version__} agent with ephemeral API..."
            )

    async with OrionAgent(work_queues=work_queues) as agent:
        if not hide_welcome:
            app.console.print(ascii_name)
            app.console.print(
                "Agent started! Looking for work from "
                f"queue(s): {', '.join(work_queues)}..."
            )

        await critical_service_loop(
            agent.get_and_submit_flow_runs,
            PREFECT_AGENT_QUERY_INTERVAL.value(),
            printer=app.console.print,
        )

    app.console.print("Agent stopped!")