# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import ast
from collections.abc import Sequence
from concurrent import futures
# import threading submodule upfront to avoid partially initialized
# module bug (ARROW-11983)
import concurrent.futures.thread # noqa
from copy import deepcopy
from itertools import zip_longest
import json
import operator
import re
import warnings
import numpy as np
import pyarrow as pa
from pyarrow.lib import _pandas_api, frombytes # noqa
_logical_type_map = {}
def get_logical_type_map():
global _logical_type_map
if not _logical_type_map:
_logical_type_map.update({
pa.lib.Type_NA: 'empty',
pa.lib.Type_BOOL: 'bool',
pa.lib.Type_INT8: 'int8',
pa.lib.Type_INT16: 'int16',
pa.lib.Type_INT32: 'int32',
pa.lib.Type_INT64: 'int64',
pa.lib.Type_UINT8: 'uint8',
pa.lib.Type_UINT16: 'uint16',
pa.lib.Type_UINT32: 'uint32',
pa.lib.Type_UINT64: 'uint64',
pa.lib.Type_HALF_FLOAT: 'float16',
pa.lib.Type_FLOAT: 'float32',
pa.lib.Type_DOUBLE: 'float64',
pa.lib.Type_DATE32: 'date',
pa.lib.Type_DATE64: 'date',
pa.lib.Type_TIME32: 'time',
pa.lib.Type_TIME64: 'time',
pa.lib.Type_BINARY: 'bytes',
pa.lib.Type_FIXED_SIZE_BINARY: 'bytes',
pa.lib.Type_STRING: 'unicode',
})
return _logical_type_map
def get_logical_type(arrow_type):
logical_type_map = get_logical_type_map()
try:
return logical_type_map[arrow_type.id]
except KeyError:
if isinstance(arrow_type, pa.lib.DictionaryType):
return 'categorical'
elif isinstance(arrow_type, pa.lib.ListType):
return 'list[{}]'.format(get_logical_type(arrow_type.value_type))
elif isinstance(arrow_type, pa.lib.TimestampType):
return 'datetimetz' if arrow_type.tz is not None else 'datetime'
elif isinstance(arrow_type, pa.lib.Decimal128Type):
return 'decimal'
return 'object'
_numpy_logical_type_map = {
np.bool_: 'bool',
np.int8: 'int8',
np.int16: 'int16',
np.int32: 'int32',
np.int64: 'int64',
np.uint8: 'uint8',
np.uint16: 'uint16',
np.uint32: 'uint32',
np.uint64: 'uint64',
np.float32: 'float32',
np.float64: 'float64',
'datetime64[D]': 'date',
np.str_: 'string',
np.bytes_: 'bytes',
}
def get_logical_type_from_numpy(pandas_collection):
try:
return _numpy_logical_type_map[pandas_collection.dtype.type]
except KeyError:
if hasattr(pandas_collection.dtype, 'tz'):
return 'datetimetz'
# See https://github.com/pandas-dev/pandas/issues/24739
if str(pandas_collection.dtype) == 'datetime64[ns]':
return 'datetime64[ns]'
result = _pandas_api.infer_dtype(pandas_collection)
if result == 'string':
return 'unicode'
return result
def get_extension_dtype_info(column):
dtype = column.dtype
if str(dtype) == 'category':
cats = getattr(column, 'cat', column)
assert cats is not None
metadata = {
'num_categories': len(cats.categories),
'ordered': cats.ordered,
}
physical_dtype = str(cats.codes.dtype)
elif hasattr(dtype, 'tz'):
metadata = {'timezone': pa.lib.tzinfo_to_string(dtype.tz)}
physical_dtype = 'datetime64[ns]'
else:
metadata = None
physical_dtype = str(dtype)
return physical_dtype, metadata
def get_column_metadata(column, name, arrow_type, field_name):
"""Construct the metadata for a given column
Parameters
----------
column : pandas.Series or pandas.Index
name : str
arrow_type : pyarrow.DataType
field_name : str
Equivalent to `name` when `column` is a `Series`, otherwise if `column`
is a pandas Index then `field_name` will not be the same as `name`.
This is the name of the field in the arrow Table's schema.
Returns
-------
dict
"""
logical_type = get_logical_type(arrow_type)
string_dtype, extra_metadata = get_extension_dtype_info(column)
if logical_type == 'decimal':
extra_metadata = {
'precision': arrow_type.precision,
'scale': arrow_type.scale,
}
string_dtype = 'object'
if name is not None and not isinstance(name, str):
raise TypeError(
'Column name must be a string. Got column {} of type {}'.format(
name, type(name).__name__
)
)
assert field_name is None or isinstance(field_name, str), \
str(type(field_name))
return {
'name': name,
'field_name': 'None' if field_name is None else field_name,
'pandas_type': logical_type,
'numpy_type': string_dtype,
'metadata': extra_metadata,
}
def construct_metadata(columns_to_convert, df, column_names, index_levels,
index_descriptors, preserve_index, types):
"""Returns a dictionary containing enough metadata to reconstruct a pandas
DataFrame as an Arrow Table, including index columns.
Parameters
----------
columns_to_convert : list[pd.Series]
df : pandas.DataFrame
index_levels : List[pd.Index]
index_descriptors : List[Dict]
preserve_index : bool
types : List[pyarrow.DataType]
Returns
-------
dict
"""
num_serialized_index_levels = len([descr for descr in index_descriptors
if not isinstance(descr, dict)])
# Use ntypes instead of Python shorthand notation [:-len(x)] as [:-0]
# behaves differently to what we want.
ntypes = len(types)
df_types = types[:ntypes - num_serialized_index_levels]
index_types = types[ntypes - num_serialized_index_levels:]
column_metadata = []
for col, sanitized_name, arrow_type in zip(columns_to_convert,
column_names, df_types):
metadata = get_column_metadata(col, name=sanitized_name,
arrow_type=arrow_type,
field_name=sanitized_name)
column_metadata.append(metadata)
index_column_metadata = []
if preserve_index is not False:
non_str_index_names = []
for level, arrow_type, descriptor in zip(index_levels, index_types,
index_descriptors):
if isinstance(descriptor, dict):
# The index is represented in a non-serialized fashion,
# e.g. RangeIndex
continue
if level.name is not None and not isinstance(level.name, str):
non_str_index_names.append(level.name)
metadata = get_column_metadata(
level,
name=_column_name_to_strings(level.name),
arrow_type=arrow_type,
field_name=descriptor,
)
index_column_metadata.append(metadata)
if len(non_str_index_names) > 0:
warnings.warn(
f"The DataFrame has non-str index name `{non_str_index_names}`"
" which will be converted to string"
" and not roundtrip correctly.",
UserWarning, stacklevel=4)
column_indexes = []
levels = getattr(df.columns, 'levels', [df.columns])
names = getattr(df.columns, 'names', [df.columns.name])
for level, name in zip(levels, names):
metadata = _get_simple_index_descriptor(level, name)
column_indexes.append(metadata)
else:
index_descriptors = index_column_metadata = column_indexes = []
return {
b'pandas': json.dumps({
'index_columns': index_descriptors,
'column_indexes': column_indexes,
'columns': column_metadata + index_column_metadata,
'creator': {
'library': 'pyarrow',
'version': pa.__version__
},
'pandas_version': _pandas_api.version
}).encode('utf8')
}
def _get_simple_index_descriptor(level, name):
string_dtype, extra_metadata = get_extension_dtype_info(level)
pandas_type = get_logical_type_from_numpy(level)
if 'mixed' in pandas_type:
warnings.warn(
"The DataFrame has column names of mixed type. They will be "
"converted to strings and not roundtrip correctly.",
UserWarning, stacklevel=4)
if pandas_type == 'unicode':
assert not extra_metadata
extra_metadata = {'encoding': 'UTF-8'}
return {
'name': name,
'field_name': name,
'pandas_type': pandas_type,
'numpy_type': string_dtype,
'metadata': extra_metadata,
}
def _column_name_to_strings(name):
"""Convert a column name (or level) to either a string or a recursive
collection of strings.
Parameters
----------
name : str or tuple
Returns
-------
value : str or tuple
Examples
--------
>>> name = 'foo'
>>> _column_name_to_strings(name)
'foo'
>>> name = ('foo', 'bar')
>>> _column_name_to_strings(name)
"('foo', 'bar')"
>>> import pandas as pd
>>> name = (1, pd.Timestamp('2017-02-01 00:00:00'))
>>> _column_name_to_strings(name)
"('1', '2017-02-01 00:00:00')"
"""
if isinstance(name, str):
return name
elif isinstance(name, bytes):
# XXX: should we assume that bytes in Python 3 are UTF-8?
return name.decode('utf8')
elif isinstance(name, tuple):
return str(tuple(map(_column_name_to_strings, name)))
elif isinstance(name, Sequence):
raise TypeError("Unsupported type for MultiIndex level")
elif name is None:
return None
return str(name)
def _index_level_name(index, i, column_names):
"""Return the name of an index level or a default name if `index.name` is
None or is already a column name.
Parameters
----------
index : pandas.Index
i : int
Returns
-------
name : str
"""
if index.name is not None and index.name not in column_names:
return _column_name_to_strings(index.name)
else:
return '__index_level_{:d}__'.format(i)
Loading ...