Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tdw-catalog / tdw_catalog / list_datasets.py
Size: Mime:
from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
import sys
from typing import List, Optional, TYPE_CHECKING
from tdw_catalog import organization_member, source, topic, warehouse

from tdw_catalog.utils import LegacyFilter as BaseFilter, FilterSortOrder, ImportState, _convert_datetime_to_nullable_timestamp
if sys.version_info >= (3, 11):
    from enum import StrEnum
else:
    from backports.strenum import StrEnum

if TYPE_CHECKING:
    from tdw_catalog import Catalog


class SortableField(StrEnum):
    """
    The different fields which ``list_datasets`` on :class:`.Organization` can be sorted by
    """
    TITLE = "title",
    CREATED_AT = "created_at",
    IMPORTED_AT = "imported_at",
    UPDATED_AT = "updated_at",
    STATE = "reference_state",
    NEXT_INGEST = "reference_next_ingest",
    FAILED_AT = "reference_failed_at",
    SOURCE_NAME = "source_label"


class TimestampField(IntEnum):
    """
    The different possible fields that can be used to construct a :class:`.TimestampRange` filter for ``list_datasets`` on :class:`.Organization`
    """
    CREATED_AT = 0,
    UPDATED_AT = 1,
    IMPORTED_AT = 2,
    NEXT_INGEST = 3,
    FAILED_AT = 5


@dataclass
class TimestampRange():
    """
    Used to construct a temporal filter for ``list_datasets`` on :class:`.Organization`\\ , where a filter specifies a :class:`.TimestampField` and a time range
    """
    filter_by: TimestampField
    start_time: Optional[datetime]
    end_time: Optional[datetime]


@dataclass
class Sort():
    """
    Used to sort the results of ``list_datasets`` on :class:`.Organization`\\ .
    """
    field: SortableField
    order: Optional[FilterSortOrder] = FilterSortOrder.ASC

    def serialize(self):
        new_sort = {}
        new_sort["value"] = self.field
        new_sort[
            "order"] = "ASC" if self.order == FilterSortOrder.ASC else "DESC"


@dataclass
class DatasetAlias():
    """
    Used to sort the results of ``list_datasets`` by specific aliases
    """
    alias_key: str
    alias_values: List[str]

    def serialize(self):
        return {
            "alias_key": self.alias_key,
            "alias_values": self.alias_values,
        }


@dataclass
class Filter(BaseFilter):
    """
    :class:`.ListOrganizationDatasetsFilter` filters the results from ``list_datasets``
    on :class:`.Organization`\\ .

    Attributes
    ----------
    keywords : Optional[List[str]]
        Filters results according to the specified keyword(s) (fuzzy matching is supported)
    dataset_ids : Optional[List[str]]
        Filters results to the list of given :class:`.Dataset`\\ id(s)
    datset_aliases : Optional[List[DatasetAlias]]
        Filters results to the list of given :class:`.Dataset`\\ alias(es)
    sources : Optional[List[Source]]
        Filters results to the list of given :class:`.Source`\\ (s)
    topics : Optional[List[Topic]]
      Filters results to the list of given :class:`.Topic`\\ (s)
    creators : Optional[List[OrganizationMember]]
        Filters results to the list of given :class:`.OrganizationMember`\\ (s), who created the returned :class:`.Dataset`\\ s
    state : Optional[List[ImportState]]
        Filters results to the list of given :class:`.ImportState`\\ s. Note that virtualized datasets will always be categorized as ``IMPORTED``.
    warehouses: Optional[List[Warehouse]]
        Filters results to the list of given :class:`.Warehouse`\\ s
    timestamp_range : Optional[TimestampRange]
        Filters results to the within the given :class:`.TimestampRange`
    sort : Optional[Sort]
        Sorts filtered results according to the provided :class:`.Sort` structure
    """
    keywords: Optional[List[str]] = None
    dataset_ids: 'Optional[List[str]]' = None
    dataset_aliases: 'Optional[List[DatasetAlias]]' = None
    reference_ids: Optional[List[str]] = None
    sources: 'Optional[List[source.Source]]' = None
    topics: 'Optional[List[topic.Topic]]' = None
    creators: 'Optional[List[organization_member.OrganizationMember]]' = None
    states: Optional[List[ImportState]] = None
    warehouses: 'Optional[List[warehouse.Warehouse]]' = None
    timestamp_range: Optional[TimestampRange] = None
    sort: Optional[Sort] = None

    def serialize(self):
        new_filter = super().serialize()
        new_filter["exclude_versions"] = True
        if self.keywords is not None:
            new_filter["keywords"] = self.keywords
        if self.dataset_ids is not None:
            new_filter["dataset_ids"] = self.dataset_ids
        if self.dataset_aliases is not None:
            new_filter["dataset_aliases"] = [
                a.serialize() for a in self.dataset_aliases
            ]
        if self.sources is not None:
            new_filter["source_ids"] = [s.id for s in self.sources]
        if self.topics is not None:
            new_filter["topic_ids"] = [t.id for t in self.topics]
        if self.sort is not None:
            new_filter["sort"] = self.sort.serialize()
        if self.creators is not None:
            new_filter["uploader_ids"] = [u.id for u in self.creators]
        if self.states is not None:
            new_filter["reference_states"] = self.states
        if self.warehouses is not None:
            new_filter["warehouses"] = [w.name for w in self.warehouses]
        if self.timestamp_range is not None:
            start_timestamp = _convert_datetime_to_nullable_timestamp(
                self.timestamp_range.start_time)
            end_timestamp = _convert_datetime_to_nullable_timestamp(
                self.timestamp_range.end_time)
            new_filter["timestamp_range"] = {
                "filter_by": self.timestamp_range.filter_by,
                "from": {
                    "is_null": start_timestamp["is_null"],
                    "timestamp": {
                        "seconds": start_timestamp["timestamp"]["seconds"],
                        "nanos": start_timestamp["timestamp"]["nanos"]
                    }
                },
                "to": {
                    "is_null": end_timestamp["is_null"],
                    "timestamp": {
                        "seconds": end_timestamp["timestamp"]["seconds"],
                        "nanos": end_timestamp["timestamp"]["nanos"]
                    }
                }
            }
        return new_filter


class _ListDatasetsIterator():
    _client: 'Catalog'
    _dataset_ids: List[str]

    def __init__(self, client: 'Catalog', dataset_ids: List[str]):
        self._client = client
        self._dataset_ids = dataset_ids
        self._index = 0

    def __iter__(self):
        return self

    def __next__(self):
        import tdw_catalog.dataset as dataset
        if self._index < len(self._dataset_ids):
            item = self._dataset_ids[self._index]
            self._index += 1
            return dataset.Dataset.get(client=self._client, id=item)
        else:
            raise StopIteration