# Repository URL to install this package: (not captured)
# Version: 2022.10.0
import copyreg
import math
import numpy as np
import pandas as pd
# pyarrow is an optional dependency: when it is not installed, ``pa`` is set
# to ``None`` so the rest of the module can check for it before use.
try:
    import pyarrow as pa
except ImportError:
    pa = None
# Pickling of pyarrow arrays is effectively broken - pickling a slice of an
# array ends up pickling the entire backing array.
#
# See https://issues.apache.org/jira/browse/ARROW-10739
#
# This comes up when using pandas `string[pyarrow]` dtypes, which are backed by
# a `pyarrow.StringArray`. To fix this, we register a *global* override for
# pickling `pandas.core.arrays.ArrowStringArray` types. We do this at the
# pandas level rather than the pyarrow level for efficiency reasons (a pandas
# ArrowStringArray may contain many small pyarrow StringArray objects).
#
# This pickling implementation manually mucks with the backing buffers in a
# fairly efficient way:
#
# - The data buffer is never copied
# - The offsets buffer is only copied if the array is sliced with a start index
# (x[start:])
# - The mask buffer is never copied
#
# This implementation works with pickle protocol 5, allowing support for true
# zero-copy sends.
#
# XXX: Once pyarrow (or pandas) has fixed this bug, we should skip registering
# with copyreg for versions that lack this issue.
def pyarrow_stringarray_to_parts(array):
    """Decompose a ``pyarrow.StringArray`` into a tuple of components.

    The resulting tuple can be passed to
    ``pyarrow_stringarray_from_parts(*components)`` to reconstruct the
    ``pyarrow.StringArray``.

    Parameters
    ----------
    array : pyarrow.StringArray
        An array of ``pyarrow.string()`` type (4 byte offsets). It may be a
        slice of a larger array, i.e. have a non-zero ``array.offset``.

    Returns
    -------
    tuple
        One of:

        - ``(nitems, offsets, data)`` if the array has no mask buffer
        - ``(nitems, offsets, data, mask)`` if it has a mask buffer and no
          leading offset
        - ``(nitems, offsets, data, mask, npad)`` if it has a mask buffer and
          a leading offset, where ``npad = array.offset % 8`` is the number
          of padding elements placed before the real ones

    Raises
    ------
    TypeError
        If ``array`` is not of ``pyarrow.string()`` type.
    """
    import sys

    # Access the backing buffers.
    #
    # - mask: None, or a bitmask of length ceil(nitems / 8). 0 bits mark NULL
    #   elements, only present if NULL data is present, commonly None.
    # - offsets: An int32 array of nitems + 1 items marking the start/stop
    #   indices for the individual elements in `data`
    # - data: All the utf8 string data concatenated together
    #
    # The structure of these buffers comes from the arrow format, documented at
    # https://arrow.apache.org/docs/format/Columnar.html#physical-memory-layout.
    # In particular, this is a `StringArray` (4 byte offsets), rather than a
    # `LargeStringArray` (8 byte offsets).
    #
    # This is an explicit check rather than an `assert` so that it is still
    # enforced when Python runs with `-O`; the byte arithmetic below is only
    # valid for 4 byte offsets.
    if not pa.types.is_string(array.type):
        raise TypeError(
            f"expected a pyarrow array of type string, got {array.type}"
        )

    mask, offsets, data = array.buffers()
    nitems = len(array)
    start = array.offset

    if not start:
        # No leading offset, only need to slice any unnecessary data from the
        # backing buffers
        offsets = offsets[: 4 * (nitems + 1)]
        # The last used offset is the end of the used string data. It is read
        # in native byte order, the same order the int32 numpy view uses in
        # the sliced case below.
        data_stop = int.from_bytes(offsets[-4:], sys.byteorder)
        data = data[:data_stop]
        if mask is None:
            return nitems, offsets, data
        mask = mask[: (nitems + 7) // 8]
        return nitems, offsets, data, mask

    # There is a leading offset. This complicates things a bit.
    #
    # View only the `nitems + 1` offsets that belong to this slice. Limiting
    # the view with `count`/`offset` means the total size of the backing
    # buffer does not need to be a multiple of 4 bytes.
    used_offsets = np.frombuffer(
        offsets, dtype="i4", count=nitems + 1, offset=4 * start
    )
    data_start = int(used_offsets[0])
    data_stop = int(used_offsets[-1])
    data = data[data_start:data_stop]

    if mask is None:
        npad = 0
    else:
        # Since the mask is a bitmask, it can only represent even units of 8
        # elements. To avoid shifting any bits, we pad the array with up to 7
        # elements so the mask array can always be serialized zero copy.
        npad = start % 8
        mask_start = start // 8
        mask_stop = (start + nitems + 7) // 8
        mask = mask[mask_start:mask_stop]

    # Subtract the offset of the starting element from every used offset in the
    # offsets array, ensuring the first element in the serialized `offsets`
    # array is always 0. The subtraction allocates a new array; this is the
    # only copy made of the offsets.
    rebased = used_offsets - used_offsets[0]

    # Pad the new offsets by `npad` offsets of 0 (see the `mask` comment above).
    # We wrap this in a `pyarrow.py_buffer`, since this type transparently
    # supports pickle 5, avoiding an extra copy inside the pickler.
    if npad:
        offsets = pa.py_buffer(b"\x00" * (4 * npad) + rebased.data)
    else:
        offsets = pa.py_buffer(rebased.data)

    if mask is None:
        return nitems, offsets, data
    return nitems, offsets, data, mask, npad
def pyarrow_stringarray_from_parts(nitems, data_offsets, data, mask=None, offset=0):
    """Rebuild a ``pyarrow.StringArray`` from its decomposed parts.

    This is the inverse of ``pyarrow_stringarray_to_parts``: the arguments
    are the elements of the tuple that function returns, in the same order.
    ``mask`` and ``offset`` fall back to ``None`` and ``0`` when the tuple
    does not include them.
    """
    from_buffers = pa.StringArray.from_buffers
    return from_buffers(nitems, data_offsets, data, mask, offset=offset)
def rebuild_arrowstringarray(*chunk_parts):
    """Reassemble a ``pandas.core.arrays.ArrowStringArray`` from chunk parts.

    Each positional argument is one tuple of components, which is unpacked
    into ``pyarrow_stringarray_from_parts`` to rebuild a single chunk. The
    chunks are then combined into a string-typed chunked array and wrapped
    in an ``ArrowStringArray``.
    """
    rebuilt_chunks = [
        pyarrow_stringarray_from_parts(*components) for components in chunk_parts
    ]
    # The type is given explicitly, so it does not have to be inferred from
    # the list of chunks.
    chunked = pa.chunked_array(rebuilt_chunks, type=pa.string())
    return pd.arrays.ArrowStringArray(chunked)
def reduce_arrowstringarray(x):
    """Pickle reducer for ``pandas.core.arrays.ArrowStringArray``.

    Returns a ``(callable, args)`` pair: ``rebuild_arrowstringarray`` and one
    tuple of parts per chunk. This avoids serializing unnecessary data, while
    also avoiding/minimizing data copies.
    """
    # Walk the chunks of the backing ChunkedArray. 0-length chunks are
    # skipped, since they add no meaningful value to the chunked array.
    nonempty_chunks = (chunk for chunk in x._data.chunks if len(chunk) > 0)
    # Break each remaining chunk down into its individual components.
    parts = tuple(map(pyarrow_stringarray_to_parts, nonempty_chunks))
    return rebuild_arrowstringarray, parts
# Register the reducer globally, but only when pyarrow was imported and this
# pandas version provides ``ArrowStringArray``.
if pa is not None and hasattr(pd.arrays, "ArrowStringArray"):
    copyreg.pickle(pd.arrays.ArrowStringArray, reduce_arrowstringarray)