# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Dataset is currently unstable. APIs subject to change without notice."""
import pyarrow as pa
from pyarrow.util import _is_iterable, _stringify_path, _is_path_like
try:
from pyarrow._dataset import ( # noqa
CsvFileFormat,
CsvFragmentScanOptions,
JsonFileFormat,
JsonFragmentScanOptions,
Dataset,
DatasetFactory,
DirectoryPartitioning,
FeatherFileFormat,
FilenamePartitioning,
FileFormat,
FileFragment,
FileSystemDataset,
FileSystemDatasetFactory,
FileSystemFactoryOptions,
FileWriteOptions,
Fragment,
FragmentScanOptions,
HivePartitioning,
IpcFileFormat,
IpcFileWriteOptions,
InMemoryDataset,
Partitioning,
PartitioningFactory,
Scanner,
TaggedRecordBatch,
UnionDataset,
UnionDatasetFactory,
WrittenFile,
get_partition_keys,
get_partition_keys as _get_partition_keys, # keep for backwards compatibility
_filesystemdataset_write,
)
except ImportError as exc:
raise ImportError(
f"The pyarrow installation is not built with support for 'dataset' ({str(exc)})"
) from None
# keep Expression functionality exposed here for backwards compatibility
from pyarrow.compute import Expression, scalar, field # noqa
# Optional file-format support: the ORC and Parquet readers live in
# separate extension modules that exist only when pyarrow was built with
# that format enabled, so the imports below may legitimately fail.  The
# ``_*_available`` flags feed the lazy ImportError raised by the module
# ``__getattr__`` further down.
_orc_available = False
_orc_msg = (
    "The pyarrow installation is not built with support for the ORC file "
    "format."
)
try:
    from pyarrow._dataset_orc import OrcFileFormat
    _orc_available = True
except ImportError:
    # pyarrow built without ORC support; OrcFileFormat stays undefined here.
    pass
_parquet_available = False
_parquet_msg = (
    "The pyarrow installation is not built with support for the Parquet file "
    "format."
)
try:
    from pyarrow._dataset_parquet import (  # noqa
        ParquetDatasetFactory,
        ParquetFactoryOptions,
        ParquetFileFormat,
        ParquetFileFragment,
        ParquetFileWriteOptions,
        ParquetFragmentScanOptions,
        ParquetReadOptions,
        RowGroupInfo,
    )
    _parquet_available = True
except ImportError:
    # pyarrow built without Parquet support.
    pass
try:
    from pyarrow._dataset_parquet_encryption import (  # noqa
        ParquetDecryptionConfig,
        ParquetEncryptionConfig,
    )
except ImportError:
    # Parquet encryption is optional even when Parquet itself is available;
    # no availability flag is tracked because __getattr__ below does not
    # special-case these names.
    pass
def __getattr__(name):
if name == "OrcFileFormat" and not _orc_available:
raise ImportError(_orc_msg)
if name == "ParquetFileFormat" and not _parquet_available:
raise ImportError(_parquet_msg)
raise AttributeError(
"module 'pyarrow.dataset' has no attribute '{0}'".format(name)
)
def _directory_like_partitioning(klass, flavor_label, schema, field_names,
                                 dictionaries):
    """
    Shared construction logic for DirectoryPartitioning and
    FilenamePartitioning, whose options are identical.

    Parameters
    ----------
    klass : type
        The partitioning class to instantiate or discover.
    flavor_label : str
        Human-readable flavor name used in error messages.
    schema, field_names, dictionaries
        Forwarded from :func:`partitioning`; see its docstring.
    """
    if schema is not None:
        if field_names is not None:
            raise ValueError(
                "Cannot specify both 'schema' and 'field_names'")
        if dictionaries == 'infer':
            # Fixed types, but let Arrow discover the dictionary values.
            return klass.discover(schema=schema)
        return klass(schema, dictionaries)
    elif field_names is not None:
        if isinstance(field_names, list):
            # Fixed field names; types are inferred from the file paths.
            return klass.discover(field_names)
        else:
            raise ValueError(
                "Expected list of field names, got {}".format(
                    type(field_names)))
    else:
        raise ValueError(
            "For the {} flavor, need to specify "
            "a Schema or a list of field names".format(flavor_label))


def partitioning(schema=None, field_names=None, flavor=None,
                 dictionaries=None):
    """
    Specify a partitioning scheme.

    The supported schemes include:

    - "DirectoryPartitioning": this scheme expects one segment in the file path
      for each field in the specified schema (all fields are required to be
      present). For example given schema<year:int16, month:int8> the path
      "/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
    - "HivePartitioning": a scheme for "/$key=$value/" nested directories as
      found in Apache Hive. This is a multi-level, directory based partitioning
      scheme. Data is partitioned by static values of a particular column in
      the schema. Partition keys are represented in the form $key=$value in
      directory names. Field order is ignored, as are missing or unrecognized
      field names.
      For example, given schema<year:int16, month:int8, day:int8>, a possible
      path would be "/year=2009/month=11/day=15" (but the field order does not
      need to match).
    - "FilenamePartitioning": this scheme expects the partitions will have
      filenames containing the field values separated by "_".
      For example, given schema<year:int16, month:int8, day:int8>, a possible
      partition filename "2009_11_part-0.parquet" would be parsed
      to ("year"_ == 2009 and "month"_ == 11).

    Parameters
    ----------
    schema : pyarrow.Schema, default None
        The schema that describes the partitions present in the file path.
        If not specified, and `field_names` and/or `flavor` are specified,
        the schema will be inferred from the file path (and a
        PartitioningFactory is returned).
    field_names : list of str, default None
        A list of strings (field names). If specified, the schema's types are
        inferred from the file paths (only valid for DirectoryPartitioning).
    flavor : str, default None
        The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
        a HivePartitioning, and ``flavor="filename"`` for a
        FilenamePartitioning.
    dictionaries : dict[str, Array]
        If the type of any field of `schema` is a dictionary type, the
        corresponding entry of `dictionaries` must be an array containing
        every value which may be taken by the corresponding column or an
        error will be raised in parsing. Alternatively, pass `infer` to have
        Arrow discover the dictionary values, in which case a
        PartitioningFactory is returned.

    Returns
    -------
    Partitioning or PartitioningFactory
        The partitioning scheme

    Examples
    --------
    Specify the Schema for paths like "/2009/June":

    >>> import pyarrow as pa
    >>> import pyarrow.dataset as ds
    >>> part = ds.partitioning(pa.schema([("year", pa.int16()),
    ...                                   ("month", pa.string())]))

    or let the types be inferred by only specifying the field names:

    >>> part = ds.partitioning(field_names=["year", "month"])

    For paths like "/2009/June", the year will be inferred as int32 while month
    will be inferred as string.

    Specify a Schema with dictionary encoding, providing dictionary values:

    >>> part = ds.partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries={
    ...         "month": pa.array(["January", "February", "March"]),
    ...     })

    Alternatively, specify a Schema with dictionary encoding, but have Arrow
    infer the dictionary values:

    >>> part = ds.partitioning(
    ...     pa.schema([
    ...         ("year", pa.int16()),
    ...         ("month", pa.dictionary(pa.int8(), pa.string()))
    ...     ]),
    ...     dictionaries="infer")

    Create a Hive scheme for a path like "/year=2009/month=11":

    >>> part = ds.partitioning(
    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]),
    ...     flavor="hive")

    A Hive scheme can also be discovered from the directory structure (and
    types will be inferred):

    >>> part = ds.partitioning(flavor="hive")
    """
    # Directory and filename partitioning share all of their option
    # handling, so both delegate to the same private helper.
    if flavor is None:
        # Default flavor is directory partitioning.
        return _directory_like_partitioning(
            DirectoryPartitioning, "default directory",
            schema, field_names, dictionaries)
    if flavor == "filename":
        return _directory_like_partitioning(
            FilenamePartitioning, "filename",
            schema, field_names, dictionaries)
    elif flavor == 'hive':
        if field_names is not None:
            raise ValueError("Cannot specify 'field_names' for flavor 'hive'")
        elif schema is not None:
            if isinstance(schema, pa.Schema):
                if dictionaries == 'infer':
                    return HivePartitioning.discover(schema=schema)
                return HivePartitioning(schema, dictionaries)
            else:
                raise ValueError(
                    "Expected Schema for 'schema', got {}".format(
                        type(schema)))
        else:
            # Discover both the keys and their value types from the paths.
            return HivePartitioning.discover()
    else:
        raise ValueError("Unsupported flavor")
def _ensure_partitioning(scheme):
"""
Validate input and return a Partitioning(Factory).
It passes None through if no partitioning scheme is defined.
"""
if scheme is None:
pass
elif isinstance(scheme, str):
scheme = partitioning(flavor=scheme)
elif isinstance(scheme, list):
scheme = partitioning(field_names=scheme)
elif isinstance(scheme, (Partitioning, PartitioningFactory)):
pass
else:
raise ValueError("Expected Partitioning or PartitioningFactory, got {}"
.format(type(scheme)))
return scheme
def _ensure_format(obj):
    """
    Coerce a format name string into a FileFormat instance.

    An existing FileFormat passes through unchanged; a ValueError is
    raised for unknown names and for formats this build lacks.
    """
    if isinstance(obj, FileFormat):
        return obj
    # Parquet and ORC are optional builds and need an availability check.
    if obj == "parquet":
        if not _parquet_available:
            raise ValueError(_parquet_msg)
        return ParquetFileFormat()
    if obj == "orc":
        if not _orc_available:
            raise ValueError(_orc_msg)
        return OrcFileFormat()
    # The remaining formats are always compiled in.
    constructors = {
        "ipc": IpcFileFormat,
        "arrow": IpcFileFormat,
        "feather": FeatherFileFormat,
        "csv": CsvFileFormat,
        "json": JsonFileFormat,
    }
    try:
        return constructors[obj]()
    except KeyError:
        raise ValueError("format '{}' is not supported".format(obj)) from None
def _ensure_multiple_sources(paths, filesystem=None):
"""
Treat a list of paths as files belonging to a single file system
If the file system is local then also validates that all paths
are referencing existing *files* otherwise any non-file paths will be
silently skipped (for example on a remote filesystem).
Parameters
----------
paths : list of path-like
Note that URIs are not allowed.
filesystem : FileSystem or str, optional
If an URI is passed, then its path component will act as a prefix for
the file paths.
Returns
-------
(FileSystem, list of str)
File system object and a list of normalized paths.
Raises
------
Loading ...