Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
prefect / blocks / core.py
Size: Mime:
import hashlib
import inspect
import sys
import warnings
from abc import ABC
from textwrap import dedent
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    FrozenSet,
    List,
    Optional,
    Type,
    TypeVar,
    Union,
)
from uuid import UUID, uuid4

from griffe.dataclasses import Docstring
from griffe.docstrings.dataclasses import DocstringSection, DocstringSectionKind
from griffe.docstrings.parsers import Parser, parse
from packaging.version import InvalidVersion, Version
from pydantic import BaseModel, HttpUrl, SecretBytes, SecretStr, ValidationError
from typing_extensions import ParamSpec, Self, get_args, get_origin

import prefect
import prefect.exceptions
from prefect.blocks.fields import SecretDict
from prefect.client.utilities import inject_client
from prefect.logging.loggers import disable_logger
from prefect.orion.schemas.core import (
    DEFAULT_BLOCK_SCHEMA_VERSION,
    BlockDocument,
    BlockSchema,
    BlockType,
)
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.collections import listrepr, remove_nested_keys
from prefect.utilities.dispatch import lookup_type, register_base_type
from prefect.utilities.hashing import hash_objects
from prefect.utilities.importtools import to_qualified_name
from prefect.utilities.slugify import slugify

if TYPE_CHECKING:
    from prefect.client.orion import OrionClient

R = TypeVar("R")
P = ParamSpec("P")


def block_schema_to_key(schema: BlockSchema) -> str:
    """
    Defines the unique key used to lookup the Block class for a given schema.
    """
    return f"{schema.block_type.slug}"


class InvalidBlockRegistration(Exception):
    """
    Raised on attempted registration of the base Block
    class or a Block interface class
    """


def _collect_nested_reference_strings(obj: Dict):
    """
    Collects all nested reference strings (e.g. #/definitions/Model) from a given object.
    """
    found_reference_strings = []
    if isinstance(obj, dict):
        if obj.get("$ref"):
            found_reference_strings.append(obj.get("$ref"))
        for value in obj.values():
            found_reference_strings.extend(_collect_nested_reference_strings(value))
    if isinstance(obj, list):
        for item in obj:
            found_reference_strings.extend(_collect_nested_reference_strings(item))
    return found_reference_strings


def _get_non_block_reference_definitions(object_definition: Dict, definitions: Dict):
    """
    Given a definition of an object in a block schema OpenAPI spec and the dictionary
    of all reference definitions in that same block schema OpenAPI spec, return the
    definitions for objects that are referenced from the object or any children of
    the object that do not reference a block.
    """
    non_block_definitions = {}
    reference_strings = _collect_nested_reference_strings(object_definition)
    for reference_string in reference_strings:
        definition_key = reference_string.replace("#/definitions/", "")
        definition = definitions.get(definition_key)
        if definition and definition.get("block_type_slug") is None:
            non_block_definitions = {
                **non_block_definitions,
                definition_key: definition,
                **_get_non_block_reference_definitions(definition, definitions),
            }
    return non_block_definitions


def _is_subclass(cls, parent_cls) -> bool:
    """
    Checks if a given class is a subclass of another class. Unlike issubclass,
    this will not throw an exception if cls is an instance instead of a type.
    """
    return inspect.isclass(cls) and issubclass(cls, parent_cls)


def _collect_secret_fields(name: str, type_: Type, secrets: List[str]) -> None:
    """
    Recursively collects all secret fields from a given type and adds them to the
    secrets list, supporting nested Union / BaseModel fields. Also, note, this function
    mutates the input secrets list, thus does not return anything.
    """
    if get_origin(type_) is Union:
        for union_type in get_args(type_):
            _collect_secret_fields(name, union_type, secrets)
        return
    elif _is_subclass(type_, BaseModel):
        for field in type_.__fields__.values():
            _collect_secret_fields(f"{name}.{field.name}", field.type_, secrets)
        return

    if type_ in (SecretStr, SecretBytes):
        secrets.append(name)
    elif type_ == SecretDict:
        # Append .* to field name to signify that all values under this
        # field are secret and should be obfuscated.
        secrets.append(f"{name}.*")
    elif Block.is_block_class(type_):
        secrets.extend(f"{name}.{s}" for s in type_.schema()["secret_fields"])


@register_base_type
class Block(BaseModel, ABC):
    """
    A base class for implementing a block that wraps an external service.

    This class can be defined with an arbitrary set of fields and methods, and
    couples business logic with data contained in an block document.
    `_block_document_name`, `_block_document_id`, `_block_schema_id`, and
    `_block_type_id` are reserved by Orion as Block metadata fields, but
    otherwise a Block can implement arbitrary logic. Blocks can be instantiated
    without populating these metadata fields, but can only be used interactively,
    not with the Orion API.

    Instead of the __init__ method, a block implementation allows the
    definition of a `block_initialization` method that is called after
    initialization.
    """

    class Config:
        extra = "allow"

        json_encoders = {SecretDict: lambda v: v.dict()}

        @staticmethod
        def schema_extra(schema: Dict[str, Any], model: Type["Block"]):
            """
            Customizes Pydantic's schema generation feature to add blocks related information.
            """
            schema["block_type_slug"] = model.get_block_type_slug()
            # Ensures args and code examples aren't included in the schema
            description = model.get_description()
            if description:
                schema["description"] = description
            else:
                # Prevent the description of the base class from being included in the schema
                schema.pop("description", None)

            # create a list of secret field names
            # secret fields include both top-level keys and dot-delimited nested secret keys
            # A wildcard (*) means that all fields under a given key are secret.
            # for example: ["x", "y", "z.*", "child.a"]
            # means the top-level keys "x" and "y", all keys under "z", and the key "a" of a block
            # nested under the "child" key are all secret. There is no limit to nesting.
            secrets = schema["secret_fields"] = []
            for field in model.__fields__.values():
                _collect_secret_fields(field.name, field.type_, secrets)

            # create block schema references
            refs = schema["block_schema_references"] = {}
            for field in model.__fields__.values():
                if Block.is_block_class(field.type_):
                    refs[field.name] = field.type_._to_block_schema_reference_dict()
                if get_origin(field.type_) is Union:
                    for type_ in get_args(field.type_):
                        if Block.is_block_class(type_):
                            if isinstance(refs.get(field.name), list):
                                refs[field.name].append(
                                    type_._to_block_schema_reference_dict()
                                )
                            elif isinstance(refs.get(field.name), dict):
                                refs[field.name] = [
                                    refs[field.name],
                                    type_._to_block_schema_reference_dict(),
                                ]
                            else:
                                refs[
                                    field.name
                                ] = type_._to_block_schema_reference_dict()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.block_initialization()

    def __str__(self) -> str:
        return self.__repr__()

    def __repr_args__(self):
        repr_args = super().__repr_args__()
        data_keys = self.schema()["properties"].keys()
        return [
            (key, value) for key, value in repr_args if key is None or key in data_keys
        ]

    def block_initialization(self) -> None:
        pass

    # -- private class variables
    # set by the class itself

    # Attribute to customize the name of the block type created
    # when the block is registered with Orion. If not set, block
    # type name will default to the class name.
    _block_type_name: Optional[str] = None
    _block_type_slug: Optional[str] = None

    # Attributes used to set properties on a block type when registered
    # with Orion.
    _logo_url: Optional[HttpUrl] = None
    _documentation_url: Optional[HttpUrl] = None
    _description: Optional[str] = None
    _code_example: Optional[str] = None

    # -- private instance variables
    # these are set when blocks are loaded from the API
    _block_type_id: Optional[UUID] = None
    _block_schema_id: Optional[UUID] = None
    _block_schema_capabilities: Optional[List[str]] = None
    _block_schema_version: Optional[str] = None
    _block_document_id: Optional[UUID] = None
    _block_document_name: Optional[str] = None
    _is_anonymous: Optional[bool] = None

    @classmethod
    def __dispatch_key__(cls):
        if cls.__name__ == "Block":
            return None  # The base class is abstract
        return block_schema_to_key(cls._to_block_schema())

    @classmethod
    def get_block_type_name(cls):
        return cls._block_type_name or cls.__name__

    @classmethod
    def get_block_type_slug(cls):
        return slugify(cls._block_type_slug or cls.get_block_type_name())

    @classmethod
    def get_block_capabilities(cls) -> FrozenSet[str]:
        """
        Returns the block capabilities for this Block. Recursively collects all block
        capabilities of all parent classes into a single frozenset.
        """
        return frozenset(
            {
                c
                for base in (cls,) + cls.__mro__
                for c in getattr(base, "_block_schema_capabilities", []) or []
            }
        )

    @classmethod
    def _get_current_package_version(cls):
        current_module = inspect.getmodule(cls)
        if current_module:
            top_level_module = sys.modules[
                current_module.__name__.split(".")[0] or "__main__"
            ]
            try:
                version = Version(top_level_module.__version__)
                # Strips off any local version information
                return version.base_version
            except (AttributeError, InvalidVersion):
                # Module does not have a __version__ attribute or is not a parsable format
                pass
        return DEFAULT_BLOCK_SCHEMA_VERSION

    @classmethod
    def get_block_schema_version(cls) -> str:
        return cls._block_schema_version or cls._get_current_package_version()

    @classmethod
    def _to_block_schema_reference_dict(cls):
        return dict(
            block_type_slug=cls.get_block_type_slug(),
            block_schema_checksum=cls._calculate_schema_checksum(),
        )

    @classmethod
    def _calculate_schema_checksum(
        cls, block_schema_fields: Optional[Dict[str, Any]] = None
    ):
        """
        Generates a unique hash for the underlying schema of block.

        Args:
            block_schema_fields: Dictionary detailing block schema fields to generate a
                checksum for. The fields of the current class is used if this parameter
                is not provided.

        Returns:
            str: The calculated checksum prefixed with the hashing algorithm used.
        """
        block_schema_fields = (
            cls.schema() if block_schema_fields is None else block_schema_fields
        )
        fields_for_checksum = remove_nested_keys(["secret_fields"], block_schema_fields)
        if fields_for_checksum.get("definitions"):
            non_block_definitions = _get_non_block_reference_definitions(
                fields_for_checksum, fields_for_checksum["definitions"]
            )
            if non_block_definitions:
                fields_for_checksum["definitions"] = non_block_definitions
            else:
                # Pop off definitions entirely instead of empty dict for consistency
                # with the OpenAPI specification
                fields_for_checksum.pop("definitions")
        checksum = hash_objects(fields_for_checksum, hash_algo=hashlib.sha256)
        if checksum is None:
            raise ValueError("Unable to compute checksum for block schema")
        else:
            return f"sha256:{checksum}"

    def _to_block_document(
        self,
        name: Optional[str] = None,
        block_schema_id: Optional[UUID] = None,
        block_type_id: Optional[UUID] = None,
        is_anonymous: Optional[bool] = None,
    ) -> BlockDocument:
        """
        Creates the corresponding block document based on the data stored in a block.
        The corresponding block document name, block type ID, and block schema ID must
        either be passed into the method or configured on the block.

        Args:
            name: The name of the created block document. Not required if anonymous.
            block_schema_id: UUID of the corresponding block schema.
            block_type_id: UUID of the corresponding block type.
            is_anonymous: if True, an anonymous block is created. Anonymous
                blocks are not displayed in the UI and used primarily for system
                operations and features that need to automatically generate blocks.

        Returns:
            BlockDocument: Corresponding block document
                populated with the block's configured data.
        """
        if is_anonymous is None:
            is_anonymous = self._is_anonymous or False

        # name must be present if not anonymous
        if not is_anonymous and not name and not self._block_document_name:
            raise ValueError("No name provided, either as an argument or on the block.")

        if not block_schema_id and not self._block_schema_id:
            raise ValueError(
                "No block schema ID provided, either as an argument or on the block."
            )
        if not block_type_id and not self._block_type_id:
            raise ValueError(
                "No block type ID provided, either as an argument or on the block."
            )

        # The keys passed to `include` must NOT be aliases, else some items will be missed
        # i.e. must do `self.schema_` vs `self.schema` to get a `schema_ = Field(alias="schema")`
        # reported from https://github.com/PrefectHQ/prefect-dbt/issues/54
        data_keys = self.schema(by_alias=False)["properties"].keys()

        # `block_document_data`` must return the aliased version for it to show in the UI
        block_document_data = self.dict(by_alias=True, include=data_keys)

        # Iterate through and find blocks that already have saved block documents to
        # create references to those saved block documents.
        for key in data_keys:
            field_value = getattr(self, key)
            if (
                isinstance(field_value, Block)
                and field_value._block_document_id is not None
            ):
                block_document_data[key] = {
                    "$ref": {"block_document_id": field_value._block_document_id}
                }

        return BlockDocument(
            id=self._block_document_id or uuid4(),
            name=(name or self._block_document_name) if not is_anonymous else None,
            block_schema_id=block_schema_id or self._block_schema_id,
            block_type_id=block_type_id or self._block_type_id,
            data=block_document_data,
            block_schema=self._to_block_schema(
                block_type_id=block_type_id or self._block_type_id,
            ),
            block_type=self._to_block_type(),
            is_anonymous=is_anonymous,
        )

    @classmethod
    def _to_block_schema(cls, block_type_id: Optional[UUID] = None) -> BlockSchema:
        """
        Creates the corresponding block schema of the block.
        The corresponding block_type_id must either be passed into
        the method or configured on the block.

        Args:
            block_type_id: UUID of the corresponding block type.

        Returns:
            BlockSchema: The corresponding block schema.
        """
        fields = cls.schema()
        return BlockSchema(
            id=cls._block_schema_id if cls._block_schema_id is not None else uuid4(),
            checksum=cls._calculate_schema_checksum(),
            fields=fields,
            block_type_id=block_type_id or cls._block_type_id,
            block_type=cls._to_block_type(),
            capabilities=list(cls.get_block_capabilities()),
            version=cls.get_block_schema_version(),
        )

    @classmethod
    def _parse_docstring(cls) -> List[DocstringSection]:
        """
        Parses the docstring into list of DocstringSection objects.
        Helper method used primarily to suppress irrelevant logs, e.g.
        `<module>:11: No type or annotation for parameter 'write_json'`
        because griffe is unable to parse the types from pydantic.BaseModel.
        """
        with disable_logger("griffe.docstrings.google"):
            with disable_logger("griffe.agents.nodes"):
                docstring = Docstring(cls.__doc__)
                parsed = parse(docstring, Parser.google)
        return parsed

    @classmethod
    def get_description(cls) -> Optional[str]:
        """
        Returns the description for the current block. Attempts to parse
        description from class docstring if an override is not defined.
        """
        description = cls._description
        # If no description override has been provided, find the first text section
        # and use that as the description
        if description is None and cls.__doc__ is not None:
            parsed = cls._parse_docstring()
            parsed_description = next(
                (
                    section.as_dict().get("value")
                    for section in parsed
                    if section.kind == DocstringSectionKind.text
                ),
                None,
            )
            if isinstance(parsed_description, str):
                description = parsed_description.strip()
        return description

    @classmethod
    def get_code_example(cls) -> Optional[str]:
        """
        Returns the code example for the given block. Attempts to parse
        code example from the class docstring if an override is not provided.
        """
        code_example = (
            dedent(cls._code_example) if cls._code_example is not None else None
        )
        # If no code example override has been provided, attempt to find a examples
        # section or an admonition with the annotation "example" and use that as the
        # code example
        if code_example is None and cls.__doc__ is not None:
            parsed = cls._parse_docstring()
            for section in parsed:
                # Section kind will be "examples" if Examples section heading is used.
                if section.kind == DocstringSectionKind.examples:
                    # Examples sections are made up of smaller sections that need to be
                    # joined with newlines. Smaller sections are represented as tuples
                    # with shape (DocstringSectionKind, str)
                    code_example = "\n".join(
                        (part[1] for part in section.as_dict().get("value", []))
                    )
                    break
                # Section kind will be "admonition" if Example section heading is used.
                if section.kind == DocstringSectionKind.admonition:
                    value = section.as_dict().get("value", {})
                    if value.get("annotation") == "example":
                        code_example = value.get("description")
                        break

        if code_example is None:
            # If no code example has been specified or extracted from the class
            # docstring, generate a sensible default
            code_example = cls._generate_code_example()

        return code_example

    @classmethod
    def _generate_code_example(cls) -> str:
        """Generates a default code example for the current class"""
        qualified_name = to_qualified_name(cls)
        module_str = ".".join(qualified_name.split(".")[:-1])
        class_name = cls.__name__
        block_variable_name = f'{cls.get_block_type_slug().replace("-", "_")}_block'

        return dedent(
            f"""\
        ```python
        from {module_str} import {class_name}

        {block_variable_name} = {class_name}.load("BLOCK_NAME")
        ```"""
        )

    @classmethod
    def _to_block_type(cls) -> BlockType:
        """
        Creates the corresponding block type of the block.

        Returns:
            BlockType: The corresponding block type.
        """
        return BlockType(
            id=cls._block_type_id or uuid4(),
            slug=cls.get_block_type_slug(),
            name=cls.get_block_type_name(),
            logo_url=cls._logo_url,
            documentation_url=cls._documentation_url,
            description=cls.get_description(),
            code_example=cls.get_code_example(),
        )

    @classmethod
    def _from_block_document(cls, block_document: BlockDocument):
        """
        Instantiates a block from a given block document. The corresponding block class
        will be looked up in the block registry based on the corresponding block schema
        of the provided block document.

        Args:
            block_document: The block document used to instantiate a block.

        Raises:
            ValueError: If the provided block document doesn't have a corresponding block
                schema.

        Returns:
            Block: Hydrated block with data from block document.
        """
        if block_document.block_schema is None:
            raise ValueError(
                "Unable to determine block schema for provided block document"
            )

        block_cls = (
            cls
            if cls.__name__ != "Block"
            # Look up the block class by dispatch
            else cls.get_block_class_from_schema(block_document.block_schema)
        )

        block = block_cls.parse_obj(block_document.data)
        block._block_document_id = block_document.id
        block.__class__._block_schema_id = block_document.block_schema_id
        block.__class__._block_type_id = block_document.block_type_id
        block._block_document_name = block_document.name
        block._is_anonymous = block_document.is_anonymous
        block._define_metadata_on_nested_blocks(
            block_document.block_document_references
        )

        return block

    @classmethod
    def get_block_class_from_schema(cls: Type[Self], schema: BlockSchema) -> Type[Self]:
        """
        Retieve the block class implementation given a schema.
        """
        return lookup_type(cls, block_schema_to_key(schema))

    def _define_metadata_on_nested_blocks(
        self, block_document_references: Dict[str, Dict[str, Any]]
    ):
        """
        Recursively populates metadata fields on nested blocks based on the
        provided block document references.
        """
        for item in block_document_references.items():
            field_name, block_document_reference = item
            nested_block = getattr(self, field_name)
            if isinstance(nested_block, Block):
                nested_block_document_info = block_document_reference.get(
                    "block_document", {}
                )
                nested_block._define_metadata_on_nested_blocks(
                    nested_block_document_info.get("block_document_references", {})
                )
                nested_block_document_id = nested_block_document_info.get("id")
                nested_block._block_document_id = (
                    UUID(nested_block_document_id) if nested_block_document_id else None
                )
                nested_block._block_document_name = nested_block_document_info.get(
                    "name"
                )
                nested_block._is_anonymous = nested_block_document_info.get(
                    "is_anonymous"
                )

    @classmethod
    @sync_compatible
    @inject_client
    async def load(
        cls,
        name: str,
        validate: bool = True,
        client: "OrionClient" = None,
    ):
        """
        Retrieves data from the block document with the given name for the block type
        that corresponds with the current class and returns an instantiated version of
        the current class with the data stored in the block document.

        If a block document for a given block type is saved with a different schema
        than the current class calling `load`, a warning will be raised.

        If the current class schema is a subset of the block document schema, the block
        can be loaded as normal using the default `validate = True`.

        If the current class schema is a superset of the block document schema, `load`
        must be called with `validate` set to False to prevent a validation error. In
        this case, the block attributes will default to `None` and must be set manually
        and saved to a new block document before the block can be used as expected.

        Args:
            name: The name or slug of the block document. A block document slug is a
                string with the format <block_type_slug>/<block_document_name>
            validate: If False, the block document will be loaded without Pydantic
                validating the block schema. This is useful if the block schema has
                changed client-side since the block document referred to by `name` was saved.
            client: The client to use to load the block document. If not provided, the
                default client will be injected.

        Raises:
            ValueError: If the requested block document is not found.

        Returns:
            An instance of the current class hydrated with the data stored in the
            block document with the specified name.

        Examples:
            Load from a Block subclass with a block document name:
            ```python
            class Custom(Block):
                message: str

            Custom(message="Hello!").save("my-custom-message")

            loaded_block = Custom.load("my-custom-message")
            ```

            Load from Block with a block document slug:
            class Custom(Block):
                message: str

            Custom(message="Hello!").save("my-custom-message")

            loaded_block = Block.load("custom/my-custom-message")

            Migrate a block document to a new schema:
            ```python
            # original class
            class Custom(Block):
                message: str

            Custom(message="Hello!").save("my-custom-message")

            # Updated class with new required field
            class Custom(Block):
                message: str
                number_of_ducks: int

            loaded_block = Custom.load("my-custom-message", validate=False)

            # Prints UserWarning about schema mismatch

            loaded_block.number_of_ducks = 42

            loaded_block.save("my-custom-message", overwrite=True)
            ```
        """
        if cls.__name__ == "Block":
            block_type_slug, block_document_name = name.split("/", 1)
        else:
            block_type_slug = cls.get_block_type_slug()
            block_document_name = name

        try:
            block_document = await client.read_block_document_by_name(
                name=block_document_name, block_type_slug=block_type_slug
            )
        except prefect.exceptions.ObjectNotFound as e:
            raise ValueError(
                f"Unable to find block document named {block_document_name} for block type {block_type_slug}"
            ) from e

        try:
            return cls._from_block_document(block_document)
        except ValidationError as e:
            if not validate:
                missing_fields = tuple(err["loc"][0] for err in e.errors())
                missing_block_data = {field: None for field in missing_fields}
                warnings.warn(
                    f"Could not fully load {block_document_name!r} of block type {cls._block_type_slug!r} - "
                    "this is likely because one or more required fields were added to the schema "
                    f"for {cls.__name__!r} that did not exist on the class when this block was last saved. "
                    f"Please specify values for new field(s): {listrepr(missing_fields)}, then "
                    f'run `{cls.__name__}.save("{block_document_name}", overwrite=True)`, and '
                    "load this block again before attempting to use it."
                )
                return cls.construct(**block_document.data, **missing_block_data)
            raise RuntimeError(
                f"Unable to load {block_document_name!r} of block type {cls._block_type_slug!r} "
                "due to failed validation. To load without validation, try loading again "
                "with `validate=False`."
            ) from e

    @staticmethod
    def is_block_class(block) -> bool:
        return _is_subclass(block, Block)

    @classmethod
    @sync_compatible
    @inject_client
    async def register_type_and_schema(cls, client: "OrionClient" = None):
        """
        Makes block available for configuration with current Orion server.
        Recursively registers all nested blocks. Registration is idempotent.

        Args:
            client: Optional Orion client to use for registering type and schema with
                Orion. A new client will be created and used if one is not provided.
        """
        if cls.__name__ == "Block":
            raise InvalidBlockRegistration(
                "`register_type_and_schema` should be called on a Block "
                "subclass and not on the Block class directly."
            )
        if ABC in getattr(cls, "__bases__", []):
            raise InvalidBlockRegistration(
                "`register_type_and_schema` should be called on a Block "
                "subclass and not on a Block interface class directly."
            )

        for field in cls.__fields__.values():
            if Block.is_block_class(field.type_):
                await field.type_.register_type_and_schema(client=client)
            if get_origin(field.type_) is Union:
                for type_ in get_args(field.type_):
                    if Block.is_block_class(type_):
                        await type_.register_type_and_schema(client=client)

        try:
            block_type = await client.read_block_type_by_slug(
                slug=cls.get_block_type_slug()
            )
            cls._block_type_id = block_type.id
            await client.update_block_type(
                block_type_id=block_type.id, block_type=cls._to_block_type()
            )
        except prefect.exceptions.ObjectNotFound:
            block_type = await client.create_block_type(block_type=cls._to_block_type())
            cls._block_type_id = block_type.id

        try:
            block_schema = await client.read_block_schema_by_checksum(
                checksum=cls._calculate_schema_checksum(),
                version=cls.get_block_schema_version(),
            )
        except prefect.exceptions.ObjectNotFound:
            block_schema = await client.create_block_schema(
                block_schema=cls._to_block_schema(block_type_id=block_type.id)
            )

        cls._block_schema_id = block_schema.id

    @inject_client
    async def _save(
        self,
        name: Optional[str] = None,
        is_anonymous: bool = False,
        overwrite: bool = False,
        client: "OrionClient" = None,
    ):
        """
        Saves the values of a block as a block document with an option to save as an
        anonymous block document.

        Args:
            name: User specified name to give saved block document which can later be used to load the
                block document.
            is_anonymous: Boolean value specifying whether the block document is anonymous. Anonymous
                blocks are intended for system use and are not shown in the UI. Anonymous blocks do not
                require a user-supplied name.
            overwrite: Boolean value specifying if values should be overwritten if a block document with
                the specified name already exists.

        Raises:
            ValueError: If a name is not given and `is_anonymous` is `False` or a name is given and
                `is_anonymous` is `True`.
        """
        if name is None and not is_anonymous:
            raise ValueError(
                "You're attempting to save a block document without a name. "
                "Please either save a block document with a name or set "
                "is_anonymous to True."
            )

        self._is_anonymous = is_anonymous

        # Ensure block type and schema are registered before saving block document.
        await self.register_type_and_schema(client=client)

        try:
            block_document = await client.create_block_document(
                block_document=self._to_block_document(name=name)
            )
        except prefect.exceptions.ObjectAlreadyExists as err:
            if overwrite:
                block_document_id = self._block_document_id
                if block_document_id is None:
                    existing_block_document = await client.read_block_document_by_name(
                        name=name, block_type_slug=self.get_block_type_slug()
                    )
                    block_document_id = existing_block_document.id
                await client.update_block_document(
                    block_document_id=block_document_id,
                    block_document=self._to_block_document(name=name),
                )
                block_document = await client.read_block_document(
                    block_document_id=block_document_id
                )
            else:
                raise ValueError(
                    "You are attempting to save values with a name that is already in "
                    "use for this block type. If you would like to overwrite the values that are saved, "
                    "then save with `overwrite=True`."
                ) from err

        # Update metadata on block instance for later use.
        self._block_document_name = block_document.name
        self._block_document_id = block_document.id
        return self._block_document_id

    @sync_compatible
    async def save(
        self, name: str, overwrite: bool = False, client: "OrionClient" = None
    ):
        """
        Saves the values of a block as a block document.

        Args:
            name: User specified name to give saved block document which can later be used to load the
                block document.
            overwrite: Boolean value specifying if values should be overwritten if a block document with
                the specified name already exists.

        """
        document_id = await self._save(name=name, overwrite=overwrite, client=client)

        return document_id

    def _iter(self, *, include=None, exclude=None, **kwargs):
        # Injects the `block_type_slug` into serialized payloads for dispatch
        for key_value in super()._iter(include=include, exclude=exclude, **kwargs):
            yield key_value

        # Respect inclusion and exclusion still
        if include and "block_type_slug" not in include:
            return
        if exclude and "block_type_slug" in exclude:
            return

        yield "block_type_slug", self.get_block_type_slug()

    def __new__(cls: Type[Self], **kwargs) -> Self:
        """
        Create an instance of the Block subclass type if a `block_type_slug` is
        present in the data payload.
        """
        block_type_slug = kwargs.pop("block_type_slug", None)
        if block_type_slug:
            subcls = lookup_type(cls, dispatch_key=block_type_slug)
            m = super().__new__(subcls)
            # NOTE: This is a workaround for an obscure issue where copied models were
            #       missing attributes. This pattern is from Pydantic's
            #       `BaseModel._copy_and_set_values`.
            #       The issue this fixes could not be reproduced in unit tests that
            #       directly targeted dispatch handling and was only observed when
            #       copying then saving infrastructure blocks on deployment models.
            object.__setattr__(m, "__dict__", kwargs)
            object.__setattr__(m, "__fields_set__", set(kwargs.keys()))
            return m
        else:
            m = super().__new__(cls)
            object.__setattr__(m, "__dict__", kwargs)
            object.__setattr__(m, "__fields_set__", set(kwargs.keys()))
            return m