Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
tdw-catalog / tdw_catalog / query.py
Size: Mime:
import threading
from datetime import datetime, timezone
from itertools import count
from typing import Any, Dict, List, Optional


class QueryCursor:
    """
    :class:`.QueryCursor` is a Python DB API-style Cursor object (PEP 249) for query
    results from the :class:`.Catalog`.

    Every row of the result is converted to a tuple when the cursor is
    constructed; the ``fetch*`` methods then hand those tuples out in order.
    The read position is advanced under a lock so that concurrent fetches
    never return the same row twice.

    Attributes
    ----------
    arraysize: number
        Read/write attribute that controls the number of rows returned by fetchmany().
        The default value is 1 which means a single row would be fetched per call.
    description: List[tuple]
        Read-only attribute that provides the column names of the last query.
        To remain compatible with the Python DB API, it returns a 7-tuple for each
        column where the last five items of each tuple are None.
    rowcount: int
        Read-only attribute holding the total number of rows in the result.
    """
    _rows: List[tuple]
    _current_row: int
    _rowcount: int
    _array_size: int
    _description: List[tuple]

    # JDBC type names whose cell values are converted to int / float.
    _INT_JDBC_TYPES = frozenset(('BIGINT', 'SMALLINT', 'TINYINT'))
    _FLOAT_JDBC_TYPES = frozenset(('REAL', 'FLOAT', 'DOUBLE'))

    def __init__(self, res: Dict[str, Any]):
        """
        Build a cursor from a raw query response.

        Parameters
        ----------
        res: Dict[str, Any]
            Must contain ``'column_metadata'``: a list of dicts, each with a
            ``'key'`` and optionally ``'type'`` and ``'jdbc_type'``. May contain
            ``'rows'``: a list of dicts, each with a ``'cells'`` list whose
            entries line up positionally with ``'column_metadata'``.

        Raises
        ------
        KeyError
            If ``'column_metadata'``, a column ``'key'``, a row's ``'cells'``
            or a non-null cell's ``'value'`` is missing.
        IndexError
            If a row has more cells than there are columns.
        """
        columns = res['column_metadata']
        self._description = [(col['key'], col.get('type'), None, None, None,
                              None, None) for col in columns]
        # Index into `columns` (rather than zip) so that a row with more
        # cells than columns fails loudly instead of being truncated.
        self._rows = [
            tuple(
                self._format_cell(cell, columns[i])
                for i, cell in enumerate(row['cells']))
            for row in (res.get('rows') or [])
        ]
        self._rowcount = len(self._rows)
        self._current_row = 0
        self._lock = threading.Lock()
        self._array_size = 1

    @classmethod
    def _format_cell(cls, cell: Dict[str, Any], column: Dict[str,
                                                              Any]) -> Any:
        """
        Convert one raw cell to a Python value based on its column metadata.

        A cell carrying an ``'is_null'`` key becomes ``None``. Otherwise the
        cell's ``'value'`` is converted to ``int``, ``float`` or a UTC-aware
        ``datetime`` according to the column's ``'type'`` / ``'jdbc_type'``.
        If the conversion fails, or no conversion applies, the raw value is
        returned unchanged.
        """
        if 'is_null' in cell:
            return None
        value = cell['value']
        coltype = column.get('type')
        jdbctype = column.get('jdbc_type')
        try:
            if coltype == 'integer' or jdbctype in cls._INT_JDBC_TYPES:
                return int(value)
            if coltype == 'decimal' or jdbctype in cls._FLOAT_JDBC_TYPES:
                return float(value)
            if coltype == 'date' and jdbctype == 'DATE':
                return datetime.strptime(
                    value, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            if coltype == 'date' and jdbctype == 'TIMESTAMP':
                # e.g. '1986-08-20 00:57:42.000'
                return datetime.strptime(
                    value,
                    "%Y-%m-%d %H:%M:%S.%f").replace(tzinfo=timezone.utc)
        except (TypeError, ValueError, OverflowError):
            # Best effort: a value that does not parse as its declared type
            # is handed back as received rather than failing the whole query.
            pass
        return value

    @property
    def arraysize(self) -> int:
        return self._array_size

    @arraysize.setter
    def arraysize(self, new_size: int):
        with self._lock:
            self._array_size = new_size

    # Earlier versions accidentally exposed the writable property under this
    # name; keep it as an alias so existing callers continue to work.
    set_arraysize = arraysize

    @property
    def description(self) -> List[tuple]:
        return self._description

    @property
    def rowcount(self) -> int:
        return self._rowcount

    def fetchone(self) -> Optional[tuple]:
        """
        Returns the next row query result set as a tuple.
        Return None if no more data is available.
        """
        # lock fetchone so it is threadsafe
        with self._lock:
            if self._current_row >= self._rowcount:
                return None
            row = self._rows[self._current_row]
            self._current_row += 1
            return row

    def fetchmany(self, size=None) -> List[tuple]:
        """
        Return the next set of rows of a query result as a list.
        Return an empty list if no more rows are available.

        The number of rows to fetch per call is specified by the size parameter.
        If size is not given, arraysize determines the number of rows to be fetched.
        If fewer than size rows are available, as many rows as are available are returned.
        A size of zero or less returns an empty list.
        """
        with self._lock:
            wanted = self._array_size if size is None else size
            start = self._current_row
            # Clamp to the rows that actually remain, so the cost depends on
            # the number of rows returned and not on the requested size.
            stop = min(start + max(wanted, 0), self._rowcount)
            batch = self._rows[start:stop]
            self._current_row = stop
            return batch

    def fetchall(self) -> List[tuple]:
        """
        Return all (remaining) rows of a query result as a list. Return an empty list
        if no rows are available.
        """
        return self.fetchmany(self._rowcount)

    def close(self):
        """
        A no-op which is included to increase API-compatibility with Python DB API
        """
        pass