Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Request/response of SelectObjectContent API."""

from __future__ import absolute_import

from abc import ABCMeta
from binascii import crc32
from io import BytesIO
from xml.etree import ElementTree as ET

from .error import MinioException
from .xml import Element, SubElement, findtext

COMPRESSION_TYPE_NONE = "NONE"
COMPRESSION_TYPE_GZIP = "GZIP"
COMPRESSION_TYPE_BZIP2 = "BZIP2"

FILE_HEADER_INFO_USE = "USE"
FILE_HEADER_INFO_IGNORE = "IGNORE"
FILE_HEADER_INFO_NONE = "NONE"

JSON_TYPE_DOCUMENT = "DOCUMENT"
JSON_TYPE_LINES = "LINES"

QUOTE_FIELDS_ALWAYS = "ALWAYS"
QUOTE_FIELDS_ASNEEDED = "ASNEEDED"


class InputSerialization:
    """Input serialization."""

    __metaclass__ = ABCMeta

    def __init__(self, compression_type):
        if (
                compression_type is not None and
                compression_type not in [
                    COMPRESSION_TYPE_NONE,
                    COMPRESSION_TYPE_GZIP,
                    COMPRESSION_TYPE_BZIP2,
                ]
        ):
            raise ValueError(
                f"compression type must be {COMPRESSION_TYPE_NONE}, "
                f"{COMPRESSION_TYPE_GZIP} or {COMPRESSION_TYPE_BZIP2}"
            )
        self._compression_type = compression_type

    def toxml(self, element):
        """Convert to XML."""
        if self._compression_type is not None:
            SubElement(element, "CompressionType", self._compression_type)
        return element


class CSVInputSerialization(InputSerialization):
    """CSV input serialization."""

    def __init__(self, compression_type=None,
                 allow_quoted_record_delimiter=None, comments=None,
                 field_delimiter=None, file_header_info=None,
                 quote_character=None, quote_escape_character=None,
                 record_delimiter=None):
        super().__init__(compression_type)
        self._allow_quoted_record_delimiter = allow_quoted_record_delimiter
        self._comments = comments
        self._field_delimiter = field_delimiter
        if (
                file_header_info is not None and
                file_header_info not in [
                    FILE_HEADER_INFO_USE,
                    FILE_HEADER_INFO_IGNORE,
                    FILE_HEADER_INFO_NONE,
                ]
        ):
            raise ValueError(
                f"file header info must be {FILE_HEADER_INFO_USE}, "
                f"{FILE_HEADER_INFO_IGNORE} or {FILE_HEADER_INFO_NONE}"
            )
        self._file_header_info = file_header_info
        self._quote_character = quote_character
        self._quote_escape_character = quote_escape_character
        self._record_delimiter = record_delimiter

    def toxml(self, element):
        """Convert to XML."""
        super().toxml(element)
        element = SubElement(element, "CSV")
        if self._allow_quoted_record_delimiter is not None:
            SubElement(
                element,
                "AllowQuotedRecordDelimiter",
                self._allow_quoted_record_delimiter,
            )
        if self._comments is not None:
            SubElement(element, "Comments", self._comments)
        if self._field_delimiter is not None:
            SubElement(element, "FieldDelimiter", self._field_delimiter)
        if self._file_header_info is not None:
            SubElement(element, "FileHeaderInfo", self._file_header_info)
        if self._quote_character is not None:
            SubElement(element, "QuoteCharacter", self._quote_character)
        if self._quote_escape_character is not None:
            SubElement(
                element,
                "QuoteEscapeCharacter",
                self._quote_escape_character,
            )
        if self._record_delimiter is not None:
            SubElement(element, "RecordDelimiter", self._record_delimiter)


class JSONInputSerialization(InputSerialization):
    """JSON input serialization."""

    def __init__(self, compression_type=None, json_type=None):
        super().__init__(compression_type)
        if (
                json_type is not None and
                json_type not in [JSON_TYPE_DOCUMENT, JSON_TYPE_LINES]
        ):
            raise ValueError(
                f"json type must be {JSON_TYPE_DOCUMENT} or {JSON_TYPE_LINES}"
            )
        self._json_type = json_type

    def toxml(self, element):
        """Convert to XML."""
        super().toxml(element)
        element = SubElement(element, "JSON")
        if self._json_type is not None:
            SubElement(element, "Type", self._json_type)


class ParquetInputSerialization(InputSerialization):
    """Parquet input serialization."""

    def __init__(self):
        super().__init__(None)

    def toxml(self, element):
        """Convert to XML."""
        super().toxml(element)
        return SubElement(element, "Parquet")


class CSVOutputSerialization:
    """CSV output serialization."""

    def __init__(self, field_delimiter=None, quote_character=None,
                 quote_escape_character=None, quote_fields=None,
                 record_delimiter=None):
        self._field_delimiter = field_delimiter
        self._quote_character = quote_character
        self._quote_escape_character = quote_escape_character
        if (
                quote_fields is not None and
                quote_fields not in [
                    QUOTE_FIELDS_ALWAYS, QUOTE_FIELDS_ASNEEDED,
                ]
        ):
            raise ValueError(
                f"quote fields must be {QUOTE_FIELDS_ALWAYS} or "
                f"{QUOTE_FIELDS_ASNEEDED}"
            )
        self._quote_fields = quote_fields
        self._record_delimiter = record_delimiter

    def toxml(self, element):
        """Convert to XML."""
        element = SubElement(element, "CSV")
        if self._field_delimiter is not None:
            SubElement(element, "FieldDelimiter", self._field_delimiter)
        if self._quote_character is not None:
            SubElement(element, "QuoteCharacter", self._quote_character)
        if self._quote_escape_character is not None:
            SubElement(
                element,
                "QuoteEscapeCharacter",
                self._quote_escape_character,
            )
        if self._quote_fields is not None:
            SubElement(element, "QuoteFields", self._quote_fields)
        if self._record_delimiter is not None:
            SubElement(element, "RecordDelimiter", self._record_delimiter)


class JSONOutputSerialization:
    """JSON output serialization."""

    def __init__(self, record_delimiter=None):
        self._record_delimiter = record_delimiter

    def toxml(self, element):
        """Convert to XML."""
        element = SubElement(element, "JSON")
        if self._record_delimiter is not None:
            SubElement(element, "RecordDelimiter", self._record_delimiter)


class SelectRequest:
    """Select object content request."""

    def __init__(self, expression, input_serialization, output_serialization,
                 request_progress=False, scan_start_range=None,
                 scan_end_range=None):
        self._expression = expression
        if not isinstance(
                input_serialization,
                (
                    CSVInputSerialization,
                    JSONInputSerialization,
                    ParquetInputSerialization,
                ),
        ):
            raise ValueError(
                "input serialization must be CSVInputSerialization, "
                "JSONInputSerialization or ParquetInputSerialization type",
            )
        self._input_serialization = input_serialization
        if not isinstance(
                output_serialization,
                (CSVOutputSerialization, JSONOutputSerialization),
        ):
            raise ValueError(
                "output serialization must be CSVOutputSerialization or "
                "JSONOutputSerialization type",
            )
        self._output_serialization = output_serialization
        self._request_progress = request_progress
        self._scan_start_range = scan_start_range
        self._scan_end_range = scan_end_range

    def toxml(self, element):
        """Convert to XML."""
        element = Element("SelectObjectContentRequest")
        SubElement(element, "Expression", self._expression)
        SubElement(element, "ExpressionType", "SQL")
        self._input_serialization.toxml(
            SubElement(element, "InputSerialization"),
        )
        self._output_serialization.toxml(
            SubElement(element, "OutputSerialization"),
        )
        if self._request_progress:
            SubElement(
                SubElement(element, "RequestProgress"), "Enabled", "true",
            )
        if self._scan_start_range or self._scan_end_range:
            tag = SubElement(element, "ScanRange")
            if self._scan_start_range:
                SubElement(tag, "Start", self._scan_start_range)
            if self._scan_end_range:
                SubElement(tag, "End", self._scan_end_range)
        return element


def _read(reader, size):
    """Wrapper to RawIOBase.read() to error out on short reads."""
    data = reader.read(size)
    if len(data) != size:
        raise IOError("insufficient data")
    return data


def _int(data):
    """Convert byte data to big-endian int."""
    return int.from_bytes(data, byteorder="big")


def _crc32(data):
    """Wrapper to binascii.crc32()."""
    return crc32(data) & 0xffffffff


def _decode_header(data):
    """Decode header data."""
    reader = BytesIO(data)
    headers = {}
    while True:
        length = reader.read(1)
        if not length:
            break
        name = _read(reader, _int(length))
        if _int(_read(reader, 1)) != 7:
            raise IOError("header value type is not 7")
        value = _read(reader, _int(_read(reader, 2)))
        headers[name.decode()] = value.decode()
    return headers


class Stats:
    """Progress/Stats information."""

    def __init__(self, data):
        element = ET.fromstring(data.decode())
        self._bytes_scanned = findtext(element, "BytesScanned")
        self._bytes_processed = findtext(element, "BytesProcessed")
        self._bytes_returned = findtext(element, "BytesReturned")

    @property
    def bytes_scanned(self):
        """Get bytes scanned."""
        return self._bytes_scanned

    @property
    def bytes_processed(self):
        """Get bytes processed."""
        return self._bytes_processed

    @property
    def bytes_returned(self):
        """Get bytes returned."""
        return self._bytes_returned


class SelectObjectReader:
    """
    BufferedIOBase compatible reader represents response data of
    Minio.select_object_content() API.
    """

    def __init__(self, response):
        self._response = response
        self._stats = None
        self._payload = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        return self.close()

    def readable(self):  # pylint: disable=no-self-use
        """Return this is readable."""
        return True

    def writeable(self):  # pylint: disable=no-self-use
        """Return this is not writeable."""
        return False

    def close(self):
        """Close response and release network resources."""
        self._response.close()
        self._response.release_conn()

    def stats(self):
        """Get stats information."""
        return self._stats

    def _read(self):
        """Read and decode response."""
        if self._response.isclosed():
            return 0

        prelude = _read(self._response, 8)
        prelude_crc = _read(self._response, 4)
        if _crc32(prelude) != _int(prelude_crc):
            raise IOError(
                f"prelude CRC mismatch; expected: {_crc32(prelude)}, "
                f"got: {_int(prelude_crc)}"
            )

        total_length = _int(prelude[:4])
        data = _read(self._response, total_length - 8 - 4 - 4)
        message_crc = _int(_read(self._response, 4))
        if _crc32(prelude + prelude_crc + data) != message_crc:
            raise IOError(
                f"message CRC mismatch; "
                f"expected: {_crc32(prelude + prelude_crc + data)}, "
                f"got: {message_crc}"
            )

        header_length = _int(prelude[4:])
        headers = _decode_header(data[:header_length])

        if headers.get(":message-type") == "error":
            raise MinioException(
                f"{headers.get(':error-code')}: {headers.get(':error-message')}"
            )

        if headers.get(":event-type") == "End":
            return 0

        payload_length = total_length - header_length - 16
        if headers.get(":event-type") == "Cont" or payload_length < 1:
            return self._read()

        payload = data[header_length:header_length+payload_length]

        if headers.get(":event-type") in ["Progress", "Stats"]:
            self._stats = Stats(payload)
            return self._read()

        if headers.get(":event-type") == "Records":
            self._payload = payload
            return len(payload)

        raise MinioException(
            f"unknown event-type {headers.get(':event-type')}",
        )

    def stream(self, num_bytes=32*1024):
        """
        Stream extracted payload from response data. Upon completion, caller
        should call self.close() to release network resources.
        """
        while self._read() > 0:
            while self._payload:
                result = self._payload
                if num_bytes < len(self._payload):
                    result = self._payload[:num_bytes]
                self._payload = self._payload[len(result):]
                yield result