Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from datetime import datetime
from functools import wraps
from itertools import chain, combinations
from typing import (
    Any,
    Callable,
    Collection,
    Dict,
    Final,
    List,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)
import hashlib
import logging
import pickle

from cachetools import LRUCache
from sqlalchemy import (
    Column,
    DateTime,
    Integer,
    LargeBinary,
    MetaData,
    String,
    Table,
    UniqueConstraint,
    create_engine,
    delete,
    exists,
    insert,
    select,
)
from sqlalchemy.engine import Engine
import sqlalchemy

from sarus_data_spec.factory import Factory
from sarus_data_spec.protobuf.typing import (
    ProtobufWithUUID,
    ProtobufWithUUIDAndDatetime,
)
from sarus_data_spec.storage.utils import sort_dataspecs
from sarus_data_spec.typing import DataSpec, Referrable, Referring
import sarus_data_spec.protobuf as sp

# Separator placed between UUIDs when several of them are joined into a
# single string (see the *_collection_string helpers).
SEP: Final[str] = ","
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def referrable_collection_string(
    values: Collection[Referrable[ProtobufWithUUID]],
) -> str:
    """Return the UUIDs of `values`, sorted and joined with SEP."""
    ordered_uuids = [value.uuid() for value in values]
    ordered_uuids.sort()
    return SEP.join(ordered_uuids)


def uuid_collection_string(uuids: Collection[str]) -> str:
    """Return the given UUIDs, sorted and joined with SEP."""
    ordered_uuids = list(uuids)
    ordered_uuids.sort()
    return SEP.join(ordered_uuids)


# Type of the callables decorated by `cached_without_none`: any arguments,
# returning either a referrable or None.
F = Callable[..., Optional[Referrable[ProtobufWithUUID]]]


def cached_without_none(cache: Any) -> Callable[[F], F]:
    """Build a memoizing decorator that never caches a None result.

    `cache` must support `.get(key)` and item assignment. Entries are keyed
    on the decorated function, the positional arguments and the keyword
    arguments, so all of them must be hashable. A None result is returned
    to the caller but not stored, hence the function is called again the
    next time the same arguments are used.
    """

    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            cache_key = (func, args, frozenset(kwargs.items()))
            outcome = cache.get(cache_key)
            if outcome is None:
                # Cache miss (or nothing worth caching so far): compute it.
                outcome = func(*args, **kwargs)
                if outcome is not None:
                    cache[cache_key] = outcome
            return outcome

        return wrapper

    return decorator


def set_referrables_table(metadata: MetaData) -> Table:
    """Declare the "referrables" table on `metadata` and return it."""
    base_columns = [
        Column("uuid", String, primary_key=True),
        Column("type_name", String, nullable=False),
        Column("timestamp", DateTime, nullable=True),
        Column("referrable", LargeBinary, nullable=False),
    ]
    # Two more columns are added to the referrable table. The first one is
    # an optional integer, null for everyone except for statuses (a
    # ProtobufWithDateTime in the storage language): it counts how many
    # statuses have been pushed until itself for that tuple (manager,
    # datasets).
    atomic_update_value = Column("atomic_update_value", Integer, nullable=True)
    # The second one is null for every referrable except statuses, where it
    # is the hash of the tuple (referred dataset, referred manager, count),
    # count being the value of atomic_update_value.
    unique_constraint = Column(
        "unique_constraint", String, nullable=True, unique=True
    )
    return Table(
        "referrables",
        metadata,
        *base_columns,
        atomic_update_value,
        unique_constraint,
    )


def set_referring_table(metadata: MetaData) -> Table:
    """Declare the "referring" table on `metadata` and return it.

    Each row links a `referred` string to one `referring` UUID together
    with the referring's `typename`; a (referred, referring) pair can only
    appear once.
    """
    table_items = [
        Column("referred", String, index=True, nullable=False),
        Column("referring", String, nullable=False),
        Column("typename", String, nullable=False),
        UniqueConstraint("referred", "referring", name="referred_referring"),
    ]
    return Table("referring", metadata, *table_items)


def set_sources_table(metadata: MetaData) -> Table:
    """Declare the "sources" table on `metadata` and return it.

    Each row links a `referring` UUID to one of its `source` UUIDs together
    with a `typename`; a (referring, source) pair can only appear once.
    """
    table_items = [
        Column("referring", String, index=True, nullable=False),
        Column("source", String, nullable=False),
        Column("typename", String, nullable=False),
        UniqueConstraint(
            "referring", "source", name="referring_sources_relation"
        ),
    ]
    return Table("sources", metadata, *table_items)


class Storage:
    """Simple SQL Storage."""

    # LRU cache holding at most 512 entries. It is a class attribute, so it
    # is shared by every Storage instance rather than owned by one of them.
    cache: Any = LRUCache(maxsize=512)

    def __init__(self, engine: Engine, factory: Factory) -> None:
        """Keep `engine` and `factory` and declare the three tables.

        The tables are only declared on a fresh MetaData here; nothing is
        executed against the engine by this constructor.
        """
        metadata = MetaData()
        self._factory = factory
        self._engine = engine
        # A Store to save (timestamp, type_name, data, relating data)
        self._metadata = metadata
        self._referrables = set_referrables_table(metadata)
        self._referring = set_referring_table(metadata)
        self._sources = set_sources_table(metadata)

    def create_all(self) -> None:
        """Create the tables declared on the metadata using the engine."""
        self._metadata.create_all(bind=self._engine)

    def create_referrable_insertion(
        self,
        value: Referrable[ProtobufWithUUID],
        atomic_update_values: Tuple[Optional[int], Optional[str]],
    ) -> sqlalchemy.sql.dml.Insert:
        """Build the INSERT statement adding `value` to the referrables table.

        `atomic_update_values` provides, in that order, the values of the
        `atomic_update_value` and `unique_constraint` columns. The
        `timestamp` column is only set when the protobuf of `value` is a
        ProtobufWithUUIDAndDatetime.
        """
        protobuf = value.protobuf()
        row: Dict[str, Any] = {
            "uuid": value.uuid(),
            "type_name": value.type_name(),
        }
        if isinstance(protobuf, ProtobufWithUUIDAndDatetime):
            row["timestamp"] = datetime.fromisoformat(protobuf.datetime)
        row["referrable"] = sp.serialize(protobuf)
        row["atomic_update_value"] = atomic_update_values[0]
        row["unique_constraint"] = atomic_update_values[1]
        return insert(self._referrables).values(**row)

    def create_batch_referrable_insertion(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Build, for a batch store, one referrables INSERT per value.

        The statements are returned in the iteration order of `values`.
        """
        return [
            self.create_referrable_insertion(
                value,
                atomic_update_values=(
                    self.referring_with_datetime_update_unique_token(value)
                ),
            )
            for value in values
        ]

    def create_referring_insertions(
        self, value: Referring[ProtobufWithUUID]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Build the INSERT statements linking `value` to what it refers to.

        One row is produced for every single referred UUID and one for every
        pair of referred UUIDs, each keyed by the sorted, SEP-joined UUIDs.
        """
        statements = []
        # Sizes 1 then 2: single referred objects first, then pairs.
        for size in range(1, 3):
            for combination in combinations(value.referred_uuid(), size):
                statements.append(
                    insert(self._referring).values(
                        referred=uuid_collection_string(combination),
                        referring=value.uuid(),
                        typename=value.type_name(),
                    )
                )
        return statements

    def create_batch_referring_insertions(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Build the referring-table INSERT statements for a batch store.

        Only the values that are `Referring` produce statements. Every UUID
        they refer to must either belong to the batch itself or already be
        present in the referrables table.

        Raises:
            AssertionError: if a referred UUID is neither in the batch nor
                in the referrables table.
        """
        batch_uuids = {value.uuid() for value in values}
        insertions: List[sqlalchemy.sql.dml.Insert] = []
        # Referred UUIDs that are not part of the batch: they have to be
        # stored already. They are collected for the whole batch so that a
        # single query checks them all.
        expected_in_storage: Set[str] = set()
        for value in values:
            if isinstance(value, Referring):
                referring = cast(Referring[ProtobufWithUUID], value)
                insertions.extend(self.create_referring_insertions(referring))
                expected_in_storage.update(
                    set(referring.referred_uuid()) - batch_uuids
                )

        # Nothing refers outside of the batch: no need to query the storage.
        if expected_in_storage:
            with self._engine.begin() as conn:
                rows = conn.execute(
                    select(self._referrables.columns.uuid).where(
                        self._referrables.columns.uuid.in_(
                            sorted(expected_in_storage)
                        )
                    )
                ).fetchall()
            missing = expected_in_storage - {row[0] for row in rows}
            assert not missing, (
                "Referred objects are neither in the batch nor stored: "
                f"{sorted(missing)}"
            )
        return insertions

    def create_sources_insertions(
        self, value: Referring[ProtobufWithUUID]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Build the INSERT statements recording the sources of `value`.

        When `value` refers to no DataSpec, it is recorded as its own
        source. Otherwise its sources are the union of the sources stored
        for the DataSpecs it refers to; those are computed and stored first
        when missing.
        """
        parent_dataspecs = [
            referred_value
            for referred_value in value.referred()
            if isinstance(referred_value, DataSpec)
        ]

        # A dataspec without (dataspec) parents is its own source
        if not parent_dataspecs:
            dataspec = cast(DataSpec, value)
            return [
                insert(self._sources).values(
                    referring=dataspec.uuid(),
                    source=dataspec.uuid(),
                    typename=dataspec.type_name(),
                )
            ]

        parent_uuids = [parent.uuid() for parent in parent_dataspecs]

        # Make sure every parent has its own sources recorded
        for parent in parent_dataspecs:
            if not self.sources(parent):
                self._store_sources(parent)

        retrieval = select(
            self._sources.columns.source, self._sources.columns.typename
        ).where(self._sources.columns.referring.in_(parent_uuids))
        with self._engine.begin() as conn:
            source_rows = conn.execute(retrieval).fetchall()

        # Several parents may share a source: keep each (source, typename)
        # pair only once
        sources_info = {(row.source, row.typename) for row in source_rows}

        return [
            insert(self._sources).values(
                referring=value.uuid(),
                source=source_uuid,
                typename=source_typename,
            )
            for source_uuid, source_typename in sources_info
        ]

    def create_batch_sources_insertions(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Build the sources-table INSERT statements for a batch store.

        The values are first ordered with `sort_dataspecs`. For each value
        that is `Referring`:

        - if it refers to no DataSpec, it is recorded as its own source;
        - otherwise its sources are the union of the sources of the
          DataSpecs it refers to, taken from this batch when the referred
          object is part of it, or from the storage otherwise.

        Raises:
            KeyError: if a referred DataSpec of the batch has not been
                processed before the value referring to it (the ordering
                returned by `sort_dataspecs` is relied upon here).
        """
        insertions: List[sqlalchemy.sql.dml.Insert] = []
        sorted_values = self.sort_dataspecs(values)
        # A set keeps the membership tests below O(1) per referred UUID.
        batch_uuids = {value.uuid() for value in sorted_values}
        # Sources found so far for the objects of the batch, by UUID
        sources_found: Dict[str, List[Any]] = {}
        for value in sorted_values:
            if not isinstance(value, Referring):
                continue

            referred_uuids = [
                referred_value.uuid()
                for referred_value in value.referred()
                if isinstance(referred_value, DataSpec)
            ]
            if not referred_uuids:
                # A dataspec without (dataspec) parents is its own source
                dataspec = cast(DataSpec, value)
                insertions.append(
                    insert(self._sources).values(
                        referring=dataspec.uuid(),
                        source=dataspec.uuid(),
                        typename=dataspec.type_name(),
                    )
                )
                sources_found[value.uuid()] = [value]
                continue

            sources: List[Any] = []
            # Get sources for referred UUIDs that are planned to be stored
            to_store_uuids = {
                uuid for uuid in referred_uuids if uuid in batch_uuids
            }
            for uuid in to_store_uuids:
                sources.extend(sources_found[uuid])

            # Get sources for referred UUIDs that have already been stored
            already_stored_uuids = {
                uuid for uuid in referred_uuids if uuid not in batch_uuids
            }
            for uuid in already_stored_uuids:
                referred_dataspec = self.referrable(uuid)
                if referred_dataspec and isinstance(
                    referred_dataspec, DataSpec
                ):
                    sources.extend(self.sources(referred_dataspec))

            # Add insertions for the collected sources, each one only once
            sources = list(set(sources))
            sources_found[value.uuid()] = sources
            insertions.extend(
                insert(self._sources).values(
                    referring=value.uuid(),
                    source=source.uuid(),
                    typename=source.type_name(),
                )
                for source in sources
            )
        return insertions

    def referring_with_datetime_update_unique_token(
        self, value: Referrable[ProtobufWithUUID]
    ) -> Tuple[Optional[int], Optional[str]]:
        """Compute the atomic-update columns to store along with `value`.

        When `value` is a Referring whose protobuf has a datetime (a
        status), return the counter it will get for its referred collection
        and the hash of (referred + counter): 0 for the first one, the
        counter of the last stored one plus one otherwise. For any other
        value, return (None, None).
        """
        proto = value.protobuf()
        if not (
            isinstance(value, Referring)
            and isinstance(proto, ProtobufWithUUIDAndDatetime)
        ):
            return (None, None)

        # This is a status: look for the latest one stored for the same
        # referred objects and type name.
        previous = self.last_referring(
            referred_uuid=value.referred_uuid(),
            type_name=value.type_name(),
        )
        if previous is None:
            first_hash = self._hash_unique_constraint(
                referring=value, last_integer=-1
            )
            return 0, first_hash

        counter_query = select(
            self._referrables.columns.atomic_update_value
        ).where(self._referrables.columns.uuid == previous.uuid())
        with self._engine.begin() as conn:
            last_integer = conn.scalar(counter_query)
        assert last_integer is not None
        next_hash = self._hash_unique_constraint(
            referring=value, last_integer=last_integer
        )
        return last_integer + 1, next_hash

    def _hash_unique_constraint(
        self, referring: Referring, last_integer: int
    ) -> str:
        """Compute the value stored in the `unique_constraint` column.

        The sorted UUIDs referred by `referring`, followed by
        `last_integer + 1` as a string, are pickled as a tuple and hashed
        with MD5 (not used for security); the hex digest is returned.
        """
        hashed_items = sorted(referring.referred_uuid())
        hashed_items += [str(last_integer + 1)]
        payload = pickle.dumps(tuple(hashed_items))
        return hashlib.md5(payload, usedforsecurity=False).hexdigest()

    def store(self, value: Referrable[ProtobufWithUUID]) -> None:
        """Store `value`, its referring links and, for a DataSpec, its
        sources. Nothing is done when its UUID is already stored.

        An integrity error raised by the insertions is logged as a warning
        and not propagated.
        """
        # Check the value for consistency.
        assert value._frozen()

        # Test if the value exists.
        existence_query = select(
            exists().where(self._referrables.columns.uuid == value.uuid())
        )
        with self._engine.begin() as conn:
            is_stored = conn.scalar(existence_query)
        if is_stored:
            return  # Already inserted

        # If not exists insert
        update_token = self.referring_with_datetime_update_unique_token(value)
        insertions = [
            self.create_referrable_insertion(
                value, atomic_update_values=update_token
            )
        ]
        if isinstance(value, Referring):
            # Add to referring.
            value = cast(Referring[ProtobufWithUUID], value)
            insertions += self.create_referring_insertions(value)

        try:
            with self._engine.begin() as conn:
                for statement in insertions:
                    conn.execute(statement)
        except sqlalchemy.exc.IntegrityError:
            logger.warning("Tried to INSERT already existing value.")

        if isinstance(value, DataSpec):
            self._store_sources(cast(DataSpec, value))

    def _store_sources(self, value: Referring[ProtobufWithUUID]) -> None:
        """Insert the rows of the sources table describing `value`.

        An integrity error raised by the insertions is logged as a warning
        and not propagated.

        Raises:
            ValueError: if no source could be determined for `value`.
        """
        statements = self.create_sources_insertions(value)
        if not statements:
            raise ValueError(
                """Unable to retrieve sources. The computation graph
                    in storage is incomplete."""
            )

        try:
            with self._engine.begin() as conn:
                for statement in statements:
                    conn.execute(statement)
        except sqlalchemy.exc.IntegrityError:
            logger.warning("Tried to INSERT already existing value.")

    def batch_store(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> None:
        """Store a collection of referrables in the storage.

        This method does not require the objects to be provided in the
        graph order. Protobufs with datetime are not accepted. Objects whose
        UUID is already stored are skipped, and an object provided several
        times is only inserted once.

        The referrables and referring rows are inserted in one transaction,
        the sources rows in a second one. An integrity error raised by
        either of them is logged as a warning and not propagated.
        """
        # Check objects
        for value in values:
            assert value._frozen()
            assert not isinstance(
                value.protobuf(), ProtobufWithUUIDAndDatetime
            )

        # Check which referrables are already stored
        uuids = [val.uuid() for val in values]
        with self._engine.begin() as conn:
            elements = conn.execute(
                select(self._referrables.columns.uuid).where(
                    self._referrables.columns.uuid.in_(uuids)
                )
            ).fetchall()

        # Keep the first occurrence of every value that is not stored yet.
        # Inserting the same UUID twice would raise an integrity error and
        # roll back the insertion of the whole batch.
        handled_uuids = {el[0] for el in elements}
        absent_values = []
        for value in values:
            uuid = value.uuid()
            if uuid not in handled_uuids:
                handled_uuids.add(uuid)
                absent_values.append(value)

        if not absent_values:
            return  # Everything is already stored

        insertions = self.create_batch_referrable_insertion(absent_values)
        insertions.extend(
            self.create_batch_referring_insertions(absent_values)
        )

        try:
            with self._engine.begin() as conn:
                for insertion in insertions:
                    conn.execute(insertion)
        except sqlalchemy.exc.IntegrityError:
            logger.warning("Tried to INSERT already existing value.")

        # Insert sources for absent objects
        sources_insertions = self.create_batch_sources_insertions(
            absent_values
        )

        try:
            with self._engine.begin() as conn:
                for source_insertion in sources_insertions:
                    conn.execute(source_insertion)
        except sqlalchemy.exc.IntegrityError:
            logger.warning("Tried to INSERT already existing value.")

    @cached_without_none(cache)
    def referrable(self, uuid: str) -> Optional[Referrable[ProtobufWithUUID]]:
        """Return the referrable stored under `uuid`, or None if absent.

        The object is rebuilt by the factory (with store=False) from the
        serialized protobuf. Results other than None are cached.
        """
        query = select(self._referrables.columns.referrable).where(
            self._referrables.columns.uuid == uuid
        )
        with self._engine.begin() as conn:
            row = conn.execute(query).fetchone()
        if row is None:
            return None

        rebuilt = self._factory.create(
            sp.deserialize(row.referrable), store=False
        )
        return cast(Referrable[ProtobufWithUUID], rebuilt)

    def referring(
        self,
        referred: Union[
            Referrable[ProtobufWithUUID],
            Collection[Referrable[ProtobufWithUUID]],
        ],
        type_name: Optional[str] = None,
    ) -> Collection[Referring[ProtobufWithUUID]]:
        """Return the set of objects referring to `referred`.

        `referred` is either one referrable or a collection of them; in the
        latter case the lookup is made on the collection as a whole. When
        `type_name` is given, only the referring objects with that type
        name are returned.
        """
        if isinstance(referred, Referrable):
            referred = [referred]
        referred_key = referrable_collection_string(referred)

        # No type name means no filtering on the typename column
        if type_name is None:
            type_filter = sqlalchemy.true()
        else:
            type_filter = self._referring.columns.typename == type_name

        retrieval = select(self._referring.columns.referring).where(
            self._referring.columns.referred == referred_key,
            type_filter,
        )
        with self._engine.begin() as conn:
            referring_rows = conn.execute(retrieval).fetchall()

        found = set()
        for row in referring_rows:
            stored = self.referrable(row.referring)
            assert stored is not None
            found.add(cast(Referring[ProtobufWithUUID], stored))

        return found

    def batch_referring(
        self,
        collection_referred: Collection[
            Union[
                Referrable[ProtobufWithUUID],
                Collection[Referrable[ProtobufWithUUID]],
            ]
        ],
        type_names: Optional[Collection[str]] = None,
    ) -> Dict[str, Set[Referring[ProtobufWithUUID]]]:
        """Return, by type name, the objects referring to several referred.

        Each element of `collection_referred` is either one referrable or a
        collection of referrables looked up as a whole. When `type_names`
        is given, only those type names are queried and each of them is a
        key of the result, possibly mapped to an empty set.
        """
        referred_strings = [
            referrable_collection_string(
                [referred] if isinstance(referred, Referrable) else referred
            )
            for referred in collection_referred
        ]

        # No type names means no filtering on the typename column
        if type_names is None:
            type_filter = sqlalchemy.true()
        else:
            type_filter = self._referring.columns.typename.in_(type_names)

        retrieval = select(
            self._referring.columns.referring,
            self._referring.columns.typename,
        ).where(
            self._referring.columns.referred.in_(referred_strings),
            type_filter,
        )
        with self._engine.begin() as conn:
            referring_rows = conn.execute(retrieval).fetchall()

        result_dict: Dict[str, Set[Referring[ProtobufWithUUID]]] = {}
        if type_names is not None:
            result_dict = {type_name: set() for type_name in type_names}

        for row in referring_rows:
            ref = self.referrable(row.referring)
            assert ref is not None
            result_dict.setdefault(row.typename, set()).add(
                cast(Referring[ProtobufWithUUID], ref)
            )

        return result_dict

    def sources(
        self,
        value: Referrable[ProtobufWithUUID],
        type_name: Optional[str] = None,
    ) -> Set[DataSpec]:
        """Return the set of sources (with `type_name`) of a Referrable.

        A value that is not a Referring has no source. When no source at
        all is recorded for `value`, its sources are computed and stored
        first, then looked up again.
        """
        if not isinstance(value, Referring):
            return set()

        value = cast(Referring[ProtobufWithUUID], value)
        retrieval = (
            select(self._referrables.columns.referrable)
            .join_from(
                self._sources,
                self._referrables,
                self._sources.columns.source == self._referrables.columns.uuid,
            )
            .where(self._sources.columns.referring == value.uuid())
        )

        # First look for sources of any type: this tells apart "no source
        # recorded yet" from "no source of the requested type".
        with self._engine.begin() as conn:
            rows = conn.execute(retrieval).fetchall()

        if not rows:
            self._store_sources(value)
            return self.sources(value=value, type_name=type_name)

        if type_name is not None:
            typed_retrieval = retrieval.where(
                self._sources.columns.typename == type_name
            )
            with self._engine.begin() as conn:
                rows = conn.execute(typed_retrieval).fetchall()

        return {
            cast(
                DataSpec,
                self._factory.create(
                    sp.deserialize(row.referrable), store=False
                ),
            )
            for row in rows
        }

    def all_sources(self, type_name: Optional[str] = None) -> Set[DataSpec]:
        """Return the set of all sources (with `type_name`) in the Storage.

        The sources are the rows of the sources table whose `referring` and
        `source` columns are equal.
        """
        sources = self._sources.columns
        retrieval = (
            select(self._referrables.columns.referrable)
            .join_from(
                self._sources,
                self._referrables,
                sources.source == self._referrables.columns.uuid,
            )
            .where(sources.referring == sources.source)
        )
        if type_name is not None:
            retrieval = retrieval.where(sources.typename == type_name)

        with self._engine.begin() as conn:
            rows = conn.execute(retrieval).fetchall()

        # An empty result simply yields an empty set
        return {
            cast(
                DataSpec,
                self._factory.create(
                    sp.deserialize(row.referrable), store=False
                ),
            )
            for row in rows
        }

    def sort_dataspecs(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> Collection[Referring[ProtobufWithUUID]]:
        """Order dataspecs following the DAG, from the root to the nodes
        (the elements of the input list).

        The work is delegated to the module-level `sort_dataspecs` helper,
        called with this storage.
        """
        ordered = sort_dataspecs(self, values)
        return ordered

    def last_referring_query(
        self,
        referred_uuid: Collection[str],
        type_name: str,
    ) -> Any:
        """Build the query selecting the most recent referring.

        The query returns the serialized referrable with the greatest
        timestamp among those of type `type_name` referring to
        `referred_uuid`, the latter being looked up as a whole.
        """
        referred_key = uuid_collection_string(referred_uuid)
        referring_with_payload = select(
            self._referrables.columns.referrable
        ).join_from(
            self._referring,
            self._referrables,
            self._referring.columns.referring
            == self._referrables.columns.uuid,
        )
        matching = referring_with_payload.where(
            self._referring.columns.referred == referred_key,
            self._referring.columns.typename == type_name,
        )
        # Most recent first, and only that one
        return matching.order_by(self._referrables.c.timestamp.desc()).limit(1)

    def last_referring(
        self,
        referred_uuid: Collection[str],
        type_name: str,
    ) -> Optional[Referring[ProtobufWithUUIDAndDatetime]]:
        """Return the most recent referring of type `type_name` for
        `referred_uuid`, or None when there is none.

        The object is rebuilt by the factory with store=False.
        """
        query = self.last_referring_query(referred_uuid, type_name)
        with self._engine.begin() as conn:
            row = conn.execute(query).fetchone()
        if row is not None:
            rebuilt = self._factory.create(
                sp.deserialize(row.referrable), store=False
            )
            return cast(
                Optional[Referring[ProtobufWithUUIDAndDatetime]], rebuilt
            )
        return None

    def update_referring_with_datetime(
        self,
        referred_uuid: Collection[str],
        type_name: str,
        update: Callable[
            [Referring[ProtobufWithUUIDAndDatetime]],
            Tuple[Referring[ProtobufWithUUIDAndDatetime], bool],
        ],
    ) -> Tuple[Referring[ProtobufWithUUIDAndDatetime], bool]:
        """Update the storage with a new protobuf with datetime built by
        the `update` callable.

        An integrity error when inserting means that a protobuf with
        datetime with the same referrable and same manager was inserted by
        another Python process between the start and the end of the
        transaction. The workaround is to retry the whole transaction, as
        many times as needed.
        """
        while True:
            try:
                updated, is_updated = self.insert_protobuf_with_datetime(
                    referred_uuid=referred_uuid,
                    type_name=type_name,
                    update=update,
                )
            except sqlalchemy.exc.IntegrityError:
                # Concurrent insertion: start the transaction over
                continue
            return updated, is_updated

    def create_referring_with_datetime(
        self,
        value: Referring[ProtobufWithUUIDAndDatetime],
        update: Callable[
            [Referring[ProtobufWithUUIDAndDatetime]],
            Tuple[Referring[ProtobufWithUUIDAndDatetime], bool],
        ],
    ) -> Tuple[Referring[ProtobufWithUUIDAndDatetime], bool]:
        """Insert `value` as the first protobuf with datetime of its kind.

        Creating a new row is different from updating because we suppose
        no record for the protobuf with datetime existed at the start of
        the transaction. It might fail if another Python process creates a
        protobuf with datetime referring to the same referrable and
        manager. In this case an integrity error occurs and we switch to
        updating the protobuf with datetime, because a record for it has
        now been created.
        """
        try:
            with self._engine.begin() as conn:
                protobuf = value.protobuf()
                # First of its kind: counter 0, hashed accordingly
                first_row = insert(self._referrables).values(
                    uuid=value.uuid(),
                    type_name=value.type_name(),
                    timestamp=datetime.fromisoformat(protobuf.datetime),
                    referrable=sp.serialize(protobuf),
                    atomic_update_value=0,
                    unique_constraint=self._hash_unique_constraint(
                        referring=value, last_integer=-1
                    ),
                )
                statements = [
                    first_row,
                    *self.create_referring_insertions(value),
                ]
                for statement in statements:
                    conn.execute(statement)
        except sqlalchemy.exc.IntegrityError:
            return self.update_referring_with_datetime(
                referred_uuid=value.referred_uuid(),
                type_name=value.type_name(),
                update=update,
            )

        return value, True

    def insert_protobuf_with_datetime(
        self,
        referred_uuid: Collection[str],
        type_name: str,
        update: Callable[
            [Referring[ProtobufWithUUIDAndDatetime]],
            Tuple[Referring[ProtobufWithUUIDAndDatetime], bool],
        ],
    ) -> Tuple[Referring[ProtobufWithUUIDAndDatetime], bool]:
        """Insert a new protobuf with datetime if `update` asks for it.

        The last referring of type `type_name` for `referred_uuid` is read
        and handed to `update`, which returns the candidate object and
        whether it should be stored. Reading and inserting happen in the
        same transaction.

        Returns:
            (updated, True) when the object returned by `update` has been
            inserted, otherwise (last existing referring, False).

        Raises:
            AssertionError: if no referring exists yet for `referred_uuid`
                and `type_name`, or if the last one has no
                `atomic_update_value`.

        Integrity errors raised by the insertions are not caught here.
        """
        with self._engine.begin() as conn:
            retrieval = self.last_referring_query(referred_uuid, type_name)
            value = conn.execute(retrieval).fetchone()
            # This method only updates: a previous referring must exist
            assert value is not None
            # Cast to the right type
            referring = cast(
                Referring[ProtobufWithUUIDAndDatetime],
                self._factory.create(
                    sp.deserialize(value.referrable), store=False
                ),
            )
            updated, should_update = update(referring)
            if should_update:
                # Insert referrables
                # The counter of the new row is the one of the last
                # referring plus one.
                query_last_integer = select(
                    self._referrables.columns.atomic_update_value
                ).where(self._referrables.columns.uuid == referring.uuid())
                last_integer = conn.scalar(query_last_integer)
                assert last_integer is not None
                # NOTE(review): the hash is computed from the referred
                # UUIDs of the previous referring, not from those of
                # `updated` - presumably both refer to the same objects;
                # confirm.
                unique_constraint = self._hash_unique_constraint(
                    referring=referring, last_integer=last_integer
                )
                conn.execute(
                    insert(self._referrables).values(
                        uuid=updated.uuid(),
                        type_name=updated.type_name(),
                        timestamp=datetime.fromisoformat(
                            updated.protobuf().datetime
                        ),
                        referrable=sp.serialize(updated.protobuf()),
                        atomic_update_value=last_integer + 1,
                        unique_constraint=unique_constraint,
                    )
                )
                # Single referred UUIDs, then pairs of them
                referred_combinations = chain.from_iterable(
                    combinations(updated.referred_uuid(), r)
                    for r in range(1, 3)
                )
                # Insert referring
                for combination in referred_combinations:
                    conn.execute(
                        insert(self._referring).values(
                            referred=uuid_collection_string(combination),
                            referring=updated.uuid(),
                            typename=updated.type_name(),
                        )
                    )
                return updated, True

        # No update requested: the last referring is read again, once the
        # transaction above is closed, and returned as is.
        fallback = self.last_referring(referred_uuid, type_name)
        assert fallback is not None
        return fallback, False

    def type_name(
        self, type_name: str
    ) -> Collection[Referrable[ProtobufWithUUID]]:
        """Return the set of stored objects whose type name is `type_name`.

        Every matching row of the referrables table is deserialized and
        rebuilt through the factory with `store=False`.
        """
        referrables = self._referrables.columns
        query = select(referrables.referrable).where(
            referrables.type_name == type_name
        )
        with self._engine.begin() as conn:
            rows = conn.execute(query).fetchall()

        objects = set()
        for row in rows:
            built = self._factory.create(
                sp.deserialize(row.referrable), store=False
            )
            objects.add(cast(Referrable[ProtobufWithUUID], built))
        return objects

    def all_referrings(self, uuid: str) -> List[str]:
        """Return the uuids of an object and of all its referrings.

        Starting from the object returned by `referrable(uuid)`, the
        `referring()` links are followed transitively. The uuid of the
        starting object is part of the result; the order of the returned
        list is unspecified because the objects are gathered in a set.
        """
        collected = set()
        frontier = {self.referrable(uuid)}
        while frontier:
            current = frontier.pop()
            if not current:
                # Falsy entry (e.g. the lookup found nothing): skip it.
                continue
            collected.add(current)
            children = current.referring() or ()
            # Objects already collected are not queued again, which also
            # keeps the walk finite if references form a cycle.
            frontier.update(
                child for child in children if child not in collected
            )

        return [item.uuid() for item in collected]

    def delete(self, uuid: str) -> None:
        """Delete an object together with every item referring to it.

        The uuids to remove are gathered with `all_referrings`. Matching
        rows are then removed from the sources, referring and referrables
        tables within one transaction block, and the cache is emptied
        before that block exits.
        """
        doomed = self.all_referrings(uuid)

        purge_sources = delete(self._sources).where(
            self._sources.c.referring.in_(doomed)
        )
        purge_referring = delete(self._referring).where(
            self._referring.c.referring.in_(doomed)
        )
        purge_referrables = delete(self._referrables).where(
            self._referrables.c.uuid.in_(doomed)
        )

        with self._engine.begin() as conn:
            result = conn.execute(purge_sources)
            logger.info(f"Deleted {result.rowcount} elements from sources")
            result = conn.execute(purge_referring)
            logger.info(f"Deleted {result.rowcount} elements from referring")
            result = conn.execute(purge_referrables)
            logger.info(f"Deleted {result.rowcount} elements from referrables")

            # Cached lookups may point at rows that were just removed.
            self.cache.clear()

    def delete_type(self, type_name: str) -> None:
        """Delete every object of a given type and everything referring to it.

        The objects are listed with `type_name`, each one is expanded with
        `all_referrings`, and the union of the resulting uuids is removed
        from the sources, referring and referrables tables within one
        transaction block. The cache is emptied before that block exits.
        """
        root_uuids = [item.uuid() for item in self.type_name(type_name)]
        doomed = set(
            chain.from_iterable(map(self.all_referrings, root_uuids))
        )

        purge_sources = delete(self._sources).where(
            self._sources.c.referring.in_(doomed)
        )
        purge_referring = delete(self._referring).where(
            self._referring.c.referring.in_(doomed)
        )
        purge_referrables = delete(self._referrables).where(
            self._referrables.c.uuid.in_(doomed)
        )

        with self._engine.begin() as conn:
            result = conn.execute(purge_sources)
            logger.info(f"Deleted {result.rowcount} elements from sources")
            result = conn.execute(purge_referring)
            logger.info(f"Deleted {result.rowcount} elements from referring")
            result = conn.execute(purge_referrables)
            logger.info(f"Deleted {result.rowcount} elements from referrables")

            # Cached lookups may point at rows that were just removed.
            self.cache.clear()

    def truncate(self) -> None:
        """Remove every stored object and empty the cache.

        All rows of the sources, referring and referrables tables are
        deleted in a single transaction, in the same table order used by
        `delete`. The cache is cleared once the transaction block has
        exited, so that lookups stop returning objects that are no longer
        stored.
        """
        with self._engine.begin() as conn:
            # Sources rows point at referring uuids; without this they
            # would be left behind once the referrables are gone.
            conn.execute(delete(self._sources))
            conn.execute(delete(self._referring))
            conn.execute(delete(self._referrables))

        # Cached entries would otherwise outlive the rows they came from.
        self.cache.clear()


# Some utility functions to create the DB


def sqlite(path: str) -> Engine:
    """Build a SQLite engine for the file at `path` and create its tables.

    The referrables, referring and sources table helpers are each called
    with a fresh `MetaData`, then `create_all` is run against the new
    engine before it is returned.
    """
    metadata = MetaData()
    # The helpers' return values are not needed here; they are called for
    # their effect on `metadata`.
    for declare_table in (
        set_referrables_table,
        set_referring_table,
        set_sources_table,
    ):
        declare_table(metadata)

    engine = create_engine(f"sqlite:///{path}")
    metadata.create_all(engine)
    return engine