import io
import os
import re
import struct
import numpy as np
import pandas as pd
from thriftpy.protocol.compact import TCompactProtocol
from . import encoding
from .compression import decompress_data
from .converted_types import convert, typemap
from .schema import _is_list_like, _is_map_like
from .speedups import unpack_byte_array
from .thrift_structures import parquet_thrift
from .util import val_to_num, byte_buffer, ex_from_sep
def read_thrift(file_obj, ttype):
    """Read a single thrift structure of type ``ttype`` from the file-object."""
    pin = TCompactProtocol(file_obj, True)
    obj = ttype()
    obj.read(pin)
    return obj
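# Illustrative use: page headers (and other thrift structures) are read with
# this helper, e.g. ``ph = read_thrift(infile, parquet_thrift.PageHeader)``
# as done in read_col below.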
def _read_page(file_obj, page_header, column_metadata):
"""Read the data page from the given file-object and convert it to raw,
uncompressed bytes (if necessary)."""
raw_bytes = file_obj.read(page_header.compressed_page_size)
raw_bytes = decompress_data(raw_bytes, column_metadata.codec)
assert len(raw_bytes) == page_header.uncompressed_page_size, \
"found {0} raw bytes (expected {1})".format(
len(raw_bytes),
page_header.uncompressed_page_size)
return raw_bytes
def read_data(fobj, coding, count, bit_width):
    """Read ``count`` definition/repetition levels encoded with the
    RLE/bit-packed hybrid, where the length of the encoded block is given
    by its first four bytes (little-endian).
    """
out = np.empty(count, dtype=np.int32)
o = encoding.Numpy32(out)
if coding == parquet_thrift.Encoding.RLE:
while o.loc < count:
encoding.read_rle_bit_packed_hybrid(fobj, bit_width, o=o)
else:
raise NotImplementedError('Encoding %s' % coding)
return out
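# Worked example (a sketch, assuming the standard parquet level layout): 200
# non-null values with max definition level 1 encode as one RLE run.  The
# block is a 4-byte little-endian length, then a varint run header
# (count << 1) and the repeated value in a single byte:
#
#     raw = b'\x03\x00\x00\x00' + b'\x90\x03' + b'\x01'
#     fobj = encoding.Numpy8(np.frombuffer(byte_buffer(raw), dtype=np.uint8))
#     read_data(fobj, parquet_thrift.Encoding.RLE, 200, 1)
#     # -> array of 200 ones, dtype int32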
def read_def(io_obj, daph, helper, metadata):
"""
Read the definition levels from this page, if any.
"""
definition_levels = None
num_nulls = 0
if not helper.is_required(metadata.path_in_schema):
max_definition_level = helper.max_definition_level(
metadata.path_in_schema)
bit_width = encoding.width_from_max_int(max_definition_level)
if bit_width:
definition_levels = read_data(
io_obj, daph.definition_level_encoding,
daph.num_values, bit_width)[:daph.num_values]
num_nulls = daph.num_values - (definition_levels ==
max_definition_level).sum()
if num_nulls == 0:
definition_levels = None
return definition_levels, num_nulls
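# Example: for an optional top-level column, max_definition_level is 1;
# entries where definition_levels != 1 are NULL, so
# num_nulls == daph.num_values - (definition_levels == 1).sum().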
def read_rep(io_obj, daph, helper, metadata):
"""
Read the repetition levels from this page, if any.
"""
repetition_levels = None
if len(metadata.path_in_schema) > 1:
max_repetition_level = helper.max_repetition_level(
metadata.path_in_schema)
if max_repetition_level == 0:
repetition_levels = None
else:
bit_width = encoding.width_from_max_int(max_repetition_level)
repetition_levels = read_data(io_obj, daph.repetition_level_encoding,
daph.num_values,
bit_width)[:daph.num_values]
# if repetition_levels.max() == 0:
# repetition_levels = None
return repetition_levels
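# Example: for a nested LIST column, a repetition level of 0 starts a new
# row, while higher levels continue the enclosing list(s); e.g. levels
# [0, 1, 1, 0, 1] describe two rows, of lengths 3 and 2.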
def read_data_page(f, helper, header, metadata, skip_nulls=False,
selfmade=False):
"""Read a data page: definitions, repetitions, values (in order)
Only values are guaranteed to exist, e.g., for a top-level, required
field.
"""
daph = header.data_page_header
raw_bytes = _read_page(f, header, metadata)
io_obj = encoding.Numpy8(np.frombuffer(byte_buffer(raw_bytes),
dtype=np.uint8))
repetition_levels = read_rep(io_obj, daph, helper, metadata)
if skip_nulls and not helper.is_required(metadata.path_in_schema):
num_nulls = 0
definition_levels = None
skip_definition_bytes(io_obj, daph.num_values)
else:
definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)
nval = daph.num_values - num_nulls
if daph.encoding == parquet_thrift.Encoding.PLAIN:
width = helper.schema_element(metadata.path_in_schema).type_length
values = encoding.read_plain(raw_bytes[io_obj.loc:],
metadata.type,
int(daph.num_values - num_nulls),
width=width)
elif daph.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE]:
# bit_width is stored as single byte.
if daph.encoding == parquet_thrift.Encoding.RLE:
bit_width = helper.schema_element(
metadata.path_in_schema).type_length
else:
bit_width = io_obj.read_byte()
        if bit_width in [8, 16, 32] and selfmade:
            # fast path for fastparquet-written data: the page is a single
            # bit-packed run whose bytes can be viewed directly as integers
            num = (encoding.read_unsigned_var_int(io_obj) >> 1) * 8
            values = io_obj.read(num * bit_width // 8).view('int%i' % bit_width)
elif bit_width:
values = encoding.Numpy32(np.empty(daph.num_values-num_nulls+7,
dtype=np.int32))
# length is simply "all data left in this page"
encoding.read_rle_bit_packed_hybrid(
io_obj, bit_width, io_obj.len-io_obj.loc, o=values)
values = values.data[:nval]
else:
values = np.zeros(nval, dtype=np.int8)
else:
raise NotImplementedError('Encoding %s' % daph.encoding)
return definition_levels, repetition_levels, values[:nval]
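# Note: for PLAIN_DICTIONARY pages the ``values`` returned above are integer
# indices into the chunk's dictionary page; read_col below maps them back to
# labels with ``dic[val]``.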
def skip_definition_bytes(io_obj, num):
    """Skip the definition-level section of a page known to contain no
    NULLs: a 4-byte length prefix plus a single RLE run (a varint header
    encoding ``num`` and one value byte)."""
    io_obj.loc += 6  # 4-byte length + first varint byte + value byte
    n = num // 64  # extra varint header bytes, 7 bits at a time
    while n:
        io_obj.loc += 1
        n //= 128
def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata):
"""Read a page containing dictionary data.
Consumes data using the plain encoding and returns an array of values.
"""
raw_bytes = _read_page(file_obj, page_header, column_metadata)
if column_metadata.type == parquet_thrift.Type.BYTE_ARRAY:
values = unpack_byte_array(raw_bytes,
page_header.dictionary_page_header.num_values)
else:
width = schema_helper.schema_element(
column_metadata.path_in_schema).type_length
values = encoding.read_plain(
raw_bytes, column_metadata.type,
page_header.dictionary_page_header.num_values, width)
return values
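# A chunk's (optional) dictionary page precedes its data pages; read_col
# below seeks to min(dictionary_page_offset, data_page_offset) and, if the
# first page is a dictionary page, reads it before any data pages.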
def read_col(column, schema_helper, infile, use_cat=False,
grab_dict=False, selfmade=False, assign=None, catdef=None):
"""Using the given metadata, read one column in one row-group.
Parameters
----------
column: thrift structure
Details on the column
schema_helper: schema.SchemaHelper
Based on the schema for this parquet data
infile: open file or string
If a string, will open; if an open object, will use as-is
use_cat: bool (False)
If this column is encoded throughout with dict encoding, give back
a pandas categorical column; otherwise, decode to values
    grab_dict: bool (False)
        Short-cut mode to return the dictionary values only - skips the actual
        data.
    selfmade: bool (False)
        If True, the data was written by fastparquet, enabling some shortcuts
        (e.g., skipping the definition levels when statistics report no nulls)
    assign: numpy array or None
        Pre-allocated output to fill with the column's values
    catdef: object or None
        Holder whose ``_categories`` is set to the dictionary labels when
        use_cat is True
    """
cmd = column.meta_data
se = schema_helper.schema_element(cmd.path_in_schema)
off = min((cmd.dictionary_page_offset or cmd.data_page_offset,
cmd.data_page_offset))
infile.seek(off)
ph = read_thrift(infile, parquet_thrift.PageHeader)
dic = None
if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
dic = np.array(read_dictionary_page(infile, schema_helper, ph, cmd))
ph = read_thrift(infile, parquet_thrift.PageHeader)
dic = convert(dic, se)
if grab_dict:
return dic
if use_cat:
catdef._categories = pd.Index(dic)
if np.iinfo(assign.dtype).max < len(dic):
raise RuntimeError('Assigned array dtype (%s) cannot accommodate '
'number of category labels (%i)' %
(assign.dtype, len(dic)))
rows = cmd.num_values
do_convert = True
if use_cat:
my_nan = -1
do_convert = False
else:
if assign.dtype.kind in ['f', 'i']:
my_nan = np.nan
elif assign.dtype.kind in ["M", 'm']:
my_nan = -9223372036854775808 # int64 version of NaT
else:
my_nan = None
num = 0
while True:
if (selfmade and hasattr(cmd, 'statistics') and
getattr(cmd.statistics, 'null_count', 1) == 0):
skip_nulls = True
else:
skip_nulls = False
defi, rep, val = read_data_page(infile, schema_helper, ph, cmd,
skip_nulls, selfmade=selfmade)
        if rep is not None and assign.dtype.kind != 'O':  # pragma: no cover
            # this should never get called
            raise ValueError('Column contains repeated value, must use object '
                             'type, but has assumed type: %s' % assign.dtype)
d = ph.data_page_header.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY
if use_cat and not d:
            raise ValueError('Returning category type requires all chunks to '
                             'use dictionary encoding; column: %s' %
                             cmd.path_in_schema)
max_defi = schema_helper.max_definition_level(cmd.path_in_schema)
if rep is not None:
null = not schema_helper.is_required(cmd.path_in_schema[0])
null_val = (se.repetition_type !=
parquet_thrift.FieldRepetitionType.REQUIRED)
num = encoding._assemble_objects(assign, defi, rep, val, dic, d,
null, null_val, max_defi)
        elif defi is not None:
            part = assign[num:num+len(defi)]
            part[defi != max_defi] = my_nan
if d and not use_cat:
part[defi == max_defi] = dic[val]
elif do_convert:
part[defi == max_defi] = convert(val, se)
else:
part[defi == max_defi] = val
else:
piece = assign[num:num+len(val)]
if d and not use_cat:
piece[:] = dic[val]
elif do_convert:
piece[:] = convert(val, se)
else:
piece[:] = val
num += len(defi) if defi is not None else len(val)
if num >= rows:
break
ph = read_thrift(infile, parquet_thrift.PageHeader)
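# Illustrative: to fetch only a chunk's dictionary labels (skipping the
# data), call with grab_dict=True:
#
#     labels = read_col(column, schema_helper, infile, grab_dict=True)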
def read_row_group_file(fn, rg, columns, categories, schema_helper, cats,
open=open, selfmade=False, index=None, assign=None,
sep=os.sep):
with open(fn, mode='rb') as f:
return read_row_group(f, rg, columns, categories, schema_helper, cats,
selfmade=selfmade, index=index, assign=assign,
sep=sep)
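# These readers are normally driven by the higher-level ParquetFile class
# (a sketch, assuming fastparquet's public API):
#
#     import fastparquet
#     pf = fastparquet.ParquetFile('data.parq')
#     df = pf.to_pandas(['col1'])
#
# which pre-allocates the output frame and calls read_row_group_file for
# each row-group.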
def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
selfmade=False, assign=None):
"""
Read a row group and return as a dict of arrays
Note that categorical columns (if appearing in the parameter categories)
will be pandas Categorical objects: the codes and the category labels
are arrays.
"""
out = assign
maps = {}
for column in rg.columns:
if (_is_list_like(schema_helper, column) or
_is_map_like(schema_helper, column)):
name = ".".join(column.meta_data.path_in_schema[:-2])
else:
name = ".".join(column.meta_data.path_in_schema)
if name not in columns:
continue
use = name in categories if categories is not None else False
read_col(column, schema_helper, file, use_cat=use,
selfmade=selfmade, assign=out[name],
catdef=out[name+'-catdef'] if use else None)
if _is_map_like(schema_helper, column):
if name not in maps:
maps[name] = out[name].copy()
else:
if column.meta_data.path_in_schema[0] == 'key':
key, value = out[name], maps[name]
else:
value, key = out[name], maps[name]
out[name][:] = [dict(zip(k, v)) if k is not None else None
for k, v in zip(key, value)]
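# Example: a MAP column with key arrays [['a', 'b'], None] and value arrays
# [[1, 2], None] is assembled into [{'a': 1, 'b': 2}, None].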
def read_row_group(file, rg, columns, categories, schema_helper, cats,
selfmade=False, index=None, assign=None,
sep=os.sep):
"""
Access row-group in a file and read some columns into a data-frame.
"""
    if assign is None:
        raise RuntimeError('read_row_group requires pre-allocated output '
                           'arrays (assign=)')
read_row_group_arrays(file, rg, columns, categories, schema_helper,
cats, selfmade, assign=assign)
for cat in cats:
s = ex_from_sep(sep)
partitions = s.findall(rg.columns[0].file_path)
if not partitions and sep in (rg.columns[0].file_path or ""):
partitions = [('dir%i' % i, v) for (i, v) in enumerate(
rg.columns[0].file_path.split(sep)[:-1])]
val = val_to_num([p[1] for p in partitions if p[0] == cat][0])
assign[cat][:] = cats[cat].index(val)
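# Example: with hive-style partitioning, a row-group file_path such as
# 'year=2016/part.0.parquet' yields partitions [('year', '2016')]; with
# plain partition directories, '2016/part.0.parquet' falls back to
# [('dir0', '2016')].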