Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tdw-catalog / tdw_catalog / dataset.py
Size: Mime:
from datetime import datetime
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from asyncio import gather, sleep

from tdw_catalog import Catalog
from tdw_catalog.export import CSVExport, ParquetExport
from tdw_catalog.query import QueryCursor
from tdw_catalog.relations import _OrganizationRelation, _SourceRelation
import tdw_catalog.source as source
import tdw_catalog.connection as connection_package
import tdw_catalog._reference as _reference
from tdw_catalog.entity import Entity, Property, EntityBase
from tdw_catalog.errors import CatalogException, CatalogInternalException, _convert_error, _raise_error
from tdw_catalog.dataset_connector import DatasetConnector
import tdw_catalog.topic as topic
from tdw_catalog.utils import _ExportFormat, Filter, ImportState
from tdw_catalog.utils import _parse_timestamp
import tdw_catalog.warehouse as warehouse_package
import tdw_catalog.metadata.editor as editor
import tdw_catalog.metadata.field as metadata_field
import tdw_catalog.metadata_template as metadata_template
from tdw_catalog.data_dictionary import Column, DataDictionary, MetadataOnlyDataDictionary

if TYPE_CHECKING:
    import tdw_catalog.organization as organization


def _deserialize_dataset_metadata_template(
    data: Optional[Dict[str, any]],
    dataset: 'Dataset') -> 'Optional[metadata_template.MetadataTemplate]':
    return None if data is None else metadata_template.MetadataTemplate(
        dataset._client, **data)


def _deserialize_data_dictionary(
    data: List[dict], dataset: 'Dataset'
) -> Union[DataDictionary, MetadataOnlyDataDictionary]:
    # grab the first version, which will be the most recent valid one
    version = data[0] if data is not None and len(data) > 0 else None
    Klass = DataDictionary if dataset.is_connected else MetadataOnlyDataDictionary
    return Klass(dataset=dataset,
                 last_updated_at=_parse_timestamp(version["updated_at"])
                 if version is not None else None,
                 version_id=version["id"]
                 if version is not None and "id" in version else None,
                 columns=[
                     Column._from_property(dataset=dataset, p=p)
                     for p in version["properties"]
                 ] if version is not None else [])


@Entity([
    Property("id", str, serialize=True),
    Property("title", str, writable=True),
    Property("description", Optional[str], writable=True),
    Property("uploader_id", str, display_key="creator_id"),
    Property("source_id",
             str,
             relation="tdw_catalog.source.Source",
             serialize=True,
             writable=True),
    Property("organization_id",
             str,
             relation="tdw_catalog.organization.Organization",
             serialize=True),
    Property("created_at", datetime),
    Property("updated_at", datetime),
    Property("custom_fields",
             List[metadata_field.MetadataField],
             deserialize=metadata_field._deserialize_metadata_fields,
             serialize=True,
             writable=False,
             readable=False),
    Property("field_template",
             Optional[metadata_template.MetadataTemplate],
             display_key="metadata_template",
             deserialize=_deserialize_dataset_metadata_template,
             serialize=True,
             writable=False),
    Property("versions",
             Union[DataDictionary, MetadataOnlyDataDictionary],
             display_key="data_dictionary",
             deserialize=_deserialize_data_dictionary),
    # TODO warehouse metadata
    # TODO warehouse custom metadata
])
class Dataset(EntityBase, _OrganizationRelation, _SourceRelation):
    """
    A :class:`.Dataset` represents a cataloged data asset within an :class:`.Organization`.
    It is a container for structured and custom metadata describing the
    asset, and can optionally be connected to the data asset via a
    :class:`.IngestionConnection` or :class:`.VirtualizationConnection` to support queries,
    health monitoring, etc.

    Attributes
    __________
    id: str
        The :class:`.Dataset`'s unique ID
    title: str
        The title of the :class:`.Dataset`
    description: Optional[str]
        The full description text (supports Markdown) that helps describe this :class:`.Dataset`
    uploader_id: str
        The unique ID of the :class:`.User` that created this :class:`.Dataset`
    source_id: str
        The unique ID of the :class:`.Source` associated with this :class:`.Dataset`
    source: str
        The :class:`.Source` associated with this :class:`.Dataset`
    organization_id: str
        The unique ID of the :class:`.Organization` which this :class:`.Dataset` belongs to
    organization: Organization
        The :class:`.Organization` which this :class:`.Dataset` belongs to
    metadata_template: MetadataTemplate
        The :class:`.MetadataTemplate` attached to this :class:`.Dataset`, if any
    data_dictionary: DataDictionary
        The :class:`.DataDictionary` defined within this :class:`.Dataset`, or describing the schema of the connected data if this is a :class:`.ConnectedDataset`
    created_at: datetime
        The date this :class:`.Dataset` was originally created
    updated_at: datetime
        The date this :class:`.Dataset`\\ 's metadata was last modified
    """
    _client: 'Catalog'
    _context_organization: 'organization.Organization'
    id: str
    title: str
    description: Optional[str]
    uploader_id: str
    source: 'source.Source'
    source_id: str
    organization: 'organization.Organization'
    organization_id: str
    metadata_template: 'metadata_template.MetadataTemplate'
    data_dictionary: Union[DataDictionary, MetadataOnlyDataDictionary]
    created_at: datetime
    updated_at: datetime

    def __str__(self) -> str:
        return f"<Dataset id={self.id} title={self.title} organization_id={self.organization_id}>"

    def _get_templated_metadata(self) -> List['metadata_field.MetadataField']:
        if self._field_template is None:
            return []
        templated_fields = []
        custom_field_keys = list(map(
            lambda f: f["key"],
            self._custom_fields)) if self._custom_fields is not None else []

        # Add each field from custom_fields corresponding to a field in the template
        for field in self._field_template.fields:
            # if there is field in custom_fields with the same key as a template field, ensure it is the same type as the template field then add it to templated_metadata
            if field["key"] in custom_field_keys:
                matched_custom_field = next(
                    (x for x in self._custom_fields if x.key == field.key),
                    None)
                if metadata_field._check_field_match(
                    field=matched_custom_field,
                    templated_field=field) == False:
                    raise CatalogInternalException(
                        message=
                        f"The type of the field with key {field.key} on this Dataset does not match the type of the field with key {field.key} on its attached MetadataTemplate"
                    )
                templated_fields.append(matched_custom_field)
            # if there is field missing from custom_fields but present in the metadata template, add it directly from the template fields list to templated_metadata
            else:
                templated_fields.append(
                    metadata_field._convert_template_field(field))
        return templated_fields

    def _get_custom_metadata(self) -> List['metadata_field.MetadataField']:
        metadata_template_field_keys = list(
            map(lambda f: f["key"], self._field_template.fields)
        ) if self._field_template is not None else []
        # Add each field from custom_fields that does not correspond to a field in the template
        return [
            field for field in self._custom_fields
            if field["key"] not in metadata_template_field_keys
        ] if self._custom_fields is not None else []

    @property
    def custom_metadata(self) -> List['metadata_field.MetadataField']:
        """
        A list of :class:`.MetadataField`\\ s attached to this :class:`.Dataset` that are not associated with an attached :class:`.MetadataTemplate`
        """
        return self._get_custom_metadata()

    def update_custom_metadata(self) -> 'editor.MetadataEditor':
        """
        Provides a :class:`.MetadataEditor` which allows for the addition, removal, and alteration of :class:`.MetadataField`\\ s on this :class:`.Dataset` that are not associated with an attached :class:`.MetadataTemplate`

        Parameters
        ----------
        None

        Returns
        -------
        MetadataEditor
            An editor for adding, removing, and updating :class:`.MetadataField`\\ s on the :class:`.Dataset` which do not belong to a :class:`.MetadataTemplate`
        """
        return editor.MetadataEditor(fields=self.custom_metadata, dataset=self)

    @property
    def templated_metadata(self) -> List['metadata_field.MetadataField']:
        """
        A list of :class:`.MetadataField`\\ s attached to this :class:`.Dataset` that are associated with an attached :class:`.MetadataTemplate`
        """
        return self._get_templated_metadata()

    def update_templated_metadata(self) -> 'editor.TemplatedMetadataEditor':
        """
        Provides a :class:`.TemplatedMetadataEditor` which allows for the alteration of :class:`.MetadataField`\\ s on this :class:`.Dataset` that are associated with an attached :class:`.MetadataTemplate`. This object cannot add or remove :class:`.MetadataField`\\ s, that must be done on the :class:`.MetadataTemplate` directly.

        Parameters
        ----------
        None

        Returns
        -------
        TemplatedMetadataEditor
            An editor for updating :class:`.MetadataField`\\ s on the :class:`.Dataset` that are associated with an attached :class:`.MetadataTemplate`
        """
        return editor.TemplatedMetadataEditor(fields=self.templated_metadata,
                                              template=self._field_template,
                                              dataset=self)

    @property
    def is_connected(self) -> bool:
        return False

    @classmethod
    def get(cls,
            client: 'Catalog',
            id: str,
            context_organization: Optional['organization.Organization'] = None
            ):
        """
        Retrieve a :class:`.Dataset`

        Parameters
        ----------
        client : Catalog
            The :class:`.Catalog`  client to use to get the :class:`.Dataset`
        id : str
            The unique ID of the :class:`.Dataset`
        context_organization : Optional[Organization]
            The :class:`.Organization` from which this :class:`.Dataset` is being retrieved.
            :class:`.Dataset`\\ 's may be accessible from multiple :class:`.Organization`\\ 's,
            but can have differing metadata within each. This context parameter is necessary
            to determine which metadata to load.

        Returns
        -------
        Dataset
            The :class:`.Dataset` associated with the given ID

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to retrieve the given :class:`.Dataset`
        CatalogNotFoundException
            If the given :class:`.Dataset` ID does not exist
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            res = client._get_dataset(id=id)
            # pick out the valid versions and call list_properties,
            # which returns glossary terms information
            versions = None if "versions" not in res["dataset"] or res[
                "dataset"]["versions"] is None else [
                v for v in res["dataset"]["versions"]
                if v["state"] == "draft" or v["state"] == "imported"
            ]
            if versions is not None and len(versions) > 0:
                properties = client._list_properties(
                    version_ids=[versions[0]["id"]],
                    organization_ids=[res["dataset"]["organization_id"]])
                # overwrite properties for versions[0], to include
                # glossary term info
                versions[0]["properties"] = properties
                res["dataset"]["versions"] = versions
            if "reference_id" in res["dataset"]:
                d = ConnectedDataset(client, **res["dataset"])
                d._context_organization = context_organization

                # Reach down into the revision and set the storage table on the dataset
                if versions is not None and len(versions) > 0 and versions[0]["revisions"] is not None and len(
                    versions[0]["revisions"]) > 0:
                    d.storage_table = versions[0]["revisions"][0]["storage_table"]

                return d
            d = cls(client, **res["dataset"])
            d._context_organization = context_organization
            return d
        except Exception as e:
            err = _raise_error(e, "Unable to fetch Dataset {}".format(id))

    def attach_template(self, template: 'metadata_template.MetadataTemplate'):
        """
        Attach a :class:`.MetadataTemplate` to this :class:`.Dataset`. Values may be supplied to
        templated fields immediately, but the template will only be attached when
        class:`.Dataset` `.save()` is called.

        Parameters
        ----------
        template : MetadataTemplate
            The :class:`.MetadataTemplate` to be attached to the :class:`.Dataset`

        Returns
        -------
        Dataset
            The :class:`.Dataset` with a newly attached :class:`MetadataTemplate`
        """
        if template is not None:
            _update_fields_after_template_attachment(dataset=self,
                                                     template=template)

        return self

    def detach_template(self):
        """
        Remove the attached :class:`.MetadataTemplate` from this :class:`.Dataset`. Any fields from this :class:`.MetadataTemplate` will remain on the :class:`.Dataset` but as individual :class:`.MetadataField`\\ s. Detachment happens instantly and calling :class:`.Dataset`.save() is not necessary for the changes to persist

        Parameters
        ----------
        None

        Returns
        -------
        Dataset
            The :class:`.Dataset` with no attached :class:`.MetadataTemplate`
        """
        try:
            self._client._detach_field_template(
                organization_id=self.organization_id,
                dataset_id=self.id,
                field_template_id=self._field_template.id)
        except Exception as e:
            raise _convert_error(e)
        self._field_template = None
        return self

    def save(self) -> None:
        """
        Update this :class:`.Dataset`, saving all changes to its metadata fields.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to update this :class:`.Dataset`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            res = self._client._update_dataset(dataset=self.serialize())
            self.deserialize(res)
        except Exception as e:
            raise _convert_error(e)

    def delete(self) -> None:
        """
        Delete this :class:`.Dataset`\\.  The :class:`.Dataset` object should not be used after this
        method is invoked successfully.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to delete this :class:`.Dataset`
        CatalogException
            If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._delete_dataset(id=self.id)
        except Exception as e:
            raise _convert_error(e)

    def connect(self) -> 'DatasetConnector':
        """
        Converts a :class:`.Dataset` into a ConnectedDataset, by accessing
        data via an :class:`.IngestionConnection` or :class:`.VirtualizationConnection`.
        A :class:`.ConnectedDataset` can represent ingested data, which is copied into
        the :class:`.Catalog` platform, or virtualized data which is accessed remotely by the
        platform without being copied.

        There are many methods for connecting a :class:`.Dataset`, thus a
        helper object is returned with various method-based
        workflows that aid in connecting to data.

        Returns
        -------
        DatasetConnector
            A helper object for configuring this :class:`.Dataset`\\ 's
            connection to data.
        """
        return DatasetConnector(self)

    def refresh(self) -> 'Dataset':
        """
        Return a fresh copy of this :class:`.Dataset`, with up-to-date
        property values. Useful after performing an update, connection,
        etc.
        """
        return Dataset.get(self._client, self.id, self._context_organization)

    def list_topics(self,
                    organization_id: Optional[str] = None,
                    filter: Optional[Filter] = None) -> 'List[topic.Topic]':
        """
        Retrieves the list of all :class:`.Topic`\\ s this :class:`.Dataset` is currently classified under, within the given :class:`.Organization`

        Parameters
        ----------
        organization_id : Optional[str]
          An optional ID for an :class:`.Organization` other than the original :class:`.Organization` the :class:`.Dataset` was created in (e.g. if the :class:`.Dataset` has been shared to another organization with a different set of :class:`.Topic`\\ s)
        filter : Optional[Filter]
          An optional :class:`.tdw_catalog.utils.Filter` to offset or limit the list of :class:`.Topic`\\ s returned

        Returns
        -------
        List[Topic]
          The list of :class:`.Topic`\\ s that have been classified to this :class:`.Dataset`

        Raises
        ------
        CatalogPermissionDeniedException
          If the caller is not allowed to list :class:`.Topic`\\ s in this :class:`.Organization`
        CatalogException
          If call to the :class:`.Catalog` server fails
        """
        try:
            topics = self._client._get_topics(
                dataset_id=self.id,
                organization_id=self.organization_id
                if organization_id is None else organization_id,
                filter=filter)
            return [topic.Topic(self._client, **t) for t in topics]
        except Exception as e:
            raise _convert_error(e)

    def classify(self, topic: 'topic.Topic') -> None:
        """
        Classify this :class:`.Dataset` with a :class:`.Topic`, linking them semantically

        Parameters
        ----------
        topic : Topic
          The :class:`.Topic` to classify this :class:`.Dataset` with

        Returns
        -------
        None

        Raises
        ------
        CatalogPermissionDeniedException
          If the caller is not allowed to classify :class:`.Dataset`\\ s, or if the :class:`.Topic` ID provided does not correspond to an existing :class:`.Topic`
        CatalogException
          If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._set_topic(id=topic.id,
                                    dataset_id=self.id,
                                    organization_id=self.organization_id)
        except Exception as e:
            raise _convert_error(e)

    def declassify(self, topic: 'topic.Topic') -> None:
        """
        Remove a :class:`.Topic` classification from this :class:`.Dataset`

        Parameters
        ----------
        topic : Topic
          The :class:`.Topic` to be unclassified from this :class:`.Dataset`

        Returns
        -------
        None

        Raises
        ------
        CatalogPermissionDeniedException
          If the caller is not allowed to declassify :class:`.Dataset`\\ s, or if the :class:`.Topic` ID provided does not correspond to an existing :class:`.Topic`
        CatalogException
          If call to the :class:`.Catalog` server fails
        """
        try:
            self._client._unset_topic(id=topic.id,
                                      dataset_id=self.id,
                                      organization_id=self.organization_id)
        except Exception as e:
            raise _convert_error(e)


@Entity([
    Property("reference_id",
             str,
             relation="tdw_catalog._reference._Reference",
             readable=False),
    Property("exports_disabled", bool, writable=True),
    Property("imported_at", datetime, display_key="metrics_last_collected_at"),
    # TODO versions & revisions
    Property("reference_next_ingest",
             Optional[datetime],
             display_key="next_scheduled_metrics_collection_time"),
    Property("reference_failed_at",
             Optional[datetime],
             display_key="last_metrics_collection_failure_time"),
    Property("warehouse_metadata",
             Optional[List[metadata_field.MetadataField]],
             deserialize=metadata_field._deserialize_metadata_fields),
])
class ConnectedDataset(Dataset):
    """
    A :class:`.ConnectedDataset` is identical to a :class:`.Dataset` and inherits
    all of its fields, but represents a :class:`.Dataset` which is connected
    to the actual underlying data asset via a Connection. A
    :class:`.ConnectedDataset` supports queries, export, health monitoring, etc.

    Attributes
    __________
    exports_disabled: bool
        A flag to mark if this :class:`.Dataset` may be exported. Setting this
        to false does not prevent querying on this :class:`.Dataset`. Only relevant if
        the :class:`.Dataset` is connected to data.
    warehouse: str
        The underlying data warehouse where that data resides
    metrics_last_collected_at: datetime
        The last time metrics were collected for this :class:`.Dataset` (virtualized :class:`.Dataset`\\ s)
        or the last time the :class:`.Dataset` was imported (ingested :class:`.Dataset`\\ s).
    next_scheduled_metrics_collection_time: Optional[datetime]
        If this :class:`.Dataset` has an associated connection schedule,
        the next time this dataset will collect metrics (virtualized Dataset)
        or import (ingested :class:`.Dataset`\\ s).
    last_metrics_collection_failure_time: datetime
        The most recent time metrics collection (virtualized :class:`.Dataset`\\ s)
        or import (ingested :class:`.Dataset`\\ s) failed. ``None`` if metrics collection
        has never failed.
    warehouse_metadata: Optional[List[metadata_field.MetadataField]]
        Harvested metadata from virtualized :class:`.Dataset`\\ s. ``None`` for ingested :class:`.Dataset`\\ s.
    """
    exports_disabled: bool
    metrics_last_collected_at: datetime
    next_scheduled_metrics_collection_time: Optional[datetime]
    last_metrics_collection_failure_time: Optional[datetime]
    warehouse_metadata: 'Optional[List[metadata_field.MetadataField]]'
    storage_table: Optional[str]

    @property
    def is_connected(self) -> bool:
        return True

    def refresh(self) -> 'ConnectedDataset':
        """
        Return a fresh copy of this :class:`.ConnectedDataset`, with up-to-date
        property values. Useful after performing an update, connection,
        etc.
        """
        return ConnectedDataset.get(self._client, self.id)

    @property
    def connection_id(self) -> str:
        """"The ID of the underlying :class:`.IngestionConnection` or :class:`.Virtualization` which links this :class:`.Dataset` to data"""
        return self._reference.connection_id

    @property
    def connection(
        self
    ) -> Union['connection_package.IngestionConnection',
    'connection_package.VirtualizationConnection']:
        """"The underlying :class:`.IngestionConnection` or :class:`.VirtualizationConnection` which links this :class:`.Dataset` to data"""
        return self._reference.connection

    @property
    def warehouse(self) -> 'warehouse_package.Warehouse':
        """
        The :class:`.Warehouse` where the connected data is virtualized from, or ingested to
        """
        return warehouse_package.Warehouse.get(self._client,
                                               self._reference.warehouse,
                                               self.organization_id)

    @property
    def import_state(self) -> ImportState:
        return self._reference.state

    @property
    def health_monitoring_enabled(self) -> bool:
        """
        Whether or not :class:`.Catalog` platform health monitoring is enabled for this :class:`.ConnectedDataset`
        """
        return self._reference.dataspec_enabled

    @health_monitoring_enabled.setter
    def set_health_monitoring_enabled(self, health_monitoring_enabled: bool):
        """
        Enable or disable health monitoring for this :class:`.ConnectedDataset`
        """
        self._reference.dataspec_enabled = health_monitoring_enabled

    @property
    def metrics_collection_schedules(
        self) -> Optional[List['connection_package.ConnectionSchedule']]:
        """
        Returns all configured schedules for metrics collection, which govern health monitoring intervals and ingestion intervals for ingested :class:`.Dataset`\\ s
        """
        return self._reference.ingest_schedules

    @metrics_collection_schedules.setter
    def set_metrics_collection_schedules(self, schedules: Optional[
        List['connection_package.ConnectionSchedule']]):
        """
        Set or clear metrics collection schedules
        """
        self._reference.ingest_schedules = schedules

    @property
    def advanced_configuration(self) -> str:
        """
        This configuration string is auto-generated during ingest,
        or when virtualization, inferred from the connected data.
        It can be modified, with caution, to alter how the :class:`.Catalog`
        perceives and represents the connected data.

        Modification of this configuration without support
        from ThinkData Works is not recommended.
        """
        return self._reference.ingest_config

    @advanced_configuration.setter
    def set_advanced_configuration(self, ingest_config: str):
        """
        This configuration string is auto-generated during ingest,
        or when virtualization, inferred from the connected data.
        It can be modified, with caution, to alter how the :class:`.Catalog`
        perceives and represents the connected data.

        Modification of this configuration without support
        from ThinkData Works is not recommended.
        """
        self._reference.ingest_config = ingest_config

    def connect(self) -> 'DatasetConnector':
        """
        Manage all connection-related aspects of this
        :class:`.ConnectedDataset`.

        There are many methods for connecting a :class:`.Dataset`, thus a
        helper object is returned with various method-based
        workflows that aid in connecting to data.

        Returns
        -------
        DatasetConnector
            A helper object for configuring this :class:`.Dataset`\\ 's
            connection to data.
        """
        return super().connect()

    async def reconnect(self):
        """
        Manually triggers a reimport of ingested data for
        ingested datasets, and metrics collection (health monitoring, etc.)
        for virtualized and ingested datasets.

        Useful for forcing a metrics collection, or applying changes
        made to the advanced_configuration.
        """
        try:
            self._reference.force_fail()
        except:
            pass
        await self._reference.ingest()

    def __str__(self) -> str:
        return f"<ConnectedDataset id={self.id} title={self.title} organization_id={self.organization_id}>"

    def _get__reference(self):
        return _reference._Reference.get(self._client, self._reference_id,
                                         self.organization_id)

    async def query(self, query: Optional[str] = None) -> QueryCursor:
        """
        Async function which returns a Python DB API-style Cursor object (PEP 249),
        representing the results of the supplied SQL-like NiQL query
        executed against the connected data.

        Note that NIQL supports most standard SQL keywords, but keywords
        which modify underlying data (e.g. ``INSERT``, ``UPDATE``, ``DELETE``)
        may not be used.

        Note that the :class:`.Catalog` platform supports a global limit on
        results (10,000 rows) from a single query.

        To refer to the current dataset in the query, include ``{this}``
        in the query, such as: ``"SELECT * FROM {this}"``.

        Parameters
        ----------
        query : Optional[str]
            A NiQL query used to filter or reshape the data before exporting

        Returns
        -------
        QueryCursor
            The query results cursor, which can be printed, converted to
            a ``pandas`` DataFrame via ``pd.DataFrame(res.fetchall())``, etc.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to query data
        CatalogInvalidArgumentException
            If the given query is invalid
        CatalogException
            If call to the :class:`.Catalog` server fails, or the export process itself fails
        """
        try:
            q = "SELECT * FROM {this}" if query is None else query
            q = q.format(this=self.id)
            res = self._client._query(statement=q)
            return QueryCursor(res)
        except Exception as e:
            raise _convert_error(e)

    async def _do_export(
        self,
        query: Optional[str] = None,
        format: Optional[_ExportFormat] = None) -> Dict[str, any]:
        try:
            file_name = 'export.parquet' if format is _ExportFormat.PARQUET else 'export.csv'
            export = self._client._create_export(query=query,
                                                 format=format,
                                                 file_name=file_name)
            export_in_progress = True
            while export_in_progress:
                finished_export_details = None

                # when the export was already been done before, this 'export' key
                # will be available right away.
                if 'export' in export:
                    finished_export_details = self._client._get_export(
                        id=export['export']['id'])
                else:
                    finished_export_details = self._client._get_export(
                        id=export['id'])

                if 'export' in finished_export_details:
                    state = finished_export_details['export']['state']
                else:
                    state = finished_export_details['state']

                if state == 'finished':
                    break
                elif state == 'failed':
                    # if a CSV_GZIP export fails, fallback to CSV
                    if format is _ExportFormat.CSV_GZIP:
                        return await self._do_export(query,
                                                     format=_ExportFormat.CSV)
                    raise CatalogException(
                        message=finished_export_details["error_info"]
                        ["message"])

                await sleep(2)

            return finished_export_details
        except Exception as e:
            raise _convert_error(e)

    async def export_csv(self, query: Optional[str] = None) -> CSVExport:
        """
        Async function which returns the URL which can be used to stream a
        CSV-formatted copy of the connected data, optionally filtered by
        the supplied SQL-like NiQL query. Note that most standard SQL
        keywords are supported, but keywords which modify underlying data
        (e.g. ``INSERT``, ``UPDATE``, ``DELETE``) are not.

        To refer to the current dataset in the query, include ``{this}``
        in the query, such as: ``"SELECT * FROM {this}"``.

        Unlike ``ConnectedDataset.query()``, there is no limit on exported rows,
        other than any imposed by the underlying warehouse.

        Parameters
        ----------
        query : Optional[str]
            A NiQL query used to filter or reshape the data before exporting

        Returns
        -------
        CSVExport
            An :class:`.CSVExport` object containing a signed download URL which can be used to
            fetch the exported data. It can be downloaded in its entirety, or streamed in chunks.
            This :class:`.CSVExport` object improves the usability of the CSV data when employing ``pandas``,
            including a configuration for ``read_csv`` which can be passed via ``**export``
            as follows: ``df = pd.read_csv(export.url, **export)``, ensuring that the
            resultant ``DataFrame`` has the correct schema for all fields (including dates).
            Note: Is is recommended that `export_parquet` be employed for use with ``pandas`` when
            supported by the underlying warehouse.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to export data
        CatalogInvalidArgumentException
            If the given query is invalid
        CatalogException
            If call to the :class:`.Catalog` server fails, or the export process itself fails
        """
        q = "SELECT * FROM {this}" if query is None else query
        q = q.format(this=self.id)
        schema_q = "SELECT * FROM ({q}) LIMIT 0".format(q=q)

        # run the export and query in parellel, using the query to fetch the schema
        [query_res, finished_export_details
         ] = await gather(self.query(schema_q),
                          self._do_export(q, format=_ExportFormat.CSV_GZIP))

        export_format = _ExportFormat.CSV_GZIP if finished_export_details[
                                                      "export"]["format"] == "csv.gzip" else _ExportFormat.CSV

        return CSVExport._from_export_details(query_res,
                                              finished_export_details,
                                              export_format)

    async def export_parquet(self,
                             query: Optional[str] = None) -> ParquetExport:
        """
        Async function which returns the URL which can be used to stream a
        Parquet-formatted copy of the connected data, optionally filtered by
        the supplied SQL-like NiQL query. Note that most standard SQL
        keywords are supported, but keywords which modify underlying data
        (e.g. ``INSERT``, ``UPDATE``, ``DELETE``) are not.

        To refer to the current dataset in the query, include ``{this}``
        in the query, such as: ``"SELECT * FROM {this}"``.

        Unlike ``ConnectedDataset.query()``, there is no limit on exported rows,
        other than any imposed by the underlying warehouse.

        Note: Parquet export is not (yet) supported for all underlying warehouse types, but this
        export method should be preferred when interfacing with ``pandas`` whenever possible.

        Parameters
        ----------
        query : Optional[str]
            A NiQL query used to filter or reshape the data before exporting

        Returns
        -------
        ParquetExport
            An :class:`.ParquetExport` object containing a signed download URL which can be used to
            fetch the exported data. It can be downloaded in its entirety, or streamed in chunks.
            This :class:`.ParquetExport` object can be directly employed by ``pandas``
            as follows: ``df = pd.read_parquet(export.url)``. Note that ``pandas`` requires
            ``pyarrow`` OR ``fastparquet`` in order to ``read_parquet``.
            Note: Is is recommended that `export_parquet` be employed for use with pandas when
            supported by the underlying warehouse.

        Raises
        ------
        CatalogPermissionDeniedException
            If the caller is not allowed to export data
        CatalogInvalidArgumentException
            If the given query is invalid, or if Parquet export is not available for this warehouse type
        CatalogException
            If call to the :class:`.Catalog` server fails, or the export process itself fails
        """
        q = "SELECT * FROM {this}" if query is None else query
        q = q.format(this=self.id)
        finished_export_details = await self._do_export(
            q, format=_ExportFormat.PARQUET)
        return ParquetExport._from_export_details(finished_export_details)


def _update_fields_after_template_attachment(
    dataset: 'Dataset',
    template: 'metadata_template.MetadataTemplate') -> None:
    # steps for setting the template
    # remove all of the original template's fields from the custom fields list
    # convert the new templates fields list to MetadataField objects (instead of MetadataTemplateField objects)
    # update the values of the new template fields list
    # set the dataset metadata_template to the new template
    # set the new template
    original_fields_list = dataset._custom_fields if dataset._custom_fields is not None else []
    original_template_fields = dataset._field_template.fields if dataset._field_template is not None else None
    original_template_keys = list(
        map(lambda f: f["key"], original_template_fields)
    ) if original_template_fields is not None else []

    # remove the old template fields from the custom fields list
    filtered_fields_list = list(
        filter(lambda f: f["key"] not in original_template_keys,
               original_fields_list))
    # set the new field template, so that it will be saved when dataset.save() is called
    dataset._field_template = template

    # map the new field template fields to normal MetadataField's
    template_fields = list(
        map(lambda f: metadata_field._convert_template_field(f),
            dataset._field_template.fields)
    ) if dataset._field_template.fields is not None else []

    # update the template_fields with any pre-existing field values
    for field in template_fields:
        pre_existing = next((f for f in filtered_fields_list
                             if f.key == field.key and type(f) == type(field)),
                            None)

        if pre_existing is not None:
            # update the template field value if it doesn't have one
            if field.value is None:
                field.value = pre_existing.value
            # remove the pre-existing field from the custom fields list
            filtered_fields_list = [
                f for f in filtered_fields_list if f.key != field.key
            ]

    # add the new templates fields to the custom fields list
    dataset._custom_fields = template_fields + filtered_fields_list