Repository URL to install this package:
|
Version:
2.8.0 ▾
|
prefect
/
docker.py
|
|---|
import hashlib
import os
import shutil
import sys
import warnings
from contextlib import contextmanager
from pathlib import Path, PurePosixPath
from tempfile import TemporaryDirectory
from types import TracebackType
from typing import (
TYPE_CHECKING,
Generator,
Iterable,
List,
Optional,
TextIO,
Tuple,
Type,
Union,
)
from urllib.parse import urlsplit
import pendulum
from typing_extensions import Self
import prefect
from prefect.utilities.importtools import lazy_import
from prefect.utilities.slugify import slugify
def python_version_minor() -> str:
    """Return the running interpreter's version as 'major.minor', e.g. '3.9'."""
    major, minor = sys.version_info[:2]
    return f"{major}.{minor}"
def python_version_micro() -> str:
    """Return the running interpreter's version as 'major.minor.micro', e.g. '3.9.7'."""
    major, minor, micro = sys.version_info[:3]
    return f"{major}.{minor}.{micro}"
def get_prefect_image_name(
    prefect_version: str = None, python_version: str = None, flavor: str = None
) -> str:
    """
    Build the name of the Prefect image for a Prefect / Python version pair.

    Args:
        prefect_version: An optional override for the Prefect version; defaults to
            the installed `prefect.__version__`.
        python_version: An optional override for the Python version; must be at the
            minor level e.g. '3.9'. Defaults to the running interpreter's version.
        flavor: An optional alternative image flavor to build, like 'conda'

    Returns:
        An image reference of the form 'prefecthq/prefect:<tag>'
    """
    requested_version = prefect_version or prefect.__version__
    # Any version carrying a "+<local>" suffix is mapped to the "dev" image
    release = "dev" if "+" in requested_version else requested_version

    interpreter = python_version or python_version_minor()
    flavor_suffix = f"-{flavor}" if flavor else ""

    tag = slugify(
        f"{release}-python{interpreter}" + flavor_suffix,
        lowercase=False,
        max_length=128,
        # Docker allows these characters for tag names
        regex_pattern=r"[^a-zA-Z0-9_.-]+",
    )
    return f"prefecthq/prefect:{tag}"
@contextmanager
def silence_docker_warnings() -> Generator[None, None, None]:
    """Suppress the distutils DeprecationWarnings for the duration of the block.

    The previous warning filters are restored when the block exits.
    """
    # Warnings due to use of deprecated methods within dockerpy
    # See https://github.com/docker/docker-py/pull/2931
    ignored_messages = (
        "distutils Version classes are deprecated.*",
        "The distutils package is deprecated and slated for removal.*",
    )
    with warnings.catch_warnings():
        for pattern in ignored_messages:
            warnings.filterwarnings(
                "ignore", message=pattern, category=DeprecationWarning
            )
        yield
# Importing docker-py emits deprecation warnings. Rather than letting those surface
# in every module and test suite that needs docker, the import is done once here
# (with the warnings silenced) and the `docker` name is exposed from this module.
with silence_docker_warnings():
    if not TYPE_CHECKING:
        # At runtime `docker` is whatever `lazy_import` returns
        # (presumably a proxy that defers the real import -- see prefect.utilities.importtools)
        docker = lazy_import("docker")
    else:
        # Static type checkers see the real package
        import docker
        from docker import DockerClient
@contextmanager
def docker_client() -> Generator["DockerClient", None, None]:
    """Yield a Docker client configured from the environment, closing it on exit"""
    # Only client construction is wrapped in the warning filter; the caller's block
    # runs with the normal warning configuration.
    with silence_docker_warnings():
        environment_client = docker.DockerClient.from_env()

    try:
        yield environment_client
    finally:
        environment_client.close()
class BuildError(Exception):
    """Signals that building a Docker image did not succeed"""
# Every image built with Prefect is stamped with these labels (currently only the
# version of Prefect that performed the build)
IMAGE_LABELS = {"io.prefect.version": prefect.__version__}
@silence_docker_warnings()
def build_image(
    context: Path,
    dockerfile: str = "Dockerfile",
    pull: bool = False,
    platform: str = None,
    stream_progress_to: Optional[TextIO] = None,
) -> str:
    """Build a Docker image from a context directory and return its image ID

    Args:
        context: the root directory for the Docker build context
        dockerfile: the path to the Dockerfile, relative to the context
        pull: True to pull the base image during the build
        platform: an optional platform value handed unchanged to the Docker build API
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the build output as it is reported by Docker

    Returns:
        The image ID

    Raises:
        ValueError: if no context is given or the context path does not exist
        BuildError: if Docker reports an error while building
    """
    if not context:
        raise ValueError("context required to build an image")

    if not Path(context).exists():
        raise ValueError(f"Context path {context} does not exist")

    built_image_id = None
    with docker_client() as client:
        build_events = client.api.build(
            path=str(context),
            dockerfile=dockerfile,
            pull=pull,
            decode=True,
            labels=IMAGE_LABELS,
            platform=platform,
        )

        try:
            for build_event in build_events:
                # Progress output: forwarded only when the caller supplied a stream
                if "stream" in build_event:
                    if stream_progress_to:
                        stream_progress_to.write(build_event["stream"])
                        stream_progress_to.flush()
                    continue

                # The "aux" event carries the ID of the image that was built
                if "aux" in build_event:
                    built_image_id = build_event["aux"]["ID"]
                    continue

                # Both "error" and "message" events abort the build ("error" wins
                # when an event carries both)
                for failure_key in ("error", "message"):
                    if failure_key in build_event:
                        raise BuildError(build_event[failure_key])
        except docker.errors.APIError as api_error:
            raise BuildError(api_error.explanation) from api_error

    assert built_image_id, "The Docker daemon did not return an image ID"
    return built_image_id
class ImageBuilder:
    """An interface for preparing Docker build contexts and building images

    Dockerfile lines are accumulated in memory (`dockerfile_lines`) and are only
    written to `<context>/Dockerfile` for the duration of `build()`.

    When no `context` is given, use the builder as a context manager: entering it
    creates a temporary directory to serve as the build context, and exiting it
    removes that directory again.
    """

    # Host directory that relative `copy()` sources are resolved against
    base_directory: Path
    # The build context directory; None until one is provided or the builder is entered
    context: Optional[Path]
    # Optional platform value forwarded to `build_image`
    platform: Optional[str]
    # The Dockerfile being assembled, one instruction per entry
    dockerfile_lines: List[str]

    def __init__(
        self,
        base_image: str,
        base_directory: Path = None,
        platform: str = None,
        context: Path = None,
    ):
        """Create an ImageBuilder

        Args:
            base_image: the base image to use; becomes the Dockerfile's FROM line
            base_directory: the starting point on your host for relative file
                locations, defaulting to `context` when that is given and to the
                current directory otherwise
            platform: an optional platform value forwarded to the build
            context: use this path as the build context (if not provided, will create
                a temporary directory for the context when the builder is entered)

        Raises:
            ValueError: if `context` already contains a Dockerfile
        """
        self.base_directory = base_directory or context or Path().absolute()
        self.temporary_directory = None
        self.context = context
        self.platform = platform
        self.dockerfile_lines = []

        if self.context:
            # build() writes (and later deletes) <context>/Dockerfile, so refuse to
            # work in a context that already has one
            dockerfile_path: Path = self.context / "Dockerfile"
            if dockerfile_path.exists():
                raise ValueError(f"There is already a Dockerfile at {context}")

        self.add_line(f"FROM {base_image}")

    def __enter__(self) -> Self:
        # A caller-provided context is used as-is; nothing to set up
        if self.context and not self.temporary_directory:
            return self

        self.temporary_directory = TemporaryDirectory()
        self.context = Path(self.temporary_directory.__enter__())
        return self

    def __exit__(
        self, exc: Type[BaseException], value: BaseException, traceback: TracebackType
    ) -> None:
        # Only a temporary context created by __enter__ is cleaned up
        if not self.temporary_directory:
            return

        self.temporary_directory.__exit__(exc, value, traceback)
        self.temporary_directory = None
        self.context = None

    def add_line(self, line: str) -> None:
        """Add a line to this image's Dockerfile"""
        self.add_lines([line])

    def add_lines(self, lines: Iterable[str]) -> None:
        """Add lines to this image's Dockerfile"""
        self.dockerfile_lines.extend(lines)

    def copy(self, source: Union[str, Path], destination: Union[str, PurePosixPath]):
        """Copy a file or directory to this image

        Args:
            source: the host path to copy; relative paths are taken relative to
                `base_directory`, absolute paths must live underneath it
            destination: the path inside the image

        Raises:
            Exception: if the builder has no build context
            ValueError: if an absolute `source` is not under `base_directory`
        """
        if not self.context:
            raise Exception("No context available")

        if not isinstance(destination, PurePosixPath):
            destination = PurePosixPath(destination)

        if not isinstance(source, Path):
            source = Path(source)

        if source.is_absolute():
            source = source.resolve().relative_to(self.base_directory)

        if self.temporary_directory:
            # With a temporary context the file has to be staged into it first
            host_path = self.base_directory / source
            staged_path = self.context / source
            os.makedirs(self.context / source.parent, exist_ok=True)

            # `source` is relative at this point, so the file/directory check must be
            # made on the path under base_directory, not relative to the process's
            # working directory
            if host_path.is_dir():
                shutil.copytree(host_path, staged_path)
            else:
                shutil.copy2(host_path, staged_path)

        self.add_line(f"COPY {source} {destination}")

    def write_text(self, text: str, destination: Union[str, PurePosixPath]):
        """Write the given text to a file in this image

        The text is staged in the build context as a hidden file named after its
        SHA-256 digest and copied to `destination` with a COPY instruction.

        Raises:
            Exception: if the builder has no build context
        """
        if not self.context:
            raise Exception("No context available")

        if not isinstance(destination, PurePosixPath):
            destination = PurePosixPath(destination)

        # Hash and write the same UTF-8 bytes, independent of the host's locale
        encoded = text.encode("utf-8")
        source_hash = hashlib.sha256(encoded).hexdigest()
        (self.context / f".{source_hash}").write_text(text, encoding="utf-8")
        self.add_line(f"COPY .{source_hash} {destination}")

    def build(
        self, pull: bool = False, stream_progress_to: Optional[TextIO] = None
    ) -> str:
        """Build the Docker image from the current state of the ImageBuilder

        Args:
            pull: True to pull the base image during the build
            stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO)
                that will collect the build output as it is reported by Docker

        Returns:
            The image ID

        Raises:
            Exception: if the builder has no build context
        """
        if not self.context:
            raise Exception("No context available")

        dockerfile_path: Path = self.context / "Dockerfile"

        with dockerfile_path.open("w", encoding="utf-8") as dockerfile:
            dockerfile.writelines(line + "\n" for line in self.dockerfile_lines)

        try:
            return build_image(
                self.context,
                platform=self.platform,
                pull=pull,
                stream_progress_to=stream_progress_to,
            )
        finally:
            # The Dockerfile only exists while the build runs
            os.unlink(dockerfile_path)

    def _numbered_lines(self, start: int, stop: int) -> str:
        """Render dockerfile_lines[start:stop] prefixed with real 1-based line numbers"""
        # Clamp so that a window near the top of the file does not wrap around to
        # the end of the list
        start = max(start, 0)
        return "\n".join(
            f" {number:>3}: {text}"
            for number, text in enumerate(
                self.dockerfile_lines[start:stop], start=start + 1
            )
        )

    def assert_has_line(self, line: str) -> None:
        """Asserts that the given line is in the Dockerfile"""
        all_lines = self._numbered_lines(0, len(self.dockerfile_lines))
        message = (
            f"Expected {line!r} not found in Dockerfile.  Dockerfile:\n{all_lines}"
        )
        assert line in self.dockerfile_lines, message

    def assert_line_absent(self, line: str) -> None:
        """Asserts that the given line is absent from the Dockerfile"""
        if line not in self.dockerfile_lines:
            return

        i = self.dockerfile_lines.index(line)

        # Show up to two lines before and one line after the offending line
        surrounding_lines = self._numbered_lines(i - 2, i + 2)
        message = (
            f"Unexpected {line!r} found in Dockerfile at line {i+1}.  "
            f"Surrounding lines:\n{surrounding_lines}"
        )

        assert line not in self.dockerfile_lines, message

    def assert_line_before(self, first: str, second: str) -> None:
        """Asserts that the first line appears before the second line"""
        self.assert_has_line(first)
        self.assert_has_line(second)

        first_index = self.dockerfile_lines.index(first)
        second_index = self.dockerfile_lines.index(second)

        # On failure `second` precedes `first`, so this window spans both of them
        surrounding_lines = self._numbered_lines(second_index - 2, first_index + 2)
        message = (
            f"Expected {first!r} to appear before {second!r} in the Dockerfile, but "
            f"{first!r} was at line {first_index+1} and {second!r} as at line "
            f"{second_index+1}.  Surrounding lines:\n{surrounding_lines}"
        )

        assert first_index < second_index, message

    def assert_line_after(self, second: str, first: str) -> None:
        """Asserts that the second line appears after the first line"""
        self.assert_line_before(first, second)

    def assert_has_file(self, source: Path, container_path: PurePosixPath) -> None:
        """Asserts that the given file or directory will be copied into the container
        at the given path"""
        if source.is_absolute():
            source = source.relative_to(self.base_directory)

        self.assert_has_line(f"COPY {source} {container_path}")
class PushError(Exception):
    """Signals that pushing a Docker image did not succeed"""
@silence_docker_warnings()
def push_image(
    image_id: str,
    registry_url: str,
    name: str,
    tag: Optional[str] = None,
    stream_progress_to: Optional[TextIO] = None,
) -> str:
    """Pushes a local image to a Docker registry, returning the registry-qualified tag
    for that image

    This assumes that the environment's Docker daemon is already authenticated to the
    given registry, and currently makes no attempt to authenticate.

    The image is tagged locally as `<registry>/<name>:<tag>` for the push, and that
    local tag is removed again once the push finishes or fails.

    Args:
        image_id (str): a Docker image ID
        registry_url (str): the URL of a Docker registry; a bare host such as
            `my-registry.example.com:5000` (no scheme) is accepted as well
        name (str): the name of this image
        tag (str): the tag to give this image (defaults to a slugified UTC timestamp
            taken at the time of the call)
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the push output as it is reported by Docker

    Returns:
        A registry-qualified tag, like my-registry.example.com/my-image:abcdefg

    Raises:
        PushError: if Docker reports an error while pushing
    """
    if not tag:
        tag = slugify(pendulum.now("utc").isoformat())

    # urlsplit only fills in the network location when the "//" authority marker is
    # present; without it a bare "host:port" would yield an empty registry and an
    # invalid "/name" repository
    if "://" not in registry_url and not registry_url.startswith("//"):
        registry_url = f"//{registry_url}"
    registry = urlsplit(registry_url).netloc

    repository = f"{registry}/{name}"

    with docker_client() as client:
        image = client.images.get(image_id)
        image.tag(repository, tag=tag)
        events = client.api.push(repository, tag=tag, stream=True, decode=True)
        try:
            for event in events:
                if "status" in event:
                    if not stream_progress_to:
                        continue
                    stream_progress_to.write(event["status"])
                    if "progress" in event:
                        stream_progress_to.write(" " + event["progress"])
                    stream_progress_to.write("\n")
                    stream_progress_to.flush()
                elif "error" in event:
                    raise PushError(event["error"])
        finally:
            # Drop the registry-qualified tag that was added above, whether or not
            # the push succeeded
            client.api.remove_image(f"{repository}:{tag}", noprune=True)

    return f"{repository}:{tag}"
def to_run_command(command: List[str]) -> str:
    """
    Render a process-style argument list as one Dockerfile RUN instruction.

    The first element is emitted verbatim; each remaining argument is emitted as its
    `repr()`. An empty command produces an empty string.
    """
    if not command:
        return ""

    executable, *arguments = command

    # TODO: Consider performing text-wrapping (textwrap.wrap with a 4-space
    #       subsequent indent, without breaking on hyphens or long words) to improve
    #       readability of the generated Dockerfile
    return " ".join([f"RUN {executable}", *map(repr, arguments)])
def parse_image_tag(name: str) -> Tuple[str, Optional[str]]:
    """
    Split a Docker image string into its name and its tag.

    - Example 1: 'prefecthq/prefect:latest' -> ('prefecthq/prefect', 'latest')
    - Example 2: 'hostname.io:5050/folder/subfolder:latest' -> ('hostname.io:5050/folder/subfolder', 'latest')
    - Example 3: 'simplename' -> ('simplename', None)
    - Intended for image strings that follow Docker Image Specification v1.1.0,
      which image building tools typically enforce

    Args:
        name (str): Name of Docker Image

    Return:
        tuple: the image name without its tag, and the tag (None when the name has
            no tag)
    """
    tag = None

    # Everything up to the first "/" is the index (hostname.io:5050 or prefecthq);
    # a colon there is a port, never a tag, so it is set aside before looking for
    # the tag. Names without any "/" (simplename:latest) have no index part.
    index_name, slash, image_path = name.partition("/")
    if slash:
        prefix = f"{index_name}/"
    else:
        prefix, image_path = "", name

    # Separate the path from the tag, if there is one
    if ":" in image_path:
        image_path, tag = image_path.split(":")

    return f"{prefix}{image_path}", tag