
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""Dataset is currently unstable. APIs subject to change without notice."""

import pyarrow as pa
from pyarrow.util import _is_iterable, _stringify_path, _is_path_like

try:
    from pyarrow._dataset import (  # noqa
        CsvFileFormat,
        CsvFragmentScanOptions,
        JsonFileFormat,
        JsonFragmentScanOptions,
        Dataset,
        DatasetFactory,
        DirectoryPartitioning,
        FeatherFileFormat,
        FilenamePartitioning,
        FileFormat,
        FileFragment,
        FileSystemDataset,
        FileSystemDatasetFactory,
        FileSystemFactoryOptions,
        FileWriteOptions,
        Fragment,
        FragmentScanOptions,
        HivePartitioning,
        IpcFileFormat,
        IpcFileWriteOptions,
        InMemoryDataset,
        Partitioning,
        PartitioningFactory,
        Scanner,
        TaggedRecordBatch,
        UnionDataset,
        UnionDatasetFactory,
        WrittenFile,
        get_partition_keys,
        get_partition_keys as _get_partition_keys,  # keep for backwards compatibility
        _filesystemdataset_write,
    )
except ImportError as exc:
    raise ImportError(
        f"The pyarrow installation is not built with support for 'dataset' ({str(exc)})"
    ) from None

# keep Expression functionality exposed here for backwards compatibility
from pyarrow.compute import Expression, scalar, field  # noqa
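# (e.g. field("year") == 2009 builds an Expression that can be used as a
# dataset filter)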


_orc_available = False
_orc_msg = (
    "The pyarrow installation is not built with support for the ORC file "
    "format."
)

try:
    from pyarrow._dataset_orc import OrcFileFormat
    _orc_available = True
except ImportError:
    pass

_parquet_available = False
_parquet_msg = (
    "The pyarrow installation is not built with support for the Parquet file "
    "format."
)

try:
    from pyarrow._dataset_parquet import (  # noqa
        ParquetDatasetFactory,
        ParquetFactoryOptions,
        ParquetFileFormat,
        ParquetFileFragment,
        ParquetFileWriteOptions,
        ParquetFragmentScanOptions,
        ParquetReadOptions,
        RowGroupInfo,
    )
    _parquet_available = True
except ImportError:
    pass


try:
    from pyarrow._dataset_parquet_encryption import (  # noqa
        ParquetDecryptionConfig,
        ParquetEncryptionConfig,
    )
except ImportError:
    pass


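# Module-level __getattr__: raise a more helpful ImportError when an optional
# file format (ORC or Parquet) was not built into this pyarrow installation.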
def __getattr__(name):
    if name == "OrcFileFormat" and not _orc_available:
        raise ImportError(_orc_msg)

    if name == "ParquetFileFormat" and not _parquet_available:
        raise ImportError(_parquet_msg)

    raise AttributeError(
        "module 'pyarrow.dataset' has no attribute '{0}'".format(name)
    )


def partitioning(schema=None, field_names=None, flavor=None,
                 dictionaries=None):
    """
    Specify a partitioning scheme.

    The supported schemes include:

    - "DirectoryPartitioning": this scheme expects one segment in the file path
      for each field in the specified schema (all fields are required to be
      present). For example, given schema<year:int16, month:int8>, the path
      "/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
    - "HivePartitioning": a scheme for "/$key=$value/" nested directories as
      found in Apache Hive. This is a multi-level, directory based partitioning
      scheme. Data is partitioned by static values of a particular column in
      the schema. Partition keys are represented in the form $key=$value in
      directory names. Field order is ignored, as are missing or unrecognized
      field names.
      For example, given schema<year:int16, month:int8, day:int8>, a possible
      path would be "/year=2009/month=11/day=15" (but the field order does not
      need to match).
    - "FilenamePartitioning": this scheme expects the partitions will have
      filenames containing the field values separated by "_".
      For example, given schema<year:int16, month:int8, day:int8>, a possible
      partition filename "2009_11_part-0.parquet" would be parsed
      to ("year"_ == 2009 and "month"_ == 11).

    Parameters
    ----------
    schema : pyarrow.Schema, default None
        The schema that describes the partitions present in the file path.
        If not specified, and `field_names` and/or `flavor` are specified,
        the schema will be inferred from the file path (and a
        PartitioningFactory is returned).
    field_names : list of str, default None
        A list of strings (field names). If specified, the schema's types are
        inferred from the file paths (only valid for DirectoryPartitioning).
    flavor : str, default None
        The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
        a HivePartitioning, and ``flavor="filename"`` for a
        FilenamePartitioning.
    dictionaries : dict[str, Array]
        If the type of any field of `schema` is a dictionary type, the
        corresponding entry of `dictionaries` must be an array containing
        every value which may be taken by the corresponding column or an
        error will be raised in parsing. Alternatively, pass ``"infer"`` to have
        Arrow discover the dictionary values, in which case a
        PartitioningFactory is returned.

    Returns
    -------
    Partitioning or PartitioningFactory
        The partitioning scheme

    Examples
    --------

    Specify the Schema for paths like "/2009/June":

    >>> import pyarrow as pa
    >>> import pyarrow.dataset as ds
    >>> part = ds.partitioning(pa.schema([("year", pa.int16()),
    ...                                   ("month", pa.string())]))

    or let the types be inferred by only specifying the field names:

    >>> part = ds.partitioning(field_names=["year", "month"])

    For paths like "/2009/June", the year will be inferred as int32 while month
    will be inferred as string.

    Specify a Schema with dictionary encoding, providing dictionary values:

    >>> part = ds.partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries={
    ...         "month": pa.array(["January", "February", "March"]),
    ...     })

    Alternatively, specify a Schema with dictionary encoding, but have Arrow
    infer the dictionary values:

    >>> part = ds.partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries="infer")

    Create a Hive scheme for a path like "/year=2009/month=11":

    >>> part = ds.partitioning(
    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]),
    ...     flavor="hive")

    A Hive scheme can also be discovered from the directory structure (and
    types will be inferred):

    >>> part = ds.partitioning(flavor="hive")
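
    Similarly, a FilenamePartitioning (for filenames like
    "2009_11_part-0.parquet") can be created with ``flavor="filename"``:

    >>> part = ds.partitioning(
    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]),
    ...     flavor="filename")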
    """
    if flavor is None:
        # default flavor
        if schema is not None:
            if field_names is not None:
                raise ValueError(
                    "Cannot specify both 'schema' and 'field_names'")
            if dictionaries == 'infer':
                return DirectoryPartitioning.discover(schema=schema)
            return DirectoryPartitioning(schema, dictionaries)
        elif field_names is not None:
            if isinstance(field_names, list):
                return DirectoryPartitioning.discover(field_names)
            else:
                raise ValueError(
                    "Expected list of field names, got {}".format(
                        type(field_names)))
        else:
            raise ValueError(
                "For the default directory flavor, need to specify "
                "a Schema or a list of field names")
    if flavor == "filename":
        if schema is not None:
            if field_names is not None:
                raise ValueError(
                    "Cannot specify both 'schema' and 'field_names'")
            if dictionaries == 'infer':
                return FilenamePartitioning.discover(schema=schema)
            return FilenamePartitioning(schema, dictionaries)
        elif field_names is not None:
            if isinstance(field_names, list):
                return FilenamePartitioning.discover(field_names)
            else:
                raise ValueError(
                    "Expected list of field names, got {}".format(
                        type(field_names)))
        else:
            raise ValueError(
                "For the filename flavor, need to specify "
                "a Schema or a list of field names")
    elif flavor == 'hive':
        if field_names is not None:
            raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
        elif schema is not None:
            if isinstance(schema, pa.Schema):
                if dictionaries == 'infer':
                    return HivePartitioning.discover(schema=schema)
                return HivePartitioning(schema, dictionaries)
            else:
                raise ValueError(
                    "Expected Schema for 'schema', got {}".format(
                        type(schema)))
        else:
            return HivePartitioning.discover()
    else:
        raise ValueError("Unsupported flavor")


def _ensure_partitioning(scheme):
    """
    Validate input and return a Partitioning(Factory).

    It passes None through if no partitioning scheme is defined.
    """
    if scheme is None:
        pass
    elif isinstance(scheme, str):
        scheme = partitioning(flavor=scheme)
    elif isinstance(scheme, list):
        scheme = partitioning(field_names=scheme)
    elif isinstance(scheme, (Partitioning, PartitioningFactory)):
        pass
    else:
        raise ValueError("Expected Partitioning or PartitioningFactory, got {}"
                         .format(type(scheme)))
    return scheme


def _ensure_format(obj):
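    """
    Coerce `obj` to a FileFormat instance.

    `obj` may already be a FileFormat, or one of the short names
    "parquet", "ipc"/"arrow", "feather", "csv", "orc" or "json".
    """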
    if isinstance(obj, FileFormat):
        return obj
    elif obj == "parquet":
        if not _parquet_available:
            raise ValueError(_parquet_msg)
        return ParquetFileFormat()
    elif obj in {"ipc", "arrow"}:
        return IpcFileFormat()
    elif obj == "feather":
        return FeatherFileFormat()
    elif obj == "csv":
        return CsvFileFormat()
    elif obj == "orc":
        if not _orc_available:
            raise ValueError(_orc_msg)
        return OrcFileFormat()
    elif obj == "json":
        return JsonFileFormat()
    else:
        raise ValueError("format '{}' is not supported".format(obj))


def _ensure_multiple_sources(paths, filesystem=None):
    """
    Treat a list of paths as files belonging to a single file system.

    If the file system is local, this also validates that all paths reference
    existing *files*; otherwise (for example on a remote filesystem) any
    non-file paths would be silently skipped.

    Parameters
    ----------
    paths : list of path-like
        Note that URIs are not allowed.
    filesystem : FileSystem or str, optional
        If a URI is passed, its path component will act as a prefix for
        the file paths.

    Returns
    -------
    (FileSystem, list of str)
        File system object and a list of normalized paths.

    Raises
    ------