Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tdw-catalog / tdw_catalog / export.py
Size: Mime:
from typing import BinaryIO, Dict, List, Optional, Type
from datetime import date, datetime
from tdw_catalog.query import QueryCursor

from tdw_catalog.utils import _ExportFormat, _download_export, _parse_timestamp


class _Export(dict):
    query: str
    _format: _ExportFormat
    created_at: datetime
    started_at: datetime
    finished_at: datetime
    url: str

    def __getattr__(self, attr):
        return self[attr]


class CSVExport(_Export):
    """
    :class:`.CSVExport` represents a signed download URL pointing to
    the CSV-formatted result of a :class:`.Dataset` ``export_csv()``
    operation, alongside metadata concerning the exported data.

    This class is deliberately formatted for use with pandas'
    ``read_csv`` function, as follows:
    ``e1 = await dataset.export_csv()`` and ``df = pd.read_csv(e1.url, **e1)``

    Attributes
    __________
    query: str
        The query statement which was used to create the :class:`.Export`
    created_at: datetime
        The time this :class:`.Export` was originally created
    started_at: datetime
        The time this :class:`.Export` was started
    finished_at: datetime
        The time this :class:`.Export` was completed
    url: str
        The CSV-formatted export results can be downloaded via this signed URL
    dtype : Dict[str, Type]
        Metadata describing the schema of the exported data
    parse_dates: List[str]
        A list of columns within ``dtype`` that should be interpreted as dates
    true_values : List[str]
        A list of values to interpret as "truthy"
    false_values : List[str]
        A list of values to interpret as "falsey"
    compression : Optional[str]
        Indicates the compression format of the data, if any
    """

    @classmethod
    def _map_types(cls, type: str) -> Type:
        if type == 'boolean':
            return bool
        elif type == 'string' or type == 'geometry':
            return str
        return object

    @classmethod
    def _from_export_details(cls, query_res: QueryCursor,
                             finished_export_details: Dict[str, any],
                             format: _ExportFormat) -> 'CSVExport':
        ex = CSVExport()
        ex._format = format
        ex.created_at = _parse_timestamp(
            finished_export_details['export']['created_at'])
        ex.started_at = _parse_timestamp(
            finished_export_details['export']['started_at'])
        ex.finished_at = _parse_timestamp(
            finished_export_details['export']['finished_at'])
        ex.url = finished_export_details['file_url']

        ex['dtype'] = {
            x[0]: cls._map_types(x[1])
            for x in query_res.description
        }
        ex['parse_dates'] = [
            x[0] for x in query_res.description
            if x[1] == 'date' or x[1] == 'datetime'
        ]
        ex['true_values'] = ['t', 'T', '1']
        ex['false_values'] = ['f', 'F', '0']
        if format == _ExportFormat.CSV_GZIP:
            ex['compression'] = 'gzip'

        return ex

    async def to_str(self) -> str:
        """
        Downloads the export into an in-memory `str`

        Returns
        -------
        str
            The CSV contents of this export
        """
        return await _download_export(self.url, self._format)

    async def to_stream(self, out: BinaryIO):
        """
        Downloads the export into an on-disk file, or other stream

        Parameters
        ----------
        out : io.BinaryIO
            The stream to write CSV data to
        """
        return await _download_export(self.url, self._format, f_out=out)


class ParquetExport(_Export):

    @classmethod
    def _from_export_details(
            cls, finished_export_details: Dict[str, any]) -> 'ParquetExport':
        ex = ParquetExport()
        ex._format = _ExportFormat.PARQUET
        ex.created_at = _parse_timestamp(
            finished_export_details['export']['created_at'])
        ex.started_at = _parse_timestamp(
            finished_export_details['export']['started_at'])
        ex.finished_at = _parse_timestamp(
            finished_export_details['export']['finished_at'])
        ex.url = finished_export_details['file_url']
        return ex

    async def to_bytes(self) -> BinaryIO:
        """
        Downloads the export into an in-memory buffer

        Returns
        -------
        BinaryIO
            The Parquet contents of this export
        """
        return await _download_export(self.url, format=_ExportFormat.PARQUET)

    async def to_stream(self, out: BinaryIO):
        """
        Downloads the export into an on-disk file, or other stream

        Parameters
        ----------
        out : io.BinaryIO
            The stream to write Parquet data to
        """
        return await _download_export(self.url,
                                      format=_ExportFormat.PARQUET,
                                      f_out=out)