# Repository URL to install this package: (not captured)
# Version: 1.10.2
# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage, (C)
# 2020 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Request/response of SelectObjectContent API."""
from __future__ import absolute_import
from abc import ABCMeta
from binascii import crc32
from io import BytesIO
from xml.etree import ElementTree as ET
from .error import MinioException
from .xml import Element, SubElement, findtext
# Compression types accepted by InputSerialization (and its subclasses).
COMPRESSION_TYPE_NONE = "NONE"
COMPRESSION_TYPE_GZIP = "GZIP"
COMPRESSION_TYPE_BZIP2 = "BZIP2"
# File header info values accepted by CSVInputSerialization.
FILE_HEADER_INFO_USE = "USE"
FILE_HEADER_INFO_IGNORE = "IGNORE"
FILE_HEADER_INFO_NONE = "NONE"
# JSON types accepted by JSONInputSerialization.
JSON_TYPE_DOCUMENT = "DOCUMENT"
JSON_TYPE_LINES = "LINES"
# Quote fields values accepted by CSVOutputSerialization.
QUOTE_FIELDS_ALWAYS = "ALWAYS"
QUOTE_FIELDS_ASNEEDED = "ASNEEDED"
class InputSerialization:
    """Common base of the input serialization formats.

    Holds the optional compression type shared by every input format and
    knows how to write it into the request XML.
    """

    # NOTE: Python 2 style metaclass declaration; Python 3 treats this as a
    # plain class attribute.
    __metaclass__ = ABCMeta

    def __init__(self, compression_type):
        """Store ``compression_type`` after validating it.

        Raises ValueError when a non-None value is not one of the
        COMPRESSION_TYPE_* constants.
        """
        if compression_type is not None and compression_type not in (
                COMPRESSION_TYPE_NONE,
                COMPRESSION_TYPE_GZIP,
                COMPRESSION_TYPE_BZIP2,
        ):
            raise ValueError(
                f"compression type must be {COMPRESSION_TYPE_NONE}, "
                f"{COMPRESSION_TYPE_GZIP} or {COMPRESSION_TYPE_BZIP2}"
            )
        self._compression_type = compression_type

    def toxml(self, element):
        """Add CompressionType under ``element`` when set; return ``element``."""
        if self._compression_type is None:
            return element
        SubElement(element, "CompressionType", self._compression_type)
        return element
class CSVInputSerialization(InputSerialization):
    """Input serialization describing CSV formatted objects."""

    def __init__(self, compression_type=None,
                 allow_quoted_record_delimiter=None, comments=None,
                 field_delimiter=None, file_header_info=None,
                 quote_character=None, quote_escape_character=None,
                 record_delimiter=None):
        """Store the CSV options; every option is optional.

        Raises ValueError when ``compression_type`` or ``file_header_info``
        is given but is not one of the corresponding module constants.
        """
        super().__init__(compression_type)
        if file_header_info is not None and file_header_info not in (
                FILE_HEADER_INFO_USE,
                FILE_HEADER_INFO_IGNORE,
                FILE_HEADER_INFO_NONE,
        ):
            raise ValueError(
                f"file header info must be {FILE_HEADER_INFO_USE}, "
                f"{FILE_HEADER_INFO_IGNORE} or {FILE_HEADER_INFO_NONE}"
            )
        self._allow_quoted_record_delimiter = allow_quoted_record_delimiter
        self._comments = comments
        self._field_delimiter = field_delimiter
        self._file_header_info = file_header_info
        self._quote_character = quote_character
        self._quote_escape_character = quote_escape_character
        self._record_delimiter = record_delimiter

    def toxml(self, element):
        """Write the compression type and a CSV child under ``element``.

        Only options that are not None produce a child of the CSV element.
        """
        super().toxml(element)
        csv = SubElement(element, "CSV")
        # Tag/value pairs in the order they are emitted.
        options = (
            (
                "AllowQuotedRecordDelimiter",
                self._allow_quoted_record_delimiter,
            ),
            ("Comments", self._comments),
            ("FieldDelimiter", self._field_delimiter),
            ("FileHeaderInfo", self._file_header_info),
            ("QuoteCharacter", self._quote_character),
            ("QuoteEscapeCharacter", self._quote_escape_character),
            ("RecordDelimiter", self._record_delimiter),
        )
        for tag, value in options:
            if value is not None:
                SubElement(csv, tag, value)
class JSONInputSerialization(InputSerialization):
    """Input serialization describing JSON formatted objects."""

    def __init__(self, compression_type=None, json_type=None):
        """Store the JSON type after validating it.

        Raises ValueError when ``compression_type`` or ``json_type`` is
        given but is not one of the corresponding module constants.
        """
        super().__init__(compression_type)
        if json_type is not None and json_type not in (
                JSON_TYPE_DOCUMENT, JSON_TYPE_LINES,
        ):
            raise ValueError(
                f"json type must be {JSON_TYPE_DOCUMENT} or {JSON_TYPE_LINES}"
            )
        self._json_type = json_type

    def toxml(self, element):
        """Write the compression type and a JSON child under ``element``."""
        super().toxml(element)
        json_element = SubElement(element, "JSON")
        if self._json_type is None:
            return
        SubElement(json_element, "Type", self._json_type)
class ParquetInputSerialization(InputSerialization):
    """Input serialization describing Parquet formatted objects."""

    def __init__(self):
        """Initialize with no compression type."""
        super().__init__(None)

    def toxml(self, element):
        """Add a Parquet child under ``element`` and return that child."""
        super().toxml(element)
        parquet = SubElement(element, "Parquet")
        return parquet
class CSVOutputSerialization:
    """Output serialization producing CSV formatted records."""

    def __init__(self, field_delimiter=None, quote_character=None,
                 quote_escape_character=None, quote_fields=None,
                 record_delimiter=None):
        """Store the CSV output options; every option is optional.

        Raises ValueError when ``quote_fields`` is given but is not one of
        the QUOTE_FIELDS_* constants.
        """
        if quote_fields is not None and quote_fields not in (
                QUOTE_FIELDS_ALWAYS, QUOTE_FIELDS_ASNEEDED,
        ):
            raise ValueError(
                f"quote fields must be {QUOTE_FIELDS_ALWAYS} or "
                f"{QUOTE_FIELDS_ASNEEDED}"
            )
        self._field_delimiter = field_delimiter
        self._quote_character = quote_character
        self._quote_escape_character = quote_escape_character
        self._quote_fields = quote_fields
        self._record_delimiter = record_delimiter

    def toxml(self, element):
        """Write a CSV child under ``element``.

        Only options that are not None produce a child of the CSV element.
        """
        csv = SubElement(element, "CSV")
        # Tag/value pairs in the order they are emitted.
        options = (
            ("FieldDelimiter", self._field_delimiter),
            ("QuoteCharacter", self._quote_character),
            ("QuoteEscapeCharacter", self._quote_escape_character),
            ("QuoteFields", self._quote_fields),
            ("RecordDelimiter", self._record_delimiter),
        )
        for tag, value in options:
            if value is not None:
                SubElement(csv, tag, value)
class JSONOutputSerialization:
    """Output serialization producing JSON formatted records."""

    def __init__(self, record_delimiter=None):
        """Store the optional record delimiter."""
        self._record_delimiter = record_delimiter

    def toxml(self, element):
        """Write a JSON child under ``element``, with RecordDelimiter if set."""
        json_element = SubElement(element, "JSON")
        if self._record_delimiter is None:
            return
        SubElement(json_element, "RecordDelimiter", self._record_delimiter)
class SelectRequest:
    """Select object content request.

    Bundles the SQL expression, the input/output serializations and the
    optional progress/scan-range settings, and renders them as the
    SelectObjectContentRequest XML document.
    """

    def __init__(self, expression, input_serialization, output_serialization,
                 request_progress=False, scan_start_range=None,
                 scan_end_range=None):
        """Validate and store the request settings.

        :param expression: SQL expression text.
        :param input_serialization: CSVInputSerialization,
            JSONInputSerialization or ParquetInputSerialization instance.
        :param output_serialization: CSVOutputSerialization or
            JSONOutputSerialization instance.
        :param request_progress: When truthy, RequestProgress is enabled.
        :param scan_start_range: Optional start offset of the scan range.
        :param scan_end_range: Optional end offset of the scan range.
        :raises ValueError: When a serialization argument has a wrong type.
        """
        self._expression = expression
        if not isinstance(
                input_serialization,
                (
                    CSVInputSerialization,
                    JSONInputSerialization,
                    ParquetInputSerialization,
                ),
        ):
            raise ValueError(
                "input serialization must be CSVInputSerialization, "
                "JSONInputSerialization or ParquetInputSerialization type",
            )
        self._input_serialization = input_serialization
        if not isinstance(
                output_serialization,
                (CSVOutputSerialization, JSONOutputSerialization),
        ):
            raise ValueError(
                "output serialization must be CSVOutputSerialization or "
                "JSONOutputSerialization type",
            )
        self._output_serialization = output_serialization
        self._request_progress = request_progress
        self._scan_start_range = scan_start_range
        self._scan_end_range = scan_end_range

    def toxml(self, element):
        """Convert to XML.

        The ``element`` argument is ignored; a new
        SelectObjectContentRequest root element is always created and
        returned.
        """
        element = Element("SelectObjectContentRequest")
        SubElement(element, "Expression", self._expression)
        SubElement(element, "ExpressionType", "SQL")
        self._input_serialization.toxml(
            SubElement(element, "InputSerialization"),
        )
        self._output_serialization.toxml(
            SubElement(element, "OutputSerialization"),
        )
        if self._request_progress:
            SubElement(
                SubElement(element, "RequestProgress"), "Enabled", "true",
            )
        # Test for presence explicitly instead of truthiness: a start offset
        # of 0 is a real value.  Dropping Start while End is present changes
        # the meaning of the range on the S3 side (an End-only range selects
        # the trailing bytes of the object).
        has_start = self._scan_start_range not in (None, "")
        has_end = self._scan_end_range not in (None, "")
        if has_start or has_end:
            tag = SubElement(element, "ScanRange")
            # Offsets are commonly given as integers; element text must be
            # a string, so convert here.
            if has_start:
                SubElement(tag, "Start", str(self._scan_start_range))
            if has_end:
                SubElement(tag, "End", str(self._scan_end_range))
        return element
def _read(reader, size):
"""Wrapper to RawIOBase.read() to error out on short reads."""
data = reader.read(size)
if len(data) != size:
raise IOError("insufficient data")
return data
def _int(data):
"""Convert byte data to big-endian int."""
return int.from_bytes(data, byteorder="big")
def _crc32(data):
"""Wrapper to binascii.crc32()."""
return crc32(data) & 0xffffffff
def _decode_header(data):
    """Decode header bytes into a dict of name -> value strings.

    Each entry is laid out as: a 1-byte name length, the name, a 1-byte
    value type (which must be 7), a 2-byte value length, and the value.
    Raises IOError on a truncated entry or an unexpected value type.
    """
    stream = BytesIO(data)
    decoded = {}
    # Keep consuming entries until the name-length byte can't be read.
    for name_size in iter(lambda: stream.read(1), b""):
        key = _read(stream, _int(name_size))
        value_type = _int(_read(stream, 1))
        if value_type != 7:
            raise IOError("header value type is not 7")
        value_size = _int(_read(stream, 2))
        decoded[key.decode()] = _read(stream, value_size).decode()
    return decoded
class Stats:
    """Progress/Stats figures parsed from an event payload."""

    def __init__(self, data):
        """Parse the XML bytes in ``data`` and store the three counters."""
        root = ET.fromstring(data.decode())
        (
            self._bytes_scanned,
            self._bytes_processed,
            self._bytes_returned,
        ) = (
            findtext(root, "BytesScanned"),
            findtext(root, "BytesProcessed"),
            findtext(root, "BytesReturned"),
        )

    @property
    def bytes_scanned(self):
        """Value of the BytesScanned element."""
        return self._bytes_scanned

    @property
    def bytes_processed(self):
        """Value of the BytesProcessed element."""
        return self._bytes_processed

    @property
    def bytes_returned(self):
        """Value of the BytesReturned element."""
        return self._bytes_returned
class SelectObjectReader:
    """
    BufferedIOBase compatible reader represents response data of
    Minio.select_object_content() API.

    The response body is consumed as a sequence of framed messages. Records
    payloads are handed out through stream(); Progress/Stats payloads are
    kept and exposed through stats().
    """

    def __init__(self, response):
        """Wrap ``response``; nothing is read until stream() is iterated."""
        self._response = response
        self._stats = None
        self._payload = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        return self.close()

    def readable(self):  # pylint: disable=no-self-use
        """Return this is readable."""
        return True

    def writable(self):  # pylint: disable=no-self-use
        """Return this is not writable (standard io spelling)."""
        return False

    def writeable(self):  # pylint: disable=no-self-use
        """Return this is not writeable.

        Kept for backward compatibility; same result as writable().
        """
        return self.writable()

    def close(self):
        """Close response and release network resources."""
        self._response.close()
        self._response.release_conn()

    def stats(self):
        """Get stats information."""
        return self._stats

    def _read(self):
        """Read and decode messages until one carries record data.

        Returns the length of the Records payload stored in
        ``self._payload``, or 0 when the response is closed or an End event
        is received. Cont events, empty payloads and Progress/Stats events
        are consumed in a loop (not by recursion) so that a long run of
        them cannot exhaust the interpreter's recursion limit.

        Raises IOError on short reads or CRC mismatch and MinioException
        on an error message or an unknown event type.
        """
        while True:
            if self._response.isclosed():
                return 0

            # Prelude: 4-byte total length + 4-byte header length, followed
            # by a 4-byte CRC of those 8 bytes.
            prelude = _read(self._response, 8)
            prelude_crc = _read(self._response, 4)
            if _crc32(prelude) != _int(prelude_crc):
                raise IOError(
                    f"prelude CRC mismatch; expected: {_crc32(prelude)}, "
                    f"got: {_int(prelude_crc)}"
                )

            # Remaining data is everything except prelude (8), prelude CRC
            # (4) and the trailing message CRC (4).
            total_length = _int(prelude[:4])
            data = _read(self._response, total_length - 8 - 4 - 4)
            message_crc = _int(_read(self._response, 4))
            if _crc32(prelude + prelude_crc + data) != message_crc:
                raise IOError(
                    f"message CRC mismatch; "
                    f"expected: {_crc32(prelude + prelude_crc + data)}, "
                    f"got: {message_crc}"
                )

            header_length = _int(prelude[4:])
            headers = _decode_header(data[:header_length])

            if headers.get(":message-type") == "error":
                raise MinioException(
                    f"{headers.get(':error-code')}: "
                    f"{headers.get(':error-message')}"
                )

            event_type = headers.get(":event-type")
            if event_type == "End":
                return 0

            payload_length = total_length - header_length - 16
            if event_type == "Cont" or payload_length < 1:
                # Nothing to deliver; move on to the next message.
                continue

            payload = data[header_length:header_length + payload_length]

            if event_type in ["Progress", "Stats"]:
                self._stats = Stats(payload)
                continue

            if event_type == "Records":
                self._payload = payload
                return len(payload)

            raise MinioException(
                f"unknown event-type {event_type}",
            )

    def stream(self, num_bytes=32*1024):
        """
        Stream extracted payload from response data. Upon completion, caller
        should call self.close() to release network resources.

        Yields chunks of at most ``num_bytes`` bytes. Raises ValueError
        (on first iteration, as this is a generator) when ``num_bytes`` is
        less than 1, because such a value could never drain a payload.
        """
        if num_bytes < 1:
            raise ValueError("num_bytes must be a positive integer")
        while self._read() > 0:
            while self._payload:
                result = self._payload
                if num_bytes < len(self._payload):
                    result = self._payload[:num_bytes]
                self._payload = self._payload[len(result):]
                yield result