Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ tests / interchange / test_interchange_spec.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import ctypes
import hypothesis as h
import hypothesis.strategies as st

import pytest
try:
    import numpy as np
except ImportError:
    np = None
import pyarrow as pa
import pyarrow.tests.strategies as past


all_types = st.deferred(
    lambda: (
        past.signed_integer_types |
        past.unsigned_integer_types |
        past.floating_types |
        past.bool_type |
        past.string_type |
        past.large_string_type
    )
)


# datetime is tested in test_extra.py
# dictionary is tested in test_categorical()
@pytest.mark.numpy
@h.given(past.arrays(all_types, size=3))
def test_dtypes(arr):
    table = pa.table([arr], names=["a"])
    df = table.__dataframe__()

    null_count = df.get_column(0).null_count
    assert null_count == arr.null_count
    assert isinstance(null_count, int)
    assert df.get_column(0).size() == 3
    assert df.get_column(0).offset == 0


@pytest.mark.numpy
@pytest.mark.parametrize(
    "uint, uint_bw",
    [
        (pa.uint8(), 8),
        (pa.uint16(), 16),
        (pa.uint32(), 32)
    ]
)
@pytest.mark.parametrize(
    "int, int_bw", [
        (pa.int8(), 8),
        (pa.int16(), 16),
        (pa.int32(), 32),
        (pa.int64(), 64)
    ]
)
@pytest.mark.parametrize(
    "float, float_bw, np_float_str", [
        (pa.float16(), 16, "float16"),
        (pa.float32(), 32, "float32"),
        (pa.float64(), 64, "float64")
    ]
)
@pytest.mark.parametrize("unit", ['s', 'ms', 'us', 'ns'])
@pytest.mark.parametrize("tz", ['', 'America/New_York', '+07:30', '-04:30'])
@pytest.mark.parametrize("use_batch", [False, True])
def test_mixed_dtypes(uint, uint_bw, int, int_bw,
                      float, float_bw, np_float_str, unit, tz,
                      use_batch):
    from datetime import datetime as dt
    arr = [1, 2, 3]
    dt_arr = [dt(2007, 7, 13), dt(2007, 7, 14), dt(2007, 7, 15)]
    table = pa.table(
        {
            "a": pa.array(arr, type=uint),
            "b": pa.array(arr, type=int),
            "c": pa.array(np.array(arr, dtype=np.dtype(np_float_str)), type=float),
            "d": [True, False, True],
            "e": ["a", "", "c"],
            "f": pa.array(dt_arr, type=pa.timestamp(unit, tz=tz))
        }
    )
    if use_batch:
        table = table.to_batches()[0]
    df = table.__dataframe__()
    # 0 = DtypeKind.INT, 1 = DtypeKind.UINT, 2 = DtypeKind.FLOAT,
    # 20 = DtypeKind.BOOL, 21 = DtypeKind.STRING, 22 = DtypeKind.DATETIME
    # see DtypeKind class in column.py
    columns = {"a": 1, "b": 0, "c": 2, "d": 20, "e": 21, "f": 22}

    for column, kind in columns.items():
        col = df.get_column_by_name(column)

        assert col.null_count == 0
        assert col.size() == 3
        assert col.offset == 0
        assert col.dtype[0] == kind

    assert df.get_column_by_name("a").dtype[1] == uint_bw
    assert df.get_column_by_name("b").dtype[1] == int_bw
    assert df.get_column_by_name("c").dtype[1] == float_bw


def test_na_float():
    table = pa.table({"a": [1.0, None, 2.0]})
    df = table.__dataframe__()
    col = df.get_column_by_name("a")
    assert col.null_count == 1
    assert isinstance(col.null_count, int)


def test_noncategorical():
    table = pa.table({"a": [1, 2, 3]})
    df = table.__dataframe__()
    col = df.get_column_by_name("a")
    with pytest.raises(TypeError, match=".*categorical.*"):
        col.describe_categorical


@pytest.mark.parametrize("use_batch", [False, True])
def test_categorical(use_batch):
    import pyarrow as pa
    arr = ["Mon", "Tue", "Mon", "Wed", "Mon", "Thu", "Fri", "Sat", None]
    table = pa.table(
        {"weekday": pa.array(arr).dictionary_encode()}
    )
    if use_batch:
        table = table.to_batches()[0]

    col = table.__dataframe__().get_column_by_name("weekday")
    categorical = col.describe_categorical
    assert isinstance(categorical["is_ordered"], bool)
    assert isinstance(categorical["is_dictionary"], bool)


@pytest.mark.parametrize("use_batch", [False, True])
def test_dataframe(use_batch):
    n = pa.chunked_array([[2, 2, 4], [4, 5, 100]])
    a = pa.chunked_array([["Flamingo", "Parrot", "Cow"],
                         ["Horse", "Brittle stars", "Centipede"]])
    table = pa.table([n, a], names=['n_legs', 'animals'])
    if use_batch:
        table = table.combine_chunks().to_batches()[0]
    df = table.__dataframe__()

    assert df.num_columns() == 2
    assert df.num_rows() == 6
    if use_batch:
        assert df.num_chunks() == 1
    else:
        assert df.num_chunks() == 2
    assert list(df.column_names()) == ['n_legs', 'animals']
    assert list(df.select_columns((1,)).column_names()) == list(
        df.select_columns_by_name(("animals",)).column_names()
    )


@pytest.mark.parametrize("use_batch", [False, True])
@pytest.mark.parametrize(["size", "n_chunks"], [(10, 3), (12, 3), (12, 5)])
def test_df_get_chunks(use_batch, size, n_chunks):
    table = pa.table({"x": list(range(size))})
    if use_batch:
        table = table.to_batches()[0]
    df = table.__dataframe__()
    chunks = list(df.get_chunks(n_chunks))
    assert len(chunks) == n_chunks
    assert sum(chunk.num_rows() for chunk in chunks) == size


@pytest.mark.parametrize("use_batch", [False, True])
@pytest.mark.parametrize(["size", "n_chunks"], [(10, 3), (12, 3), (12, 5)])
def test_column_get_chunks(use_batch, size, n_chunks):
    table = pa.table({"x": list(range(size))})
    if use_batch:
        table = table.to_batches()[0]
    df = table.__dataframe__()
    chunks = list(df.get_column(0).get_chunks(n_chunks))
    assert len(chunks) == n_chunks
    assert sum(chunk.size() for chunk in chunks) == size


@pytest.mark.pandas
@pytest.mark.parametrize(
    "uint", [pa.uint8(), pa.uint16(), pa.uint32()]
)
@pytest.mark.parametrize(
    "int", [pa.int8(), pa.int16(), pa.int32(), pa.int64()]
)
@pytest.mark.parametrize(
    "float, np_float_str", [
        (pa.float16(), "float16"),
        (pa.float32(), "float32"),
        (pa.float64(), "float64")
    ]
)
@pytest.mark.parametrize("use_batch", [False, True])
def test_get_columns(uint, int, float, np_float_str, use_batch):
    arr = [[1, 2, 3], [4, 5]]
    arr_float = np.array([1, 2, 3, 4, 5], dtype=np.dtype(np_float_str))
    table = pa.table(
        {
            "a": pa.chunked_array(arr, type=uint),
            "b": pa.chunked_array(arr, type=int),
            "c": pa.array(arr_float, type=float)
        }
    )
    if use_batch:
        table = table.combine_chunks().to_batches()[0]
    df = table.__dataframe__()
    for col in df.get_columns():
        assert col.size() == 5
        assert col.num_chunks() == 1

    # 0 = DtypeKind.INT, 1 = DtypeKind.UINT, 2 = DtypeKind.FLOAT,
    # see DtypeKind class in column.py
    assert df.get_column(0).dtype[0] == 1  # UINT
    assert df.get_column(1).dtype[0] == 0  # INT
    assert df.get_column(2).dtype[0] == 2  # FLOAT


@pytest.mark.parametrize(
    "int", [pa.int8(), pa.int16(), pa.int32(), pa.int64()]
)
@pytest.mark.parametrize("use_batch", [False, True])
def test_buffer(int, use_batch):
    arr = [0, 1, -1]
    table = pa.table({"a": pa.array(arr, type=int)})
    if use_batch:
        table = table.to_batches()[0]
    df = table.__dataframe__()
    col = df.get_column(0)
    buf = col.get_buffers()

    dataBuf, dataDtype = buf["data"]

    assert dataBuf.bufsize > 0
    assert dataBuf.ptr != 0
    device, _ = dataBuf.__dlpack_device__()

    # 0 = DtypeKind.INT
    # see DtypeKind class in column.py
    assert dataDtype[0] == 0

    if device == 1:  # CPU-only as we're going to directly read memory here
        bitwidth = dataDtype[1]
        ctype = {
            8: ctypes.c_int8,
            16: ctypes.c_int16,
            32: ctypes.c_int32,
            64: ctypes.c_int64,
        }[bitwidth]

        for idx, truth in enumerate(arr):
            val = ctype.from_address(dataBuf.ptr + idx * (bitwidth // 8)).value
            assert val == truth, f"Buffer at index {idx} mismatch"


@pytest.mark.parametrize(
    "indices_type, bitwidth, f_string", [
        (pa.int8(), 8, "c"),
        (pa.int16(), 16, "s"),
        (pa.int32(), 32, "i"),
        (pa.int64(), 64, "l")
    ]
)
def test_categorical_dtype(indices_type, bitwidth, f_string):
    type = pa.dictionary(indices_type, pa.string())
    arr = pa.array(["a", "b", None, "d"], type)
    table = pa.table({'a': arr})

    df = table.__dataframe__()
    col = df.get_column(0)
    assert col.dtype[0] == 23  # <DtypeKind.CATEGORICAL: 23>
    assert col.dtype[1] == bitwidth
    assert col.dtype[2] == f_string