# tdw-catalog / tdw_catalog / _reference.py
from asyncio import sleep
from datetime import datetime
from typing import Any, Dict, List, Optional
from tdw_catalog.entity import Entity, EntityBase, Property
from tdw_catalog import Catalog
import tdw_catalog.connection as connection
from tdw_catalog.errors import CatalogException, _convert_error
from tdw_catalog.relations import _ConnectionRelation
from tdw_catalog.utils import ImportState, IngestPipeline


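# The @Entity decorator below appears to declare the reference's fields, their
# Python types, and per-field behavior. Judging by usage in this file:
# `writable` seems to mark fields a caller may set, `serialize` whether the
# field is included in the payload sent back on save, and
# `relation`/`deserialize` how linked entities and custom decoding are wired.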
@Entity([
    Property('id', str, serialize=True),
    Property('dataset_id', str, serialize=True),
    Property('user_id', str, serialize=True),
    Property('last_updated_by_user_id', str),
    Property('state', ImportState),
    Property("organization_id", str, serialize=True),
    Property('warehouse', str, writable=True),
    Property('url', str, writable=True),
    Property('last_e_tag', str),
    Property('ingest_config', str, writable=True),
    Property('pipeline', str, writable=True),
    Property('dataspec_enabled', bool, writable=True),
    Property('source_portal', str),
    Property("connection_id",
             str,
             relation="tdw_catalog.connection._Connection",
             writable=True, serialize=True),
    Property("destination_connection_id", str, writable=True, serialize=False),
    Property("ingest_schedules",
             Optional[List[connection.ConnectionSchedule]],
             writable=True,
             deserialize=connection._deserialize_connection_schedules),
    Property('next_ingest', Optional[datetime]),
    Property('imported_at', Optional[datetime]),
    Property('import_started_at', Optional[datetime]),
    Property('failed_at', Optional[datetime]),
    Property('created_at', Optional[datetime]),
    Property('updated_at', Optional[datetime]),
    Property('marked_for_destruction_at', Optional[datetime]),
    Property('started_at', Optional[datetime]),
    Property('finalizing_at', Optional[datetime]),
    Property('create_as_query_statement', str, writable=True),
    Property('ingest_pipeline', IngestPipeline, writable=True),
    Property('dataspec_pipeline_version', str, writable=True),
    Property('create_as_query_config', Optional[Dict[str, Any]], writable=True),
    Property('er_config', Optional[Dict[str, Any]], writable=True),
])
class _Reference(EntityBase, _ConnectionRelation):
    _client: 'Catalog'
    id: str
    dataset_id: str
    last_updated_by_user_id: str
    user_id: str
    state: ImportState
    organization_id: str
    warehouse: str
    url: str
    last_e_tag: str
    ingest_config: str
    pipeline: str
    dataspec_enabled: bool
    source_portal: str
    Connection: connection._Connection
    connection_id: str
    destination_connection_id: str
    ingest_schedules: Optional[List[connection.ConnectionSchedule]]
    next_ingest: Optional[datetime]
    imported_at: Optional[datetime]
    import_started_at: Optional[datetime]
    failed_at: Optional[datetime]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
    marked_for_destruction_at: Optional[datetime]
    started_at: Optional[datetime]
    finalizing_at: Optional[datetime]
    create_as_query_statement: str
    ingest_pipeline: IngestPipeline
    dataspec_pipeline_version: str
    create_as_query_config: Optional[Dict[str, Any]]
    er_config: Optional[Dict[str, Any]]

    def __str__(self) -> str:
        return f"<_Reference id={self.id} dataset_id={self.dataset_id}>"

    @classmethod
    def get(cls, client: 'Catalog', id: str,
            organization_id: str) -> '_Reference':
        """Fetch a single reference by id within an organization."""
        try:
            res = client._get_reference(id=id, organization_id=organization_id)
            return cls(client, **res)
        except Exception as e:
            raise _convert_error(e)

    @property
    def exists(self) -> bool:
        """True once the reference has been persisted and assigned an id."""
        return self.id is not None and len(self.id) > 0

    def save(self) -> None:
        """Create the reference if it is new, otherwise update it in place."""
        try:
            if self.exists:
                res = self._client._update_reference(
                    reference=self.serialize())
            else:
                res = self._client._create_reference(
                    reference=self.serialize())
            # Refresh local state from the server's response in either case.
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def force_fail(self) -> Optional[Dict[str, Any]]:
        """Ask the backend to fail this reference's current import job."""
        if self.exists:
            return self._client._force_current_job_fail(
                reference_id=self.id,
                organization_id=self.organization_id,
            )
        return None

    async def ingest(self) -> None:
        """Trigger an ingest of this reference and poll until the job
        reaches a terminal state, raising on failure."""
        if self.exists:
            job = self._client._ingest_reference(
                id=self.id,
                organization_id=self.organization_id,
            )
            # Poll the latest job for this reference every 0.5s until it
            # either finishes or fails (or disappears).
            while job is not None and job['state'] not in ('finished',
                                                           'failed'):
                jobs = self._client._bulk_get_latest_job(
                    reference_ids=[self.id],
                    organization_id=self.organization_id,
                )
                job = next(iter(jobs), None)
                await sleep(0.5)
            if job is None:
                raise CatalogException(
                    message="Unable to complete file import: job not found")
            if job['state'] == 'failed':
                raise CatalogException(
                    message="Unable to complete file import: {msg}".format(
                        msg=job['error_msg']))

    def list_table_views(self) -> Optional[List[Dict[str, Any]]]:
        if not self.exists:
            return None
        return self._client._list_table_views(
            reference_id=self.id,
        )

    def get_table_view(self, id: str) -> Optional[Dict[str, Any]]:
        if not self.exists:
            return None
        return self._client._get_table_view(
            id=id,
        )
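

# --- Usage sketch (illustrative only, not part of the module) ---
# A minimal example of how this entity might be driven, assuming an already
# configured `Catalog` client. The constructor call and ids below are
# hypothetical; real client setup depends on the package's authentication.
#
#     import asyncio
#     from tdw_catalog import Catalog
#
#     client = Catalog()  # hypothetical construction
#     ref = _Reference.get(client, id="ref-123", organization_id="org-456")
#     if ref.exists:
#         ref.save()                 # updates, since the reference exists
#         asyncio.run(ref.ingest())  # polls every 0.5s until the job ends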