# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages
#
# Repository URL to install this package:
#
# Details
# tdw-catalog / tdw_catalog / source.py
# Size: Mime:
from typing import TYPE_CHECKING, List, Optional, Union
from tdw_catalog import connection, credential, warehouse
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, _convert_error, _raise_error

from datetime import datetime
from tdw_catalog.relations import _OrganizationRelation

from tdw_catalog.utils import ConnectionPortalType, ListConnectionsFilter

if TYPE_CHECKING:
    import tdw_catalog.organization as organization


@Entity([
    Property("id", str, serialize=True),
    Property("organization_id",
             str,
             relation="tdw_catalog.organization.Organization",
             serialize=True),
    Property("user_id", str, serialize=True),
    Property("label", str, writable=True),
    Property("description", str, writable=True),
    Property("created_at", datetime),
    Property("updated_at", datetime)
])
class Source(EntityBase, _OrganizationRelation):
    """
    A :class:`.Source` is used to semantically group a set of related :class:`.Dataset`\\ s. :class:`.User`\\ s
    are free to label a :class:`.Source` in a descriptive way to best understand the
    meaning behind this grouping.

    Attributes
    ----------
    id : str
        Source's unique id
    organization : Organization
        The :class:`.Organization` associated with this :class:`.Source`.  An :class:`.Organization` or ``organization_id`` can be provided but not both.
    organization_id : str
        The unique ID of the :class:`.Organization` to which this :class:`.Source` belongs
    user_id : str
        The unique user ID of the :class:`.OrganizationMember` who created this :class:`.Source`
    label : str
        A descriptive label for this :class:`.Source`
    description : Optional[str] = None
        An optional extended description for this :class:`.Source`
    created_at : datetime
        The datetime at which this :class:`.Source` was created
    updated_at : datetime
        The datetime at which this :class:`.Source` was last updated
    """

    id: str
    organization: 'organization.Organization'
    organization_id: str
    user_id: str
    label: str
    description: str
    created_at: datetime
    updated_at: datetime

    def __str__(self) -> str:
        return f'<Source id={self._id} label={self.label}>'

    @classmethod
    def get(cls, client, organization_id: str, id: str) -> 'Source':
        """
        Retrieve a :class:`.Source` belonging to this :class:`.Organization`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog` client to use to get the :class:`.Source`
        organization_id : str
            The :class:`.Organization` ID the :class:`.Source` belongs to
        id : str
            The unique ID of the :class:`.Source`

        Returns
        -------
        Source
            The :class:`.Source` associated with the given ID

        Raises
        ------
        CatalogInternalException
            If call to the :class:`.Catalog` server fails
        CatalogNotFoundException
            If the :class:`.Source` with the supplied ID could not be found
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve this :class:`.Source`
        """
        try:
            res = client._get_source(id=id, organization_id=organization_id)
            return cls(client, **res)
        except Exception as e:
            # NOTE(review): _raise_error is presumed to raise the converted
            # exception itself (confirm in tdw_catalog.errors); its return
            # value was never used, so it is no longer bound to a local.
            _raise_error(e, "Unable to fetch Source {}".format(id))

    def save(self) -> None:
        """
        Update this :class:`.Source`, saving any changes to its fields

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`.Source`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            res = self._client._update_source(source=self.serialize())
            # Refresh local state from the server's response
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def delete(self) -> None:
        """
        Delete this :class:`.Source`. This :class:`.Source` object should not be
        used after `delete()` has successfully returned

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to delete this :class:`.Source`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._delete_source(source=self.serialize())
        except Exception as e:
            raise _convert_error(e)

    def list_connections(
        self,
        filter: Optional[ListConnectionsFilter] = None
    ) -> List[Union['connection.IngestionConnection',
                    'connection.VirtualizationConnection']]:
        """
        List all :class:`.IngestionConnection` and :class:`.VirtualizationConnection`\\ s belonging to this :class:`.Source`

        Parameters
        ----------
        filter : Optional[ListConnectionsFilter]
            An optional filter on the returned Connection list, useful for pagination of results.
            Note that the ``organization_id`` and ``source_ids`` properties will be set automatically
            to this :class:`.Organization` and :class:`.Source`; a supplied filter object is
            modified in place.

        Returns
        -------
        List[Connection]
            The list of Connections in this :class:`.Source`. Connections whose portal is
            ``ConnectionPortalType.EXTERNAL`` are returned as :class:`.VirtualizationConnection`\\ s,
            all others as :class:`.IngestionConnection`\\ s.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to list Connections in this :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            f = filter
            if f is None:
                f = ListConnectionsFilter()
            # Always scope the query to this Source, whatever the caller set
            f.organization_id = self.organization_id
            f.source_ids = [self.id]
            res = self._client._list_connections(filter=f)

            # Guard against unexpected response format: anything that is not
            # a dict with a usable "connections" entry yields an empty list
            raw_connections = []
            if isinstance(res, dict):
                raw_connections = res.get("connections") or []

            connections = []
            for raw in raw_connections:
                # Same mapping as _create_connection: the EXTERNAL portal
                # denotes a virtualization connection, every other portal
                # an ingestion connection
                if raw["portal"] == ConnectionPortalType.EXTERNAL:
                    wrapper = connection.VirtualizationConnection
                else:
                    wrapper = connection.IngestionConnection
                connections.append(wrapper(client=self._client, **raw))
            return connections
        except Exception as e:
            raise _convert_error(e)

    def _create_connection(
        self,
        label: str,
        portal: ConnectionPortalType,
        url: Optional[str] = None,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        default_schema: Optional[str] = None,
        credential: Optional['credential.Credential'] = None,
        ingest_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> Union['connection.IngestionConnection',
               'connection.VirtualizationConnection']:
        """
        Shared implementation behind the public connection-creation methods.

        Builds the creation request from the supplied arguments, leaving out
        any optional argument that is ``None``, and wraps the server response
        in the connection class matching ``portal``.

        Parameters
        ----------
        label : str
            The descriptive label for the new connection
        portal : ConnectionPortalType
            The portal type; ``ConnectionPortalType.EXTERNAL`` produces a
            :class:`.VirtualizationConnection`, anything else an :class:`.IngestionConnection`
        url : Optional[str]
            Sent as ``url`` when provided
        description : Optional[str]
            Sent as ``description`` when provided
        warehouse : Optional[Warehouse]
            When provided, its ``name`` is sent as ``warehouse``
        default_schema : Optional[str]
            Sent as ``default_schema`` when provided
        credential : Optional[Credential]
            When provided, its ``id`` is sent as ``credential_id``
        ingest_schedules : Optional[List[ConnectionSchedule]]
            When provided, each schedule is serialized and sent as ``ingest_schedules``

        Returns
        -------
        Union[IngestionConnection, VirtualizationConnection]
            The newly created connection

        Raises
        ------
        CatalogException
            Any failure is converted via ``_convert_error`` and re-raised
        """
        try:
            create_args = {
                'source_id': self.id,
                'label': label,
                'portal': portal,
            }
            # Optional fields are only sent when explicitly provided
            if url is not None:
                create_args['url'] = url
            if description is not None:
                create_args['description'] = description
            if credential is not None:
                create_args['credential_id'] = credential.id
            if warehouse is not None:
                create_args['warehouse'] = warehouse.name
            if default_schema is not None:
                create_args['default_schema'] = default_schema
            if ingest_schedules is not None:
                create_args['ingest_schedules'] = [
                    s.serialize() for s in ingest_schedules
                ]

            res = self._client._create_connection(**create_args)
            if portal == ConnectionPortalType.EXTERNAL:
                return connection.VirtualizationConnection(
                    client=self._client,
                    **res,
                )
            else:
                return connection.IngestionConnection(
                    client=self._client,
                    **res,
                )
        except Exception as e:
            raise _convert_error(e)

    def create_ingestion_connection(
        self,
        label: str,
        portal: ConnectionPortalType,
        url: Optional[str] = None,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        credential: Optional['credential.Credential'] = None,
        ingest_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> 'connection.IngestionConnection':
        """
        Create an :class:`.IngestionConnection` within this :class:`.Source`

        Parameters
        ----------
        label : str
            The descriptive label for this :class:`.IngestionConnection`
        portal : ConnectionPortalType
            The method of data access employed by this :class:`.IngestionConnection`
        url : Optional[str]
            A canonical URL that points to the location of data resources within the portal
        description : Optional[str] = None
            An optional extended description for this :class:`.IngestionConnection`
        warehouse : Optional[Warehouse]
            :class:`.Dataset`\\ s created using this :class:`.IngestionConnection` will ingest to this :class:`.Warehouse` by default (can be overridden at ingest time).
        credential : Optional[Credential]
            The :class:`.Credential` associated with this :class:`.IngestionConnection`.
        ingest_schedules : Optional[List[ConnectionSchedule]]
            Optional :class:`.ConnectionSchedule`\\ s which, when specified, indicate the frequency with which to
            reingest ingested data. Specific Datasets using this :class:`.IngestionConnection`
            may override this set of Schedules.

        Returns
        -------
        IngestionConnection
            The newly created IngestionConnection

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to create :class:`.IngestionConnection`\\ s in this :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        # Ingestion connections never carry a default schema
        return self._create_connection(
            label=label,
            portal=portal,
            url=url,
            description=description,
            warehouse=warehouse,
            default_schema=None,
            credential=credential,
            ingest_schedules=ingest_schedules,
        )

    # TODO change to warehouse creation builder, which builds appropriate params
    # for CreateWarehouseRequest, and remove warehouse argument (pending
    # the launch of warehouse self-serve)
    def _create_virtualization_connection(
        self,
        label: str,
        default_schema: str,
        description: Optional[str] = None,
        warehouse: Optional['warehouse.Warehouse'] = None,
        credential: Optional['credential.Credential'] = None,
        metrics_collection_schedules: Optional[
            List['connection.ConnectionSchedule']] = None,
    ) -> 'connection.VirtualizationConnection':
        """
        Create a :class:`.VirtualizationConnection` within this :class:`.Source`

        Parameters
        ----------
        label : str
            The descriptive label for this :class:`.VirtualizationConnection`
        default_schema : str
            The schema to search for tables and views when this :class:`.VirtualizationConnection` is used for data virtualization.
        description : Optional[str] = None
            An optional extended description for this :class:`.VirtualizationConnection`
        warehouse : Optional[Warehouse]
            :class:`.Dataset`\\ s created using this :class:`.VirtualizationConnection` will always access data from this :class:`.Warehouse`
        credential : Optional[Credential]
            The :class:`.Credential` associated with this :class:`.VirtualizationConnection`. Omitted when virtualizing.
        metrics_collection_schedules : Optional[List[ConnectionSchedule]]
            Optional :class:`.ConnectionSchedule`\\ s which, when specified, indicate the frequency with which to
            re-analyze virtualized data. Specific Datasets using this :class:`.VirtualizationConnection`
            may override this set of Schedules.

        Returns
        -------
        VirtualizationConnection
            The newly created VirtualizationConnection

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to create :class:`.VirtualizationConnection`\\ s in this :class:`.Organization`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        # Virtualization connections always use the EXTERNAL portal and have
        # no URL; the metrics schedules travel in the ingest_schedules slot
        return self._create_connection(
            label=label,
            portal=ConnectionPortalType.EXTERNAL,
            url=None,
            description=description,
            warehouse=warehouse,
            default_schema=default_schema,
            credential=credential,
            ingest_schedules=metrics_collection_schedules,
        )