Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
fastparquet / util.py
Size: Mime:
import ast
import os, os.path
import shutil
import pandas as pd
import pytest
import re
import tempfile
import thriftpy
import sys
import six


PY2 = six.PY2
PY3 = six.PY3
STR_TYPE = six.string_types[0]  # 'str' for Python3, 'basestring' for Python2
created_by = "fastparquet-python version 1.0.0 (build 111)"


class ParquetException(Exception):
    """Generic Exception related to unexpected data format when
     reading parquet file."""
    pass


def sep_from_open(opener):
    if opener is default_open:
        return os.sep
    else:
        return '/'


if PY2:
    def default_mkdirs(f):
        if not os.path.exists(f):
            os.makedirs(f)
else:
    def default_mkdirs(f):
        os.makedirs(f, exist_ok=True)


def default_open(f, mode='rb'):
    return open(f, mode)


def val_to_num(x):
    if x in ['NOW', 'TODAY']:
        return x
    try:
        return ast.literal_eval(x)
    except:
        pass
    try:
        return pd.to_datetime(x)
    except:
        pass
    try:
        return pd.to_timedelta(x)
    except:
        return x


@pytest.yield_fixture()
def tempdir():
    d = tempfile.mkdtemp()
    yield d
    if os.path.exists(d):
        shutil.rmtree(d, ignore_errors=True)

if PY2:
    def ensure_bytes(s):
        return s.encode('utf-8') if isinstance(s, unicode) else s
else:
    def ensure_bytes(s):
        return s.encode('utf-8') if isinstance(s, str) else s


def thrift_print(structure, offset=0):
    """
    Handy recursive text ouput for thrift structures
    """
    if not isinstance(structure, thriftpy.thrift.TPayload):
        return str(structure)
    s = str(structure.__class__) + '\n'
    for key in dir(structure):
        if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
                                          'default_spec']:
            continue
        s = s + ' ' * offset + key + ': ' + thrift_print(getattr(structure, key)
                                                         , offset+2) + '\n'
    return s
thriftpy.thrift.TPayload.__str__ = thrift_print
thriftpy.thrift.TPayload.__repr__ = thrift_print


def thrift_copy(structure):
    """
    Recursively copy a thriftpy structure
    """
    base = structure.__class__()
    for key in dir(structure):
        if key.startswith('_') or key in ['thrift_spec', 'read', 'write',
                                          'default_spec']:
            continue
        val = getattr(structure, key)
        if isinstance(val, list):
            setattr(base, key, [thrift_copy(item)
                                if isinstance(item, thriftpy.thrift.TPayload)
                                else item for item in val])
        elif isinstance(val, thriftpy.thrift.TPayload):
            setattr(base, key, thrift_copy(val))
        else:
            setattr(base, key, val)
    return base


def index_like(index):
    """
    Does index look like a default range index?
    """
    return not (isinstance(index, pd.RangeIndex) and
                index._start == 0 and
                index._stop == len(index) and
                index._step == 1 and index.name is None)


def check_column_names(columns, *args):
    """Ensure that parameters listing column names have corresponding columns"""
    for arg in args:
        if isinstance(arg, (tuple, list)):
            if set(arg) - set(columns):
                raise ValueError("Column name not in list.\n"
                                 "Requested %s\n"
                                 "Allowed %s" % (arg, columns))


def byte_buffer(raw_bytes):
    return buffer(raw_bytes) if PY2 else memoryview(raw_bytes)


def metadata_from_many(file_list, verify_schema=False, open_with=default_open,
                       root=False):
    """
    Given list of parquet files, make a FileMetaData that points to them

    Parameters
    ----------
    file_list: list of paths of parquet files
    verify_schema: bool (False)
        Whether to assert that the schemas in each file are identical
    open_with: function
        Use this to open each path.
    root: str
        Top of the dataset's directory tree, for cases where it can't be
        automatically inferred.

    Returns
    -------
    basepath: the root path that other paths are relative to
    fmd: metadata thrift structure
    """
    from fastparquet import api
    sep = sep_from_open(open_with)
    if all(isinstance(pf, api.ParquetFile) for pf in file_list):
        pfs = file_list
        file_list = [pf.fn for pf in pfs]
    elif all(not isinstance(pf, api.ParquetFile) for pf in file_list):
        pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    else:
        raise ValueError("Merge requires all PaquetFile instances or none")
    basepath, file_list = analyse_paths(file_list, sep, root=root)

    if verify_schema:
        for pf in pfs[1:]:
            if pf._schema != pfs[0]._schema:
                raise ValueError('Incompatible schemas')

    fmd = thrift_copy(pfs[0].fmd)  # we inherit "created by" field
    fmd.row_groups = []

    for pf, fn in zip(pfs, file_list):
        if pf.file_scheme not in ['simple', 'empty']:
            # should remove 'empty' datasets up front? Get ignored on load
            # anyway.
            raise ValueError('Cannot merge multi-file input', fn)
        for rg in pf.row_groups:
            rg = thrift_copy(rg)
            for chunk in rg.columns:
                chunk.file_path = fn
            fmd.row_groups.append(rg)

    fmd.num_rows = sum(rg.num_rows for rg in fmd.row_groups)
    return basepath, fmd

# simple cache to avoid re compile every time
seps = {}


def ex_from_sep(sep):
    """Generate regex for category folder matching"""
    if sep not in seps:
        if sep in r'\^$.|?*+()[]':
            s = re.compile(r"([a-zA-Z_]+)=([^\{}]+)".format(sep))
        else:
            s = re.compile("([a-zA-Z_]+)=([^{}]+)".format(sep))
        seps[sep] = s
    return seps[sep]


def analyse_paths(file_list, sep=os.sep, root=False):
    """Consolidate list of file-paths into acceptable parquet relative paths"""
    path_parts_list = [fn.split(sep) for fn in file_list]
    if len({len(path_parts) for path_parts in path_parts_list}) > 1:
        raise ValueError('Mixed nesting in merge files')
    s = ex_from_sep(sep)
    if root is False:
        basepath = path_parts_list[0][:-1]
        for i, path_parts in enumerate(path_parts_list):
            j = len(path_parts) - 1
            for k, (base_part, path_part) in enumerate(zip(basepath, path_parts)):
                if base_part != path_part:
                    j = k
                    break
            basepath = basepath[:j]
        l = len(basepath)

    else:
        basepath = root.split(sep)
        l = len(basepath)
        assert all(p[:l] == basepath for p in path_parts_list
                   ), "All paths must begin with the given root"
    l = len(basepath)
    if len({tuple([p.split('=')[0] for p in parts[l:-1]])
            for parts in path_parts_list}) > 1:
        raise ValueError('Partitioning directories do not agree')
    out_list = []
    for path_parts in path_parts_list:
        for path_part in path_parts[l:-1]:
            if s.match(path_part) is None:
                raise ValueError('Malformed paths set at', sep.join(path_parts))
        out_list.append(sep.join(path_parts[l:]))

    return sep.join(basepath), out_list


def infer_dtype(column):
    try:
        return pd.api.types.infer_dtype(column)
    except AttributeError:
        return pd.lib.infer_dtype(column)


def get_column_metadata(column, name):
    """Produce pandas column metadata block"""
    # from pyarrow.pandas_compat
    # https://github.com/apache/arrow/blob/master/python/pyarrow/pandas_compat.py
    inferred_dtype = infer_dtype(column)
    dtype = column.dtype

    if str(dtype) == 'category':
        extra_metadata = {
            'num_categories': len(column.cat.categories),
            'ordered': column.cat.ordered,
        }
        dtype = column.cat.codes.dtype
    elif hasattr(dtype, 'tz'):
        extra_metadata = {'timezone': str(dtype.tz)}
    else:
        extra_metadata = None

    if not isinstance(name, six.string_types):
        raise TypeError(
            'Column name must be a string. Got column {} of type {}'.format(
                name, type(name).__name__
            )
        )

    return {
        'name': name,
        'pandas_type': {
            'string': 'bytes' if PY2 else 'unicode',
            'datetime64': (
                'datetimetz' if hasattr(dtype, 'tz')
                else 'datetime'
            ),
            'integer': str(dtype),
            'floating': str(dtype),
        }.get(inferred_dtype, inferred_dtype),
        'numpy_type': str(dtype),
        'metadata': extra_metadata,
    }