# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import (
Any,
Tuple,
)
from pyarrow.interchange.column import (
DtypeKind,
ColumnBuffers,
ColumnNullType,
)
import pyarrow as pa
import re
import pyarrow.compute as pc
from pyarrow.interchange.column import Dtype
# A typing protocol could be added later to let Mypy validate code using
# `from_dataframe` better.
DataFrameObject = Any
ColumnObject = Any
BufferObject = Any
_PYARROW_DTYPES: dict[DtypeKind, dict[int, Any]] = {
DtypeKind.INT: {8: pa.int8(),
16: pa.int16(),
32: pa.int32(),
64: pa.int64()},
DtypeKind.UINT: {8: pa.uint8(),
16: pa.uint16(),
32: pa.uint32(),
64: pa.uint64()},
DtypeKind.FLOAT: {16: pa.float16(),
32: pa.float32(),
64: pa.float64()},
DtypeKind.BOOL: {1: pa.bool_(),
8: pa.uint8()},
DtypeKind.STRING: {8: pa.string()},
}
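
# Illustrative note: the nested mapping above resolves a (DtypeKind, bit width)
# pair to a pyarrow type, e.g. _PYARROW_DTYPES[DtypeKind.UINT][16] is
# pa.uint16(). Bit-packed booleans (bit width 1) map to pa.bool_(), while
# byte-packed booleans (bit width 8) are read as pa.uint8() and cast to
# boolean later (see bool_column_to_array below).
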
def from_dataframe(df: DataFrameObject, allow_copy: bool = True) -> pa.Table:
"""
Build a ``pa.Table`` from any DataFrame supporting the interchange protocol.
Parameters
----------
df : DataFrameObject
Object supporting the interchange protocol, i.e. an object with
a `__dataframe__` method.
allow_copy : bool, default: True
Whether to allow copying the memory to perform the conversion
(if False then a zero-copy approach is requested).
Returns
-------
pa.Table
Examples
--------
>>> import pyarrow
>>> from pyarrow.interchange import from_dataframe
Convert a pandas dataframe to a pyarrow table:
>>> import pandas as pd
>>> df = pd.DataFrame({
... "n_attendees": [100, 10, 1],
... "country": ["Italy", "Spain", "Slovenia"],
... })
>>> df
n_attendees country
0 100 Italy
1 10 Spain
2 1 Slovenia
>>> from_dataframe(df)
pyarrow.Table
n_attendees: int64
country: large_string
----
n_attendees: [[100,10,1]]
country: [["Italy","Spain","Slovenia"]]
"""
if isinstance(df, pa.Table):
return df
elif isinstance(df, pa.RecordBatch):
return pa.Table.from_batches([df])
if not hasattr(df, "__dataframe__"):
raise ValueError("`df` does not support __dataframe__")
return _from_dataframe(df.__dataframe__(allow_copy=allow_copy),
allow_copy=allow_copy)
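
# Usage sketch (illustrative only): pyarrow objects short-circuit the
# protocol path entirely, e.g.
#
#   table = pa.table({"a": [1, 2, 3]})
#   from_dataframe(table)                  # returned unchanged
#   from_dataframe(table.to_batches()[0])  # wrapped into a single-batch Table
#
# Any other object must expose a `__dataframe__` method.
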
def _from_dataframe(df: DataFrameObject, allow_copy: bool = True) -> pa.Table:
"""
Build a ``pa.Table`` from the DataFrame interchange object.
Parameters
----------
df : DataFrameObject
Object supporting the interchange protocol, i.e. an object with
a `__dataframe__` method.
allow_copy : bool, default: True
Whether to allow copying the memory to perform the conversion
(if False then a zero-copy approach is requested).
Returns
-------
pa.Table
"""
batches = []
for chunk in df.get_chunks():
batch = protocol_df_chunk_to_pyarrow(chunk, allow_copy)
batches.append(batch)
if not batches:
batch = protocol_df_chunk_to_pyarrow(df, allow_copy)
batches.append(batch)
return pa.Table.from_batches(batches)
def protocol_df_chunk_to_pyarrow(
df: DataFrameObject,
allow_copy: bool = True
) -> pa.RecordBatch:
"""
Convert interchange protocol chunk to ``pa.RecordBatch``.
Parameters
----------
df : DataFrameObject
A chunk of an object supporting the interchange protocol, i.e. an
object with a `__dataframe__` method.
allow_copy : bool, default: True
Whether to allow copying the memory to perform the conversion
(if False then a zero-copy approach is requested).
Returns
-------
pa.RecordBatch
"""
# We need a dict of columns here, with each column being a pa.Array
columns: dict[str, pa.Array] = {}
for name in df.column_names():
if not isinstance(name, str):
raise ValueError(f"Column {name} is not a string")
if name in columns:
raise ValueError(f"Column {name} is not unique")
col = df.get_column_by_name(name)
dtype = col.dtype[0]
if dtype in (
DtypeKind.INT,
DtypeKind.UINT,
DtypeKind.FLOAT,
DtypeKind.STRING,
DtypeKind.DATETIME,
):
columns[name] = column_to_array(col, allow_copy)
elif dtype == DtypeKind.BOOL:
columns[name] = bool_column_to_array(col, allow_copy)
elif dtype == DtypeKind.CATEGORICAL:
columns[name] = categorical_column_to_dictionary(col, allow_copy)
else:
raise NotImplementedError(f"Data type {dtype} not handled yet")
return pa.RecordBatch.from_pydict(columns)
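
# Example sketch (assumes pandas is installed; illustrative only): a single
# protocol chunk becomes one record batch, with each column routed to the
# matching converter above, e.g.
#
#   import pandas as pd
#   df = pd.DataFrame({"a": [1, 2]}).__dataframe__()
#   chunk = next(iter(df.get_chunks()))
#   protocol_df_chunk_to_pyarrow(chunk)   # -> RecordBatch with column "a": int64
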
def column_to_array(
col: ColumnObject,
allow_copy: bool = True,
) -> pa.Array:
"""
Convert a column holding one of the primitive dtypes to a PyArrow array.
A primitive type is one of: int, uint, float, string or datetime.
Parameters
----------
col : ColumnObject
allow_copy : bool, default: True
Whether to allow copying the memory to perform the conversion
(if False then a zero-copy approach is requested).
Returns
-------
pa.Array
"""
buffers = col.get_buffers()
data_type = col.dtype
data = buffers_to_array(buffers, data_type,
col.size(),
col.describe_null,
col.offset,
allow_copy)
return data
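
# Example sketch (assumes pandas is installed; illustrative only):
#
#   import pandas as pd
#   df = pd.DataFrame({"x": [1.5, 2.5]})
#   col = df.__dataframe__().get_column_by_name("x")
#   column_to_array(col)   # -> DoubleArray of [1.5, 2.5]
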
def bool_column_to_array(
col: ColumnObject,
allow_copy: bool = True,
) -> pa.Array:
"""
Convert a column holding boolean dtype to a PyArrow array.
Parameters
----------
col : ColumnObject
allow_copy : bool, default: True
Whether to allow copying the memory to perform the conversion
(if False then a zero-copy approach is requested).
Returns
-------
pa.Array
"""
buffers = col.get_buffers()
# Bit width of the data buffer: 1 for bit-packed, 8 for byte-packed booleans
size = buffers["data"][1][1]
# If booleans are byte-packed, a copy to bit-packed booleans will be made
if size == 8 and not allow_copy:
raise RuntimeError(
"Boolean column will be casted from uint8 and a copy "
"is required which is forbidden by allow_copy=False"
)
data_type = col.dtype
data = buffers_to_array(buffers, data_type,
col.size(),
col.describe_null,
col.offset)
if size == 8:
data = pc.cast(data, pa.bool_())
return data
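
# Example sketch (assumes pandas is installed; illustrative only): pandas
# byte-packs booleans as uint8, so such a column takes the `size == 8` path
# above and is cast to pa.bool_() at the end, e.g.
#
#   import pandas as pd
#   df = pd.DataFrame({"flag": [True, False, True]})
#   col = df.__dataframe__().get_column_by_name("flag")
#   bool_column_to_array(col)   # -> BooleanArray of [true, false, true]
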
def categorical_column_to_dictionary(
col: ColumnObject,
allow_copy: bool = True,
) -> pa.DictionaryArray:
"""
Convert a column holding categorical data to a pa.DictionaryArray.
Parameters
----------
col : ColumnObject
allow_copy : bool, default: True
Whether to allow copying the memory to perform the conversion
(if False then a zero-copy approach is requested).
Returns
-------
pa.DictionaryArray
"""
if not allow_copy:
raise RuntimeError(
"Categorical column will be casted from uint8 and a copy "
"is required which is forbidden by allow_copy=False"
)
categorical = col.describe_categorical
if not categorical["is_dictionary"]:
raise NotImplementedError(
"Non-dictionary categoricals not supported yet")
# We need to first convert the dictionary column
cat_column = categorical["categories"]
dictionary = column_to_array(cat_column)
# Then we need to convert the indices
# Here we need to use the buffer data type!
buffers = col.get_buffers()
_, data_type = buffers["data"]
indices = buffers_to_array(buffers, data_type,
col.size(),
col.describe_null,
col.offset)
# Constructing a pa.DictionaryArray
dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)
return dict_array
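
# Example sketch (assumes pandas is installed; illustrative only): a pandas
# categorical column becomes a pa.DictionaryArray whose dictionary holds the
# categories and whose indices reference them, e.g.
#
#   import pandas as pd
#   df = pd.DataFrame({"weekday": pd.Categorical(["Mon", "Tue", "Mon"])})
#   col = df.__dataframe__().get_column_by_name("weekday")
#   categorical_column_to_dictionary(col)
#   # -> dictionary: ["Mon", "Tue"], indices: [0, 1, 0]
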
def parse_datetime_format_str(format_str):
"""Parse datetime `format_str` to interpret the `data`."""
# timestamp 'ts{unit}:tz'
timestamp_meta = re.match(r"ts([smun]):(.*)", format_str)
if timestamp_meta:
unit, tz = timestamp_meta.group(1), timestamp_meta.group(2)
if unit != "s":
# the format string describes only the first letter of the unit, so
# add one extra letter to convert the unit to numpy-style:
# 'm' -> 'ms', 'u' -> 'us', 'n' -> 'ns'
unit += "s"
return unit, tz
raise NotImplementedError(f"DateTime kind is not supported: {format_str}")
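
# Illustrative examples of the 'ts{unit}:{tz}' format strings handled above:
#
#   parse_datetime_format_str("tss:")              # -> ("s", "")
#   parse_datetime_format_str("tsu:Europe/Paris")  # -> ("us", "Europe/Paris")
#   parse_datetime_format_str("tsn:UTC")           # -> ("ns", "UTC")
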
def map_date_type(data_type):
"""Map column date type to pyarrow date type. """
kind, bit_width, f_string, _ = data_type
if kind == DtypeKind.DATETIME:
unit, tz = parse_datetime_format_str(f_string)
return pa.timestamp(unit, tz=tz)
else:
pa_dtype = _PYARROW_DTYPES.get(kind, {}).get(bit_width, None)
# Error if dtype is not supported
if pa_dtype:
return pa_dtype
else:
raise NotImplementedError(
f"Conversion for {data_type} is not yet supported.")
def buffers_to_array(