# Repository URL to install this package:
#
# Version: 3.0.0rc10
from asyncio import sleep
from datetime import datetime
from typing import Any, Dict, List, Optional
from tdw_catalog.entity import Entity, EntityBase, Property
from tdw_catalog import Catalog
import tdw_catalog.connection as connection
from tdw_catalog.errors import CatalogException, _convert_error
from tdw_catalog.relations import _ConnectionRelation
from tdw_catalog.utils import ImportState, IngestPipeline
@Entity([
    Property('id', str, serialize=True),
    Property('dataset_id', str, serialize=True),
    Property('user_id', str, serialize=True),
    Property('last_updated_by_user_id', str),
    Property('state', ImportState),
    Property("organization_id", str, serialize=True),
    Property('warehouse', str, writable=True),
    Property('url', str, writable=True),
    Property('last_e_tag', str),
    Property('ingest_config', str, writable=True),
    Property('pipeline', str, writable=True),
    Property('dataspec_enabled', bool, writable=True),
    Property('source_portal', str),
    Property("connection_id",
             str,
             relation="tdw_catalog.connection._Connection",
             writable=True,
             serialize=True),
    # Writable, but excluded from serialize() output (serialize=False).
    Property("destination_connection_id", str, writable=True, serialize=False),
    Property("ingest_schedules",
             Optional[List[connection.ConnectionSchedule]],
             writable=True,
             deserialize=connection._deserialize_connection_schedules),
    Property('next_ingest', Optional[datetime]),
    Property('imported_at', Optional[datetime]),
    Property('import_started_at', Optional[datetime]),
    Property('failed_at', Optional[datetime]),
    Property('created_at', Optional[datetime]),
    Property('updated_at', Optional[datetime]),
    Property('marked_for_destruction_at', Optional[datetime]),
    Property('started_at', Optional[datetime]),
    Property('finalizing_at', Optional[datetime]),
    Property('create_as_query_statement', str, writable=True),
    Property('ingest_pipeline', IngestPipeline, writable=True),
    Property('dataspec_pipeline_version', str, writable=True),
    Property('create_as_query_config', Optional[Dict[str, Any]], writable=True),
    Property('er_config', Optional[Dict[str, Any]], writable=True),
])
class _Reference(EntityBase, _ConnectionRelation):
    """Catalog entity describing where a dataset's data comes from and how it
    is imported (URL, connection, ingest configuration and schedules).

    Besides its fields, it offers helpers to fetch (``get``), persist
    (``save``), trigger an import (``ingest``), force-fail the current job
    (``force_fail``) and look up table views, all through the owning
    ``Catalog`` client.
    """

    _client: 'Catalog'
    id: str
    dataset_id: str
    last_updated_by_user_id: str
    user_id: str
    # Declared as ImportState in the Entity property list above.
    state: ImportState
    organization_id: str
    warehouse: str
    url: str
    last_e_tag: str
    ingest_config: str
    pipeline: str
    dataspec_enabled: bool
    source_portal: str
    # NOTE(review): presumably resolved from connection_id via the relation
    # declared on that property / _ConnectionRelation — confirm.
    Connection: connection._Connection
    connection_id: str
    destination_connection_id: str
    ingest_schedules: Optional[List[connection.ConnectionSchedule]]
    next_ingest: Optional[datetime]
    imported_at: Optional[datetime]
    import_started_at: Optional[datetime]
    failed_at: Optional[datetime]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
    marked_for_destruction_at: Optional[datetime]
    started_at: Optional[datetime]
    finalizing_at: Optional[datetime]
    create_as_query_statement: str
    ingest_pipeline: IngestPipeline
    dataspec_pipeline_version: str
    create_as_query_config: Optional[Dict[str, Any]]
    er_config: Optional[Dict[str, Any]]

    def __str__(self) -> str:
        return f"<_Reference id={self.id} dataset_id={self.dataset_id}>"

    @classmethod
    def get(cls, client: 'Catalog', id: str, organization_id: str):
        """Fetch a reference by id and build an instance from the response.

        Raises:
            Whatever ``_convert_error`` produces for any error raised while
            fetching or constructing the reference.
        """
        try:
            res = client._get_reference(id=id, organization_id=organization_id)
            return cls(client, **res)
        except Exception as e:
            raise _convert_error(e)

    @property
    def exists(self) -> bool:
        """True when this reference has a non-empty id (i.e. it was saved)."""
        return self.id is not None and len(self.id) > 0

    def save(self) -> None:
        """Create or update this reference, then refresh fields from the reply.

        An existing reference (non-empty ``id``) is updated; otherwise a new
        one is created. The client's response is deserialized back onto
        ``self`` in both cases.

        Raises:
            Whatever ``_convert_error`` produces for any underlying error.
        """
        try:
            payload = self.serialize()
            if self.exists:
                res = self._client._update_reference(reference=payload)
            else:
                res = self._client._create_reference(reference=payload)
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def force_fail(self) -> Optional[Dict[str, Any]]:
        """Request that this reference's current job be forced to fail.

        Returns:
            The response of ``_force_current_job_fail``, or ``None`` when the
            reference has not been saved yet (``exists`` is False).
        """
        if not self.exists:
            return None
        return self._client._force_current_job_fail(
            reference_id=self.id,
            organization_id=self.organization_id,
        )

    async def ingest(self) -> None:
        """Start an import for this reference and wait until it ends.

        Does nothing if the reference has not been saved yet. Otherwise it
        starts an ingest job, then re-fetches the latest job for this
        reference every 0.5 seconds until the job's state is ``'finished'``
        or ``'failed'``.

        Raises:
            CatalogException: if the job ends in ``'failed'``, or if no job
                is returned at any point.
        """
        if not self.exists:
            return
        job = self._client._ingest_reference(
            id=self.id,
            organization_id=self.organization_id,
        )
        # NOTE(review): the client calls are not awaited, so each poll
        # presumably blocks the event loop while the request runs — confirm.
        while job is not None and job['state'] not in ('finished', 'failed'):
            # Sleep before re-fetching (not after), so we don't idle for an
            # extra interval once the job has reached a terminal state.
            await sleep(0.5)
            jobs = self._client._bulk_get_latest_job(
                reference_ids=[self.id],
                organization_id=self.organization_id,
            )
            job = next(iter(jobs), None)
        if job is None:
            # Previously this path dereferenced job['error_msg'] on None and
            # surfaced a TypeError instead of a CatalogException.
            raise CatalogException(
                message="Unable to complete file import: no import job found")
        if job['state'] == 'failed':
            raise CatalogException(
                message="Unable to complete file import: {msg}".format(
                    msg=job['error_msg']))

    def list_table_views(self) -> Optional[List[Dict[str, Any]]]:
        """List the table views for this reference.

        Returns ``None`` when the reference has not been saved yet.
        """
        if not self.exists:
            return None
        return self._client._list_table_views(
            reference_id=self.id,
        )

    def get_table_view(self, id: str) -> Optional[Dict[str, Any]]:
        """Fetch a single table view by its own id.

        Returns ``None`` when this reference has not been saved yet.
        """
        if not self.exists:
            return None
        # NOTE(review): only the view id is forwarded; neither this
        # reference's id nor organization_id is passed — confirm intended.
        return self._client._get_table_view(
            id=id,
        )