Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
prefect / cli / dev.py
Size: Mime:
"""
Command line interface for working with Orion
"""
import json
import os
import platform
import shutil
import subprocess
import sys
import textwrap
import time
from functools import partial
from string import Template
from typing import List

import anyio
import typer

import prefect
from prefect.cli._types import PrefectTyper, SettingsOption
from prefect.cli._utilities import exit_with_error, exit_with_success
from prefect.cli.agent import start as start_agent
from prefect.cli.root import app
from prefect.docker import get_prefect_image_name, python_version_minor
from prefect.orion.api.server import create_app
from prefect.settings import (
    PREFECT_API_URL,
    PREFECT_CLI_COLORS,
    PREFECT_CLI_WRAP_LINES,
    PREFECT_ORION_API_HOST,
    PREFECT_ORION_API_PORT,
)
from prefect.utilities.filesystem import tmpchdir
from prefect.utilities.processutils import run_process

DEV_HELP = """
Commands for development.

Note that many of these commands require extra dependencies (such as npm and MkDocs)
to function properly.
"""
dev_app = PrefectTyper(
    name="dev", short_help="Commands for development.", help=DEV_HELP
)
app.add_typer(dev_app)


def exit_with_error_if_not_editable_install():
    if (
        prefect.__module_path__.parent == "site-packages"
        or not (prefect.__root_path__ / "setup.py").exists()
    ):
        exit_with_error(
            "Development commands require an editable Prefect installation. "
            "Development commands require content outside of the 'prefect' module  "
            "which is not available when installed into your site-packages. "
            f"Detected module path: {prefect.__module_path__}."
        )


def agent_process_entrypoint(**kwargs):
    """
    An entrypoint for starting an agent in a subprocess. Adds a Rich console
    to the Typer app, processes Typer default parameters, then starts an agent.
    All kwargs are forwarded to  `prefect.cli.agent.start`.
    """
    import inspect

    import rich

    # import locally so only the `dev` command breaks if Typer internals change
    from typer.models import ParameterInfo

    # Typer does not process default parameters when calling a function
    # directly, so we must set `start_agent`'s default parameters manually.
    # get the signature of the `start_agent` function
    start_agent_signature = inspect.signature(start_agent)

    # for any arguments not present in kwargs, use the default value.
    for name, param in start_agent_signature.parameters.items():
        if name not in kwargs:
            # All `param.default` values for start_agent are Typer params that store the
            # actual default value in their `default` attribute and we must call
            # `param.default.default` to get the actual default value. We should also
            # ensure we extract the right default if non-Typer defaults are added
            # to `start_agent` in the future.
            if isinstance(param.default, ParameterInfo):
                default = param.default.default
            else:
                default = param.default

            # Some defaults are Prefect `SettingsOption.value` methods
            # that must be called to get the actual value.
            kwargs[name] = default() if callable(default) else default

    # add a console, because calling the agent start function directly
    # instead of via CLI call means `app` has no `console` attached.
    app.console = (
        rich.console.Console(
            highlight=False,
            color_system="auto" if PREFECT_CLI_COLORS else None,
            soft_wrap=not PREFECT_CLI_WRAP_LINES.value(),
        )
        if not getattr(app, "console", None)
        else app.console
    )

    try:
        start_agent(**kwargs)  # type: ignore
    except KeyboardInterrupt:
        # expected when watchfiles kills the process
        pass


@dev_app.command()
def build_docs(
    schema_path: str = None,
):
    """
    Builds REST API reference documentation for static display.
    """
    exit_with_error_if_not_editable_install()
    schema = create_app(ephemeral=True).openapi()

    if not schema_path:
        schema_path = (
            prefect.__root_path__ / "docs" / "api-ref" / "schema.json"
        ).absolute()
    # overwrite info for display purposes
    schema["info"] = {}
    with open(schema_path, "w") as f:
        json.dump(schema, f)
    app.console.print(f"OpenAPI schema written to {schema_path}")


BUILD_UI_HELP = f"""
Installs dependencies and builds UI locally.

The built UI will be located at {prefect.__root_path__ / "orion-ui"}

Requires npm.
"""


@dev_app.command(help=BUILD_UI_HELP)
def build_ui():
    exit_with_error_if_not_editable_install()
    with tmpchdir(prefect.__root_path__):
        with tmpchdir(prefect.__root_path__ / "orion-ui"):

            app.console.print("Installing npm packages...")
            try:
                subprocess.check_output(
                    ["npm", "ci", "install"], shell=sys.platform == "win32"
                )
            except Exception:
                app.console.print(
                    "npm call failed - try running `nvm use` first.", style="red"
                )
                raise

            app.console.print("Building for distribution...")
            env = os.environ.copy()
            env["ORION_UI_SERVE_BASE"] = "/"
            subprocess.check_output(
                ["npm", "run", "build"], env=env, shell=sys.platform == "win32"
            )

        if os.path.exists(prefect.__ui_static_path__):
            app.console.print("Removing existing build files...")
            shutil.rmtree(prefect.__ui_static_path__)

        app.console.print("Copying build into src...")
        shutil.copytree("orion-ui/dist", prefect.__ui_static_path__)

    app.console.print("Complete!")


@dev_app.command()
async def ui():
    """
    Starts a hot-reloading development UI.
    """
    exit_with_error_if_not_editable_install()
    with tmpchdir(prefect.__root_path__):
        with tmpchdir(prefect.__root_path__ / "orion-ui"):
            app.console.print("Installing npm packages...")
            subprocess.check_output(["npm", "install"], shell=sys.platform == "win32")

            app.console.print("Starting UI development server...")
            await run_process(command=["npm", "run", "serve"], stream_output=True)


@dev_app.command()
async def api(
    host: str = SettingsOption(PREFECT_ORION_API_HOST),
    port: int = SettingsOption(PREFECT_ORION_API_PORT),
    log_level: str = "DEBUG",
    services: bool = True,
):
    """
    Starts a hot-reloading development API.
    """
    import watchfiles

    server_env = os.environ.copy()
    server_env["PREFECT_ORION_SERVICES_RUN_IN_APP"] = str(services)
    server_env["PREFECT_ORION_SERVICES_UI"] = "False"
    server_env["PREFECT_ORION_UI_API_URL"] = f"http://{host}:{port}/api"

    command = [
        "uvicorn",
        "--factory",
        "prefect.orion.api.server:create_app",
        "--host",
        str(host),
        "--port",
        str(port),
        "--log-level",
        log_level.lower(),
    ]

    app.console.print(f"Running: {' '.join(command)}")
    import signal

    stop_event = anyio.Event()
    start_command = partial(
        run_process, command=command, env=server_env, stream_output=True
    )

    async with anyio.create_task_group() as tg:
        try:
            server_pid = await tg.start(start_command)
            async for _ in watchfiles.awatch(
                prefect.__module_path__, stop_event=stop_event  # type: ignore
            ):
                # when any watched files change, restart the server
                app.console.print("Restarting Prefect Server...")
                os.kill(server_pid, signal.SIGTERM)  # type: ignore
                # start a new server
                server_pid = await tg.start(start_command)
        except RuntimeError as err:
            # a bug in watchfiles causes an 'Already borrowed' error from Rust when
            # exiting: https://github.com/samuelcolvin/watchfiles/issues/200
            if str(err).strip() != "Already borrowed":
                raise
        except KeyboardInterrupt:
            # exit cleanly on ctrl-c by killing the server process if it's
            # still running
            try:
                os.kill(server_pid, signal.SIGTERM)  # type: ignore
            except ProcessLookupError:
                # process already exited
                pass

            stop_event.set()


@dev_app.command()
async def agent(
    api_url: str = SettingsOption(PREFECT_API_URL),
    work_queues: List[str] = typer.Option(
        ["default"],
        "-q",
        "--work-queue",
        help="One or more work queue names for the agent to pull from.",
    ),
):
    """
    Starts a hot-reloading development agent process.
    """
    # Delayed import since this is only a 'dev' dependency
    import watchfiles

    app.console.print("Creating hot-reloading agent process...")

    try:
        await watchfiles.arun_process(
            prefect.__module_path__,
            target=agent_process_entrypoint,
            kwargs=dict(api=api_url, work_queues=work_queues),
        )
    except RuntimeError as err:
        # a bug in watchfiles causes an 'Already borrowed' error from Rust when
        # exiting: https://github.com/samuelcolvin/watchfiles/issues/200
        if str(err).strip() != "Already borrowed":
            raise


@dev_app.command()
async def start(
    exclude_api: bool = typer.Option(False, "--no-api"),
    exclude_ui: bool = typer.Option(False, "--no-ui"),
    exclude_agent: bool = typer.Option(False, "--no-agent"),
    work_queues: List[str] = typer.Option(
        ["default"],
        "-q",
        "--work-queue",
        help="One or more work queue names for the dev agent to pull from.",
    ),
):
    """
    Starts a hot-reloading development server with API, UI, and agent processes.

    Each service has an individual command if you wish to start them separately.
    Each service can be excluded here as well.
    """
    async with anyio.create_task_group() as tg:
        if not exclude_api:
            tg.start_soon(
                partial(
                    api,
                    host=PREFECT_ORION_API_HOST.value(),
                    port=PREFECT_ORION_API_PORT.value(),
                )
            )
        if not exclude_ui:
            tg.start_soon(ui)
        if not exclude_agent:
            # Hook the agent to the hosted API if running
            if not exclude_api:
                host = f"http://{PREFECT_ORION_API_HOST.value()}:{PREFECT_ORION_API_PORT.value()}/api"  # noqa
            else:
                host = PREFECT_API_URL.value()
            tg.start_soon(agent, host, work_queues)


@dev_app.command()
def build_image(
    arch: str = typer.Option(
        None,
        help=(
            "The architecture to build the container for. "
            "Defaults to the architecture of the host Python. "
            f"[default: {platform.machine()}]"
        ),
    ),
    python_version: str = typer.Option(
        None,
        help=(
            "The Python version to build the container for. "
            "Defaults to the version of the host Python. "
            f"[default: {python_version_minor()}]"
        ),
    ),
    flavor: str = typer.Option(
        None,
        help=(
            "An alternative flavor to build, for example 'conda'. "
            "Defaults to the standard Python base image"
        ),
    ),
    dry_run: bool = False,
):
    """
    Build a docker image for development.
    """
    exit_with_error_if_not_editable_install()
    # TODO: Once https://github.com/tiangolo/typer/issues/354 is addresesd, the
    #       default can be set in the function signature
    arch = arch or platform.machine()
    python_version = python_version or python_version_minor()

    tag = get_prefect_image_name(python_version=python_version, flavor=flavor)

    # Here we use a subprocess instead of the docker-py client to easily stream output
    # as it comes
    command = [
        "docker",
        "build",
        str(prefect.__root_path__),
        "--tag",
        tag,
        "--platform",
        f"linux/{arch}",
        "--build-arg",
        "PREFECT_EXTRAS=[dev]",
        "--build-arg",
        f"PYTHON_VERSION={python_version}",
    ]

    if flavor:
        command += ["--build-arg", f"BASE_IMAGE=prefect-{flavor}"]

    if dry_run:
        print(" ".join(command))
        return

    try:
        subprocess.check_call(command, shell=sys.platform == "win32")
    except subprocess.CalledProcessError:
        exit_with_error("Failed to build image!")
    else:
        exit_with_success(f"Built image {tag!r} for linux/{arch}")


@dev_app.command()
def container(bg: bool = False, name="prefect-dev", api: bool = True):
    """
    Run a docker container with local code mounted and installed.
    """
    exit_with_error_if_not_editable_install()
    import docker
    from docker.models.containers import Container

    client = docker.from_env()

    containers = client.containers.list()
    container_names = {container.name for container in containers}
    if name in container_names:
        exit_with_error(
            f"Container {name!r} already exists. Specify a different name or stop "
            "the existing container."
        )

    blocking_cmd = "prefect dev api" if api else "sleep infinity"
    tag = get_prefect_image_name()

    container: Container = client.containers.create(
        image=tag,
        command=[
            "/bin/bash",
            "-c",
            f"pip install -e /opt/prefect/repo\\[dev\\] && touch /READY && {blocking_cmd}",  # noqa
        ],
        name=name,
        auto_remove=True,
        working_dir="/opt/prefect/repo",
        volumes=[f"{prefect.__root_path__}:/opt/prefect/repo"],
        shm_size="4G",
    )

    print(f"Starting container for image {tag!r}...")
    container.start()

    print("Waiting for installation to complete", end="", flush=True)
    try:
        ready = False
        while not ready:
            print(".", end="", flush=True)
            result = container.exec_run("test -f /READY")
            ready = result.exit_code == 0
            if not ready:
                time.sleep(3)
    except BaseException:
        print("\nInterrupted. Stopping container...")
        container.stop()
        raise

    print(
        textwrap.dedent(
            f"""
            Container {container.name!r} is ready! To connect to the container, run:

                docker exec -it {container.name} /bin/bash
            """
        )
    )

    if bg:
        print(
            textwrap.dedent(
                f"""
                The container will run forever. Stop the container with:

                    docker stop {container.name}
                """
            )
        )
        # Exit without stopping
        return

    try:
        print("Send a keyboard interrupt to exit...")
        container.wait()
    except KeyboardInterrupt:
        pass  # Avoid showing "Abort"
    finally:
        print("\nStopping container...")
        container.stop()


@dev_app.command()
def kubernetes_manifest():
    """
    Generates a Kubernetes manifest for development.

    Example:
        $ prefect dev kubernetes-manifest | kubectl apply -f -
    """
    exit_with_error_if_not_editable_install()

    template = Template(
        (
            prefect.__module_path__ / "cli" / "templates" / "kubernetes-dev.yaml"
        ).read_text()
    )
    manifest = template.substitute(
        {
            "prefect_root_directory": prefect.__root_path__,
            "image_name": get_prefect_image_name(),
        }
    )
    print(manifest)