# Sarus Data Spec SQL storage module (package version 4.5.4.dev1).
from datetime import datetime
from functools import wraps
from itertools import chain, combinations
from typing import (
Any,
Callable,
Collection,
Dict,
Final,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
import hashlib
import logging
import pickle
from cachetools import LRUCache
from sqlalchemy import (
Column,
DateTime,
Integer,
LargeBinary,
MetaData,
String,
Table,
UniqueConstraint,
create_engine,
delete,
exists,
insert,
select,
)
from sqlalchemy.engine import Engine
import sqlalchemy
from sarus_data_spec.factory import Factory
from sarus_data_spec.protobuf.typing import (
ProtobufWithUUID,
ProtobufWithUUIDAndDatetime,
)
from sarus_data_spec.storage.utils import sort_dataspecs
from sarus_data_spec.typing import DataSpec, Referrable, Referring
import sarus_data_spec.protobuf as sp
SEP: Final[str] = ","
logger = logging.getLogger(__name__)


def referrable_collection_string(
    values: Collection[Referrable[ProtobufWithUUID]],
) -> str:
    """Return a canonical, order-independent key for a set of referrables.

    The UUIDs are sorted so that the same collection always maps to the
    same string regardless of iteration order.
    """
    return SEP.join(sorted(value.uuid() for value in values))


def uuid_collection_string(uuids: Collection[str]) -> str:
    """Return a canonical, order-independent key for a collection of UUIDs."""
    # sorted() accepts the iterable directly; the intermediate
    # `uuid for uuid in uuids` generator was redundant.
    return SEP.join(sorted(uuids))
# Type of the functions that the decorator below may wrap.
F = Callable[..., Optional[Referrable[ProtobufWithUUID]]]


def cached_without_none(cache: Any) -> Callable[[F], F]:
    """Memoizing decorator that only caches non-None results.

    A None result means "not found yet" and must be recomputed on the
    next call instead of being served from the cache.
    """

    def decorator(func: F) -> F:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            cache_key = (func, args, frozenset(kwargs.items()))
            cached = cache.get(cache_key)
            if cached is None:
                cached = func(*args, **kwargs)
                if cached is not None:
                    cache[cache_key] = cached
            return cached

        return wrapper

    return decorator
def set_referrables_table(metadata: MetaData) -> Table:
    """A helper function to setup the referrables table"""
    referrables_table = Table(
        "referrables",
        metadata,
        # Primary key: the referrable's UUID.
        Column("uuid", String, primary_key=True),
        Column("type_name", String, nullable=False),
        # Only filled for protobufs carrying a datetime (e.g. statuses).
        Column("timestamp", DateTime, nullable=True),
        # Serialized protobuf payload.
        Column("referrable", LargeBinary, nullable=False),
        # Two columns are added to the referrable table: the first column
        # is an optional integer, null for everyone except for statuses (a
        # ProtobufWithDateTime in the storage language), it counts how many
        # statuses have been pushed until itself for that tuple (manager,
        # datasets)
        Column("atomic_update_value", Integer, nullable=True),
        # the second one is null for every referrable except statuses where
        # it is the hash of the tuple (referred dataset,referred manager,
        # count) where count is the value of atomic_update_value.
        Column("unique_constraint", String, nullable=True, unique=True),
    )
    return referrables_table
def set_referring_table(metadata: MetaData) -> Table:
    """A helper function to setup the referring table"""
    referring_table = Table(
        "referring",
        metadata,
        # Canonical string of the referred uuid combination (see
        # uuid_collection_string); indexed for lookups by referred.
        Column("referred", String, index=True, nullable=False),
        Column("referring", String, nullable=False),
        Column("typename", String, nullable=False),
        # One link per (referred, referring) pair.
        UniqueConstraint("referred", "referring", name="referred_referring"),
    )
    return referring_table
def set_sources_table(metadata: MetaData) -> Table:
    """A helper function to setup the sources table"""
    sources_table = Table(
        "sources",
        metadata,
        # The dataspec whose source is recorded; indexed for lookups.
        Column("referring", String, index=True, nullable=False),
        # UUID of a root dataspec this referring derives from.
        Column("source", String, nullable=False),
        Column("typename", String, nullable=False),
        # One row per (referring, source) pair.
        UniqueConstraint(
            "referring", "source", name="referring_sources_relation"
        ),
    )
    return sources_table
class Storage:
    """Simple SQL Storage."""

    # Memoization cache used by the cached_without_none decorator on
    # referrable().
    # NOTE(review): this is a class-level attribute, so entries are shared
    # across all Storage instances — confirm this is intended when several
    # storages coexist in one process.
    cache: Any = LRUCache(maxsize=512)

    def __init__(self, engine: Engine, factory: Factory) -> None:
        # A Store to save (timestamp, type_name, data, relating data)
        self._engine = engine
        self._metadata = MetaData()
        self._referrables = set_referrables_table(self._metadata)
        self._referring = set_referring_table(self._metadata)
        self._sources = set_sources_table(self._metadata)
        self._factory = factory

    def create_all(self) -> None:
        """Create the storage tables in the database if absent."""
        self._metadata.create_all(self._engine)
    def create_referrable_insertion(
        self,
        value: Referrable[ProtobufWithUUID],
        atomic_update_values: Tuple[Optional[int], Optional[str]],
    ) -> sqlalchemy.sql.dml.Insert:
        """Create a statement to insert the value in the referrables table.

        `atomic_update_values` is the (counter, unique-constraint hash)
        pair produced by referring_with_datetime_update_unique_token; both
        entries are None for values without a datetime.
        """
        protobuf = value.protobuf()
        if isinstance(protobuf, ProtobufWithUUIDAndDatetime):
            # Values with a datetime also fill the timestamp column.
            return insert(self._referrables).values(
                uuid=value.uuid(),
                type_name=value.type_name(),
                timestamp=datetime.fromisoformat(protobuf.datetime),
                referrable=sp.serialize(protobuf),
                atomic_update_value=atomic_update_values[0],
                unique_constraint=atomic_update_values[1],
            )
        else:
            return insert(self._referrables).values(
                uuid=value.uuid(),
                type_name=value.type_name(),
                referrable=sp.serialize(protobuf),
                atomic_update_value=atomic_update_values[0],
                unique_constraint=atomic_update_values[1],
            )
    def create_batch_referrable_insertion(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Create a statement to insert the value in the referrables table for
        a batch store."""
        insertions = []
        for value in values:
            insertions.append(
                self.create_referrable_insertion(
                    value,
                    atomic_update_values=self.referring_with_datetime_update_unique_token(  # noqa: E501
                        value
                    ),
                )
            )
        return insertions
    def create_referring_insertions(
        self, value: Referring[ProtobufWithUUID]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Create statements to insert the links in the referrings table."""
        # One row per subset of size 1 or 2 of the referred uuids, so that
        # lookups keyed on a single referred or on a pair both match.
        referred_combinations = chain.from_iterable(
            combinations(value.referred_uuid(), r) for r in range(1, 3)
        )
        return [
            insert(self._referring).values(
                referred=uuid_collection_string(combination),
                referring=value.uuid(),
                typename=value.type_name(),
            )
            for combination in referred_combinations
        ]
    def create_batch_referring_insertions(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Create statements to insert the links in the referrings table for
        the batch store."""
        uuids = set([value.uuid() for value in values])
        insertions = []
        for value in values:
            if isinstance(value, Referring):
                # Add to referring
                value = cast(Referring[ProtobufWithUUID], value)
                insertions.extend(self.create_referring_insertions(value))
                # Check that all referred will be present already stored or not
                already_stored_referred = set(value.referred_uuid()) - uuids
                with self._engine.begin() as conn:
                    elements = conn.execute(
                        select(self._referrables.columns.uuid).where(
                            self._referrables.columns.uuid.in_(
                                already_stored_referred
                            )
                        )
                    ).fetchall()
                # Every referred uuid not part of this batch must already
                # exist in the referrables table.
                assert {el[0] for el in elements} == already_stored_referred
        return insertions
    def create_sources_insertions(
        self, value: Referring[ProtobufWithUUID]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Create statements to insert the source
        of a dataspec in the source table."""
        referred_values = value.referred()
        # Only dataspec parents take part in source computation.
        referred_dataspecs = [
            referred_value
            for referred_value in referred_values
            if isinstance(referred_value, DataSpec)
        ]
        referred_uuids = [
            referred_dataspec.uuid()
            for referred_dataspec in referred_dataspecs
        ]
        # A dataspec without (dataspec) parents is his own source
        if not referred_dataspecs:
            dataspec = cast(DataSpec, value)
            return [
                insert(self._sources).values(
                    referring=dataspec.uuid(),
                    source=dataspec.uuid(),
                    typename=dataspec.type_name(),
                )
            ]
        else:
            # Make sure every parent has its own sources recorded before
            # reading them back.
            for referred_dataspec in referred_dataspecs:
                if len(self.sources(referred_dataspec)) == 0:
                    self._store_sources(referred_dataspec)
            retrieval = select(
                self._sources.columns.source, self._sources.columns.typename
            ).where(self._sources.columns.referring.in_(referred_uuids))
            with self._engine.begin() as conn:
                source_rows = conn.execute(retrieval).fetchall()
            # concatenating the sources into a list (deduplicated via a set)
            sources_info = set(
                [(row.source, row.typename) for row in source_rows]
            )
            return [
                insert(self._sources).values(
                    referring=value.uuid(),
                    source=source_info[0],
                    typename=source_info[1],
                )
                for source_info in sources_info
            ]
    def create_batch_sources_insertions(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> List[sqlalchemy.sql.dml.Insert]:
        """Create statements to insert the source
        of a dataspec in the source table for the batch store."""
        insertions = []
        # Process values from the roots of the DAG towards the leaves so
        # that a value's parents are handled before the value itself.
        sorted_values = self.sort_dataspecs(values)
        sorted_uuids = [value.uuid() for value in sorted_values]
        # Insert sources for absent objects
        # Maps each processed uuid to the list of its source dataspecs.
        sources_found = {}
        for value in sorted_values:
            if isinstance(value, Referring):
                sources = []
                referred_values = value.referred()
                referred_dataspecs = [
                    referred_value
                    for referred_value in referred_values
                    if isinstance(referred_value, DataSpec)
                ]
                referred_uuids = [
                    referred_dataspec.uuid()
                    for referred_dataspec in referred_dataspecs
                ]
                if not referred_dataspecs:
                    # No dataspec parent: the value is its own source.
                    dataspec = cast(DataSpec, value)
                    insertions.extend(
                        [
                            insert(self._sources).values(
                                referring=dataspec.uuid(),
                                source=dataspec.uuid(),
                                typename=dataspec.type_name(),
                            )
                        ]
                    )
                    sources_found[value.uuid()] = [value]
                else:
                    # Get sources for referred UUIDs that
                    # are planned to be stored
                    to_store_uuids = set(
                        [
                            uuid
                            for uuid in referred_uuids
                            if uuid in sorted_uuids
                        ]
                    )
                    for uuid in to_store_uuids:
                        sources.extend(sources_found[uuid])
                    # Get sources for referred UUIDs that
                    # have already been stored
                    already_stored_uuids = set(
                        [
                            uuid
                            for uuid in referred_uuids
                            if uuid not in sorted_uuids
                        ]
                    )
                    for uuid in already_stored_uuids:
                        referred_dataspec = self.referrable(uuid)
                        if referred_dataspec and isinstance(
                            referred_dataspec, DataSpec
                        ):
                            retrieval_sources = self.sources(referred_dataspec)
                            sources.extend(retrieval_sources)
                    # add insertions for collected sources (deduplicated)
                    sources = list(set(sources))
                    sources_found[value.uuid()] = sources
                    insertions.extend(
                        [
                            insert(self._sources).values(
                                referring=value.uuid(),
                                source=source.uuid(),
                                typename=source.type_name(),
                            )
                            for source in sources
                        ]
                    )
        return insertions
    def referring_with_datetime_update_unique_token(
        self, value: Referrable[ProtobufWithUUID]
    ) -> Tuple[Optional[int], Optional[str]]:
        """Compute the atomic-update columns for a referrable.

        For values without a datetime it returns (None, None). For
        referring values with a datetime (statuses) it returns the count
        of statuses that will exist for the referred collection and the
        hash of (referred + count).
        """
        proto = value.protobuf()
        if isinstance(value, Referring) and isinstance(
            proto, ProtobufWithUUIDAndDatetime
        ):
            # this is a status
            # we add:
            last_ref = self.last_referring(
                referred_uuid=value.referred_uuid(),
                type_name=value.type_name(),
            )
            if last_ref is None:
                # First status for this referred collection: counter starts
                # at 0 and the hash is seeded with last_integer=-1.
                return 0, self._hash_unique_constraint(
                    referring=value, last_integer=-1
                )
            query_last_integer = select(
                self._referrables.columns.atomic_update_value
            ).where(self._referrables.columns.uuid == last_ref.uuid())
            with self._engine.begin() as conn:
                last_integer = conn.scalar(query_last_integer)
            assert last_integer is not None
            return last_integer + 1, self._hash_unique_constraint(
                referring=value, last_integer=last_integer
            )
        return (None, None)
def _hash_unique_constraint(
self, referring: Referring, last_integer: int
) -> str:
"""Method to create value for the unique constraint, referred values
by the referring are concatenated with last_integer+1 and hashed"""
referred_uuids = sorted(referring.referred_uuid())
referred_uuids.append(str(last_integer + 1))
md5 = hashlib.md5(usedforsecurity=False)
md5.update(pickle.dumps(tuple(referred_uuids)))
return md5.hexdigest()
    def store(self, value: Referrable[ProtobufWithUUID]) -> None:
        """Store a single referrable together with its referring links."""
        # Check the value for consistency.
        assert value._frozen()
        # Test if the value exists.
        retrieval = select(
            exists().where(self._referrables.columns.uuid == value.uuid())
        )
        with self._engine.begin() as conn:
            element = conn.scalar(retrieval)
        if element:
            return  # Already inserted
        # If not exists insert
        insertions = []
        insertions.append(
            self.create_referrable_insertion(
                value,
                atomic_update_values=self.referring_with_datetime_update_unique_token(  # noqa: E501
                    value
                ),
            )
        )
        if isinstance(value, Referring):
            # Add to referring.
            value = cast(Referring[ProtobufWithUUID], value)
            insertions.extend(self.create_referring_insertions(value))
        try:
            with self._engine.begin() as conn:
                for insertion in insertions:
                    conn.execute(insertion)
        except sqlalchemy.exc.IntegrityError:
            # A concurrent writer may have inserted the same value first.
            logger.warning("Tried to INSERT already existing value.")
        if isinstance(value, DataSpec):
            dataspec = cast(DataSpec, value)
            self._store_sources(dataspec)
    def _store_sources(self, value: Referring[ProtobufWithUUID]) -> None:
        """Compute and insert the sources of `value` in the sources table.

        Raises:
            ValueError: when no source insertion could be built, i.e. the
                computation graph in storage is incomplete.
        """
        insertions_sources = self.create_sources_insertions(value)
        if len(insertions_sources) == 0:
            raise ValueError(
                """Unable to retrieve sources. The computation graph
in storage is incomplete."""
            )
        try:
            with self._engine.begin() as conn:
                for insertion in insertions_sources:
                    conn.execute(insertion)
        except sqlalchemy.exc.IntegrityError:
            logger.warning("Tried to INSERT already existing value.")
def batch_store(
self, values: Collection[Referrable[ProtobufWithUUID]]
) -> None:
"""Store a collection of referrables in the storage.
This method does not requires the objects to be provided in the graph
order.
Protobufs with datetime are not accepted.
"""
# Check objects
for value in values:
assert value._frozen()
assert not isinstance(
value.protobuf(), ProtobufWithUUIDAndDatetime
)
# Check which referrables are already stored
uuids = [val.uuid() for val in values]
with self._engine.begin() as conn:
elements = conn.execute(
select(self._referrables.columns.uuid).where(
self._referrables.columns.uuid.in_(uuids)
)
).fetchall()
stored_uuids = [el[0] for el in elements]
absent_uuids = set(
[uuid for uuid in uuids if uuid not in stored_uuids]
)
absent_values = [
value for value in values if value.uuid() in absent_uuids
]
insertions = []
referrable_insertions = self.create_batch_referrable_insertion(
absent_values
)
insertions.extend(referrable_insertions)
referring_insertions = self.create_batch_referring_insertions(
absent_values
)
insertions.extend(referring_insertions)
try:
with self._engine.begin() as conn:
for insertion in insertions:
conn.execute(insertion)
except sqlalchemy.exc.IntegrityError:
logger.warning("Tried to INSERT already existing value.")
# Insert sources for absent objects
sources_instertions = self.create_batch_sources_insertions(
absent_values
)
try:
with self._engine.begin() as conn:
for source_instertion in sources_instertions:
conn.execute(source_instertion)
except sqlalchemy.exc.IntegrityError:
logger.warning("Tried to INSERT already existing value.")
    @cached_without_none(cache)
    def referrable(self, uuid: str) -> Optional[Referrable[ProtobufWithUUID]]:
        """Return the deserialized referrable with this uuid, or None.

        Results are memoized in the class-level cache; None (not found)
        is deliberately not cached so later stores become visible.
        """
        retrieval = select(self._referrables.columns.referrable).where(
            self._referrables.columns.uuid == uuid
        )
        with self._engine.begin() as conn:
            value_binary = conn.execute(retrieval).fetchone()
        if value_binary is not None:
            return cast(
                Referrable[ProtobufWithUUID],
                self._factory.create(
                    sp.deserialize(value_binary.referrable), store=False
                ),
            )
        else:
            return None
def referring(
self,
referred: Union[
Referrable[ProtobufWithUUID],
Collection[Referrable[ProtobufWithUUID]],
],
type_name: Optional[str] = None,
) -> Collection[Referring[ProtobufWithUUID]]:
"""Returns the list of all the referring (with type_name) of a
Referrable."""
if isinstance(referred, Referrable):
referred = [referred]
cast(Collection[Referrable[ProtobufWithUUID]], referred)
retrieval = select(self._referring.columns.referring).where(
self._referring.columns.referred
== referrable_collection_string(referred),
(self._referring.columns.typename == type_name)
if type_name is not None
else sqlalchemy.true(),
)
with self._engine.begin() as conn:
referring_rows = conn.execute(retrieval).fetchall()
referrings = set()
for row in referring_rows:
referrable = self.referrable(row.referring)
assert referrable is not None
referrings.add(cast(Referring[ProtobufWithUUID], referrable))
return referrings
    def batch_referring(
        self,
        collection_referred: Collection[
            Union[
                Referrable[ProtobufWithUUID],
                Collection[Referrable[ProtobufWithUUID]],
            ]
        ],
        type_names: Optional[Collection[str]] = None,
    ) -> Dict[str, Set[Referring[ProtobufWithUUID]]]:
        """Returns the list of all the referring
        (for multiples type_name) of several Referrables."""
        referred_strings = []
        for referred in collection_referred:
            # Normalize each entry to its canonical collection string.
            if isinstance(referred, Referrable):
                referred_strings.append(
                    referrable_collection_string([referred])
                )
            else:
                referred_strings.append(referrable_collection_string(referred))
        retrieval = select(
            self._referring.columns.referring,
            self._referring.columns.typename,
        ).where(
            self._referring.columns.referred.in_(referred_strings),
            (self._referring.columns.typename.in_(type_names))
            if type_names is not None
            else sqlalchemy.true(),
        )
        with self._engine.begin() as conn:
            referring_rows = conn.execute(retrieval).fetchall()
        result_dict: Dict[str, Set[Referring[ProtobufWithUUID]]] = {}
        if type_names is not None:
            # Guarantee a (possibly empty) entry for each requested type.
            for type_name in type_names:
                result_dict[type_name] = set()
        for row in referring_rows:
            ref = self.referrable(row.referring)
            assert ref is not None
            referring = cast(Referring[ProtobufWithUUID], ref)
            if row.typename in result_dict:
                result_dict[row.typename].add(referring)
            else:
                result_dict[row.typename] = {referring}
        return result_dict
    def sources(
        self,
        value: Referrable[ProtobufWithUUID],
        type_name: Optional[str] = None,
    ) -> Set[DataSpec]:
        """Returns a set of all sources (with type_name) of a Referrable."""
        if not isinstance(value, Referring):
            # Non-referring values have no sources.
            return set()
        value = cast(Referring[ProtobufWithUUID], value)
        retrieval = (
            select(self._referrables.columns.referrable)
            .join_from(
                self._sources,
                self._referrables,
                self._sources.columns.source == self._referrables.columns.uuid,
            )
            .where(self._sources.columns.referring == value.uuid())
        )
        with self._engine.begin() as conn:
            sources_binaries = conn.execute(retrieval).fetchall()
        if len(sources_binaries) > 0:
            if type_name is not None:
                # Re-run the query with the typename filter applied.
                retrieval = retrieval.where(
                    self._sources.columns.typename == type_name
                )
                with self._engine.begin() as conn:
                    sources_binaries = conn.execute(retrieval).fetchall()
            return {
                cast(
                    DataSpec,
                    self._factory.create(
                        sp.deserialize(source_binary.referrable), store=False
                    ),
                )
                for source_binary in sources_binaries
            }
        else:
            # Sources not recorded yet: compute and store them, then retry.
            self._store_sources(value)
            return self.sources(value=value, type_name=type_name)
def all_sources(self, type_name: Optional[str] = None) -> Set[DataSpec]:
"""Returns a set of all sources (with type_name) in the Storage."""
retrieval = (
select(self._referrables.columns.referrable)
.join_from(
self._sources,
self._referrables,
self._sources.columns.source == self._referrables.columns.uuid,
)
.where(
self._sources.columns.referring == self._sources.columns.source
)
)
if type_name is not None:
retrieval = retrieval.where(
self._sources.columns.typename == type_name
)
with self._engine.begin() as conn:
sources_binaries = conn.execute(retrieval).fetchall()
if len(sources_binaries) > 0:
return {
cast(
DataSpec,
self._factory.create(
sp.deserialize(source_binary.referrable), store=False
),
)
for source_binary in sources_binaries
}
else:
return set()
    def sort_dataspecs(
        self, values: Collection[Referrable[ProtobufWithUUID]]
    ) -> Collection[Referring[ProtobufWithUUID]]:
        """Return a sorted list of dataspecs, in the order of the DAG, from
        the root to the nodes (the elements of the input list)."""
        # Delegates to the shared storage utility function.
        return sort_dataspecs(self, values)
    def last_referring_query(
        self,
        referred_uuid: Collection[str],
        type_name: str,
    ) -> Any:
        """A helper function to build the last referring query.

        Selects the serialized protobuf of the most recent (by timestamp)
        referring of `referred_uuid` with this `type_name`.
        """
        retrieval = (
            select(self._referrables.columns.referrable)
            .join_from(
                self._referring,
                self._referrables,
                self._referring.columns.referring
                == self._referrables.columns.uuid,
            )
            .where(
                self._referring.columns.referred
                == uuid_collection_string(referred_uuid),
                self._referring.columns.typename == type_name,
            )
            .order_by(self._referrables.c.timestamp.desc())
            .limit(1)
        )
        return retrieval
    def last_referring(
        self,
        referred_uuid: Collection[str],
        type_name: str,
    ) -> Optional[Referring[ProtobufWithUUIDAndDatetime]]:
        """Return the most recent referring for these uuids, or None."""
        retrieval = self.last_referring_query(referred_uuid, type_name)
        with self._engine.begin() as conn:
            value = conn.execute(retrieval).fetchone()
        if value is None:
            return None
        return cast(
            Optional[Referring[ProtobufWithUUIDAndDatetime]],
            self._factory.create(
                sp.deserialize(value.referrable), store=False
            ),
        )
def update_referring_with_datetime(
self,
referred_uuid: Collection[str],
type_name: str,
update: Callable[
[Referring[ProtobufWithUUIDAndDatetime]],
Tuple[Referring[ProtobufWithUUIDAndDatetime], bool],
],
) -> Tuple[Referring[ProtobufWithUUIDAndDatetime], bool]:
"""Tries to update the storage with new protobuf with datetime
via the update method. An integrity error when inserting means
that a protobuf with datetime with the same referrable and same
manager was inserted by another Python process between the start
and the end of the transaction. The workaround is to retry the
whole transaction altogether."""
retry = True
while retry:
try:
updated, is_updated = self.insert_protobuf_with_datetime(
referred_uuid=referred_uuid,
type_name=type_name,
update=update,
)
retry = False
except sqlalchemy.exc.IntegrityError:
pass
return updated, is_updated
    def create_referring_with_datetime(
        self,
        value: Referring[ProtobufWithUUIDAndDatetime],
        update: Callable[
            [Referring[ProtobufWithUUIDAndDatetime]],
            Tuple[Referring[ProtobufWithUUIDAndDatetime], bool],
        ],
    ) -> Tuple[Referring[ProtobufWithUUIDAndDatetime], bool]:
        """Creating a new row is different from updating because we suppose no
        record for the protobuf with datetime existed at the start of the
        transaction. It might fail if another Python process creates a protobuf
        with datetime referring to the same referrable and manager. In this
        case an Integrity error occurs, we switch to updating the protobuf
        with datetime, because a record for the protobuf with datetime has now
        been created"""
        try:
            with self._engine.begin() as conn:
                queries = [
                    insert(self._referrables).values(
                        uuid=value.uuid(),
                        type_name=value.type_name(),
                        timestamp=datetime.fromisoformat(
                            value.protobuf().datetime
                        ),
                        referrable=sp.serialize(value.protobuf()),
                        # First status for this referred collection.
                        atomic_update_value=0,
                        unique_constraint=self._hash_unique_constraint(
                            referring=value, last_integer=-1
                        ),
                    )
                ]
                queries.extend(self.create_referring_insertions(value))
                for query in queries:
                    conn.execute(query)
        except sqlalchemy.exc.IntegrityError:
            # A concurrent process created a record first: switch to the
            # update path.
            return self.update_referring_with_datetime(
                referred_uuid=value.referred_uuid(),
                type_name=value.type_name(),
                update=update,
            )
        return value, True
    def insert_protobuf_with_datetime(
        self,
        referred_uuid: Collection[str],
        type_name: str,
        update: Callable[
            [Referring[ProtobufWithUUIDAndDatetime]],
            Tuple[Referring[ProtobufWithUUIDAndDatetime], bool],
        ],
    ) -> Tuple[Referring[ProtobufWithUUIDAndDatetime], bool]:
        """This method is responsible to insert a new
        protobuf with datetime, if it is compliant to the rule
        set in the update method. If it is not, it will return
        the last existing referring for the referred element."""
        with self._engine.begin() as conn:
            retrieval = self.last_referring_query(referred_uuid, type_name)
            value = conn.execute(retrieval).fetchone()
            assert value is not None
            # Cast to the right type
            referring = cast(
                Referring[ProtobufWithUUIDAndDatetime],
                self._factory.create(
                    sp.deserialize(value.referrable), store=False
                ),
            )
            # The update callback decides whether a new record is needed.
            updated, should_update = update(referring)
            if should_update:
                # Insert referrables
                query_last_integer = select(
                    self._referrables.columns.atomic_update_value
                ).where(self._referrables.columns.uuid == referring.uuid())
                last_integer = conn.scalar(query_last_integer)
                assert last_integer is not None
                # The unique constraint makes a concurrent identical insert
                # fail with an IntegrityError, handled by the caller.
                unique_constraint = self._hash_unique_constraint(
                    referring=referring, last_integer=last_integer
                )
                conn.execute(
                    insert(self._referrables).values(
                        uuid=updated.uuid(),
                        type_name=updated.type_name(),
                        timestamp=datetime.fromisoformat(
                            updated.protobuf().datetime
                        ),
                        referrable=sp.serialize(updated.protobuf()),
                        atomic_update_value=last_integer + 1,
                        unique_constraint=unique_constraint,
                    )
                )
                referred_combinations = chain.from_iterable(
                    combinations(updated.referred_uuid(), r)
                    for r in range(1, 3)
                )
                # Insert referring
                for combination in referred_combinations:
                    conn.execute(
                        insert(self._referring).values(
                            referred=uuid_collection_string(combination),
                            referring=updated.uuid(),
                            typename=updated.type_name(),
                        )
                    )
                return updated, True
            # Rule not satisfied: return the latest stored referring.
            fallback = self.last_referring(referred_uuid, type_name)
            assert fallback is not None
            return fallback, False
    def type_name(
        self, type_name: str
    ) -> Collection[Referrable[ProtobufWithUUID]]:
        """Return all stored referrables with this type_name."""
        retrieval = select(self._referrables.columns.referrable).where(
            self._referrables.columns.type_name == type_name
        )
        with self._engine.begin() as conn:
            values = conn.execute(retrieval).fetchall()
        return {
            cast(
                Referrable[ProtobufWithUUID],
                self._factory.create(
                    sp.deserialize(value.referrable), store=False
                ),
            )
            for value in values
        }
def all_referrings(self, uuid: str) -> List[str]:
"""Returns a list all items referring to a Referrable recursively."""
target = self.referrable(uuid)
to_delete, to_check = set(), {target}
while len(to_check) > 0:
node = to_check.pop()
if not node:
continue
to_delete.add(node)
deps = node.referring()
if not deps:
continue
for dep in deps:
if dep not in to_delete:
to_check.add(dep)
return [msg.uuid() for msg in to_delete]
    def delete(self, uuid: str) -> None:
        """Delete an object and all referring items."""
        uuids_to_delete = self.all_referrings(uuid)
        with self._engine.begin() as conn:
            # Remove rows from all three tables in one transaction.
            result = conn.execute(
                delete(self._sources).where(
                    self._sources.c.referring.in_(uuids_to_delete)
                )
            )
            logger.info(f"Deleted {result.rowcount} elements from sources")
            result = conn.execute(
                delete(self._referring).where(
                    self._referring.c.referring.in_(uuids_to_delete)
                )
            )
            logger.info(f"Deleted {result.rowcount} elements from referring")
            result = conn.execute(
                delete(self._referrables).where(
                    self._referrables.c.uuid.in_(uuids_to_delete)
                )
            )
            logger.info(f"Deleted {result.rowcount} elements from referrables")
        # empty cache so lookups do not serve deleted referrables
        self.cache.clear()
    def delete_type(self, type_name: str) -> None:
        """Deletes all referrable corresponding to a given type_name and all
        the referrings corresponding to it"""
        uuids = [obj.uuid() for obj in self.type_name(type_name)]
        # Expand to everything that (transitively) refers to those objects.
        uuids_to_delete = set(
            chain(*(self.all_referrings(uuid) for uuid in uuids))
        )
        with self._engine.begin() as conn:
            result = conn.execute(
                delete(self._sources).where(
                    self._sources.c.referring.in_(uuids_to_delete)
                )
            )
            logger.info(f"Deleted {result.rowcount} elements from sources")
            result = conn.execute(
                delete(self._referring).where(
                    self._referring.c.referring.in_(uuids_to_delete)
                )
            )
            logger.info(f"Deleted {result.rowcount} elements from referring")
            result = conn.execute(
                delete(self._referrables).where(
                    self._referrables.c.uuid.in_(uuids_to_delete)
                )
            )
            logger.info(f"Deleted {result.rowcount} elements from referrables")
        # empty cache so lookups do not serve deleted referrables
        self.cache.clear()
def truncate(self) -> None:
# Remove all the objects
with self._engine.begin() as conn:
conn.execute(delete(self._referrables))
conn.execute(delete(self._referring))
# Some utility functions to create the DB
def sqlite(path: str) -> Engine:
    """Create (if needed) the storage tables in a SQLite database at
    `path` and return an engine connected to it."""
    metadata = MetaData()
    # Declaring the tables registers them on `metadata` as a side effect,
    # hence the unused-variable noqa markers.
    referrables = set_referrables_table(metadata)  # noqa: F841
    referring = set_referring_table(metadata)  # noqa: F841
    sources = set_sources_table(metadata)  # noqa: F841
    uri = f"sqlite:///{path}"
    engine = create_engine(uri)
    metadata.create_all(engine)
    return engine