Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

/ tests / test_types.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from collections import OrderedDict
from collections.abc import Iterator
from functools import partial
import datetime
import sys

import pytest
import hypothesis as h
import hypothesis.strategies as st
try:
    import hypothesis.extra.pytz as tzst
except ImportError:
    tzst = None
import weakref

import numpy as np
import pyarrow as pa
import pyarrow.types as types
import pyarrow.tests.strategies as past


def get_many_types():
    """Return a freshly built tuple of assorted Arrow data types.

    The types are produced by a function call rather than stored in a
    module-level constant because the pa.dictionary type holds a pyarrow
    array and test_array.py::test_total_bytes_allocated checks that the
    default memory pool has zero allocated bytes.
    """
    dense = pa.lib.UnionMode_DENSE
    sparse = pa.lib.UnionMode_SPARSE

    scalars = (
        pa.null(),
        pa.bool_(),
        pa.int32(),
        pa.time32('s'),
        pa.time64('us'),
        pa.date32(),
        pa.timestamp('us'),
        pa.timestamp('us', tz='UTC'),
        pa.timestamp('us', tz='Europe/Paris'),
        pa.duration('s'),
        pa.float16(),
        pa.float32(),
        pa.float64(),
        pa.decimal128(19, 4),
        pa.decimal256(76, 38),
    )
    strings_and_binaries = (
        pa.string(),
        pa.binary(),
        pa.binary(10),
        pa.large_string(),
        pa.large_binary(),
        pa.string_view(),
        pa.binary_view(),
    )
    lists = (
        pa.list_(pa.int32()),
        pa.list_(pa.int32(), 2),
        pa.large_list(pa.uint16()),
        pa.list_view(pa.int32()),
        pa.large_list_view(pa.uint16()),
    )
    maps = (
        pa.map_(pa.string(), pa.int32()),
        pa.map_(pa.field('key', pa.int32(), nullable=False),
                pa.field('value', pa.int32())),
    )
    structs = (
        pa.struct([pa.field('a', pa.int32()),
                   pa.field('b', pa.int8()),
                   pa.field('c', pa.string())]),
        pa.struct([pa.field('a', pa.int32(), nullable=False),
                   pa.field('b', pa.int8(), nullable=False),
                   pa.field('c', pa.string())]),
    )
    unions = (
        pa.union([pa.field('a', pa.binary(10)),
                  pa.field('b', pa.string())], mode=dense),
        pa.union([pa.field('a', pa.binary(10)),
                  pa.field('b', pa.string())], mode=dense,
                 type_codes=[4, 8]),
        pa.union([pa.field('a', pa.binary(10)),
                  pa.field('b', pa.string())], mode=sparse),
        pa.union([pa.field('a', pa.binary(10), nullable=False),
                  pa.field('b', pa.string())], mode=sparse),
    )
    encoded = (
        pa.dictionary(pa.int32(), pa.string()),
        pa.run_end_encoded(pa.int16(), pa.int32()),
        pa.run_end_encoded(pa.int32(), pa.string()),
        pa.run_end_encoded(pa.int64(), pa.uint8()),
    )

    # Concatenation keeps the elements in their original relative order.
    return (scalars + strings_and_binaries + lists + maps + structs +
            unions + encoded)


def test_is_boolean():
    """is_boolean accepts bool_ and rejects int8."""
    is_bool = types.is_boolean

    assert is_bool(pa.bool_())
    assert not is_bool(pa.int8())


def test_is_integer():
    """Check is_integer, is_signed_integer and is_unsigned_integer.

    Every signed and unsigned integer width must satisfy is_integer, each
    signedness group must satisfy only its own signedness predicate, and a
    non-integer type (float32) must be rejected by all three predicates.
    """
    signed_ints = [pa.int8(), pa.int16(), pa.int32(), pa.int64()]
    unsigned_ints = [pa.uint8(), pa.uint16(), pa.uint32(), pa.uint64()]

    for t in signed_ints + unsigned_ints:
        assert types.is_integer(t)

    for t in signed_ints:
        assert types.is_signed_integer(t)
        assert not types.is_unsigned_integer(t)

    for t in unsigned_ints:
        assert types.is_unsigned_integer(t)
        assert not types.is_signed_integer(t)

    # The negative case is checked against every integer predicate, so the
    # unsigned predicate is covered as well as the signed one.
    non_integer = pa.float32()
    assert not types.is_integer(non_integer)
    assert not types.is_signed_integer(non_integer)
    assert not types.is_unsigned_integer(non_integer)


def test_is_floating():
    """is_floating accepts float16/32/64 and rejects int32."""
    is_float = types.is_floating

    for float_type in (pa.float16(), pa.float32(), pa.float64()):
        assert is_float(float_type)

    assert not is_float(pa.int32())


def test_is_null():
    """is_null accepts the null type and rejects a list type."""
    null_type = pa.null()
    list_type = pa.list_(pa.int32())

    assert types.is_null(null_type)
    assert not types.is_null(list_type)


def test_null_field_may_not_be_non_nullable():
    """A field of null type declared nullable=False raises ValueError."""
    # ARROW-7273
    null_type = pa.null()
    with pytest.raises(ValueError):
        pa.field('f0', null_type, nullable=False)


def test_is_decimal():
    """Check is_decimal, is_decimal128 and is_decimal256 on three types."""
    candidates = (pa.decimal128(19, 4), pa.decimal256(76, 38), pa.int32())

    # Each row: predicate, then the expected outcome for
    # (decimal128, decimal256, int32) in that order.
    expectations = [
        (types.is_decimal, (True, True, False)),
        (types.is_decimal128, (True, False, False)),
        (types.is_decimal256, (False, True, False)),
    ]

    for predicate, outcomes in expectations:
        for candidate, outcome in zip(candidates, outcomes):
            assert bool(predicate(candidate)) is outcome


def test_is_list():
    """Each list flavour satisfies exactly one of the three list predicates."""
    predicates = (types.is_list, types.is_large_list,
                  types.is_fixed_size_list)

    # Pair every list type with the single predicate that must accept it.
    pairs = [
        (pa.list_(pa.int32()), types.is_list),
        (pa.large_list(pa.int32()), types.is_large_list),
        (pa.list_(pa.int32(), 3), types.is_fixed_size_list),
    ]

    for list_type, matching in pairs:
        for predicate in predicates:
            assert bool(predicate(list_type)) is (predicate is matching)

    assert not types.is_list(pa.int32())


def test_is_list_view():
    """list_view and large_list_view are told apart from each other and
    from the corresponding non-view list predicates."""
    small = pa.list_view(pa.int32())
    large = pa.large_list_view(pa.int32())

    assert types.is_list_view(small)
    assert not types.is_list_view(large)

    assert types.is_large_list_view(large)
    assert not types.is_large_list_view(small)

    # A view type is not reported as the plain list of the same width.
    assert not types.is_list(small)
    assert not types.is_large_list(large)


def test_is_map():
    """is_map accepts maps built from types or from fields, and rejects
    int32 and a list of key/value structs."""
    from_types = pa.map_(pa.utf8(), pa.int32())
    from_fields = pa.map_(pa.field('key_name', pa.utf8(), nullable=False),
                          pa.field('value_name', pa.int32()))

    for map_type in (from_types, from_fields):
        assert types.is_map(map_type)

    assert not types.is_map(pa.int32())

    # A list whose items are structs with 'key' and 'value' fields is
    # still reported as not being a map.
    entry_struct = pa.struct([pa.field('key', pa.int8()),
                              pa.field('value', pa.int8())])
    assert not types.is_map(pa.list_(entry_struct))


def test_is_dictionary():
    """is_dictionary accepts a dictionary type and rejects int32."""
    dict_type = pa.dictionary(pa.int32(), pa.string())

    assert types.is_dictionary(dict_type)
    assert not types.is_dictionary(pa.int32())


def test_is_nested_or_struct():
    """Check is_struct and is_nested on struct, list-like and int32 types."""
    struct_type = pa.struct([pa.field('a', pa.int32()),
                             pa.field('b', pa.int8()),
                             pa.field('c', pa.string())])
    plain_list = pa.list_(pa.int32())

    assert types.is_struct(struct_type)
    assert not types.is_struct(plain_list)

    nested_types = [
        struct_type,
        plain_list,
        pa.list_(pa.int32(), 3),
        pa.large_list(pa.int32()),
        pa.list_view(pa.int32()),
        pa.large_list_view(pa.int32()),
    ]
    for nested_type in nested_types:
        assert types.is_nested(nested_type)

    assert not types.is_nested(pa.int32())


def test_is_union():
    """is_union accepts sparse and dense unions and rejects a list type."""
    members = [pa.field('a', pa.int32()),
               pa.field('b', pa.int8()),
               pa.field('c', pa.string())]

    for mode in (pa.lib.UnionMode_SPARSE, pa.lib.UnionMode_DENSE):
        union_type = pa.union(members, mode=mode)
        assert types.is_union(union_type)

    assert not types.is_union(pa.list_(pa.int32()))


def test_is_run_end_encoded():
    """is_run_end_encoded accepts a run-end-encoded type and rejects utf8."""
    ree_type = pa.run_end_encoded(pa.int32(), pa.int64())

    assert types.is_run_end_encoded(ree_type)
    assert not types.is_run_end_encoded(pa.utf8())


# NOTE: is_map is covered by test_is_map above; the former
# TODO(wesm) asking for that test is resolved.


def test_is_binary_string():
    """Check the binary/string predicates, including large, fixed-size
    and view variants."""
    # Each predicate must accept its own type and reject the other three
    # variable-size binary/string types.
    exclusive = [
        (types.is_binary, pa.binary()),
        (types.is_string, pa.string()),
        (types.is_large_binary, pa.large_binary()),
        (types.is_large_string, pa.large_string()),
    ]
    for predicate, own_type in exclusive:
        for _, candidate in exclusive:
            assert bool(predicate(candidate)) is (candidate is own_type)

    assert types.is_unicode(pa.string())

    # Only a binary type created with a width counts as fixed-size.
    assert types.is_fixed_size_binary(pa.binary(5))
    assert not types.is_fixed_size_binary(pa.binary())

    string_view = pa.string_view()
    assert types.is_string_view(string_view)
    assert not types.is_string_view(pa.string())

    assert types.is_binary_view(pa.binary_view())
    assert not types.is_binary_view(pa.binary())
    assert not types.is_binary_view(string_view)


def test_is_temporal_date_time_timestamp():
    """Check is_temporal and the per-kind temporal predicates.

    Date, time, timestamp, duration and interval types must all satisfy
    is_temporal, and each kind must satisfy its own predicate while being
    rejected by the predicates of the other kinds. int32 must not be
    reported as temporal.
    """
    date_types = [pa.date32(), pa.date64()]
    time_types = [pa.time32('s'), pa.time64('ns')]
    timestamp_types = [pa.timestamp('ms')]
    duration_types = [pa.duration('ms')]
    interval_types = [pa.month_day_nano_interval()]

    for case in (date_types + time_types + timestamp_types + duration_types +
                 interval_types):
        assert types.is_temporal(case)

    for case in date_types:
        assert types.is_date(case)
        assert not types.is_time(case)
        assert not types.is_timestamp(case)
        assert not types.is_duration(case)
        assert not types.is_interval(case)

    for case in time_types:
        assert types.is_time(case)
        assert not types.is_date(case)
        assert not types.is_timestamp(case)
        assert not types.is_duration(case)
        assert not types.is_interval(case)

    for case in timestamp_types:
        assert types.is_timestamp(case)
        assert not types.is_date(case)
        assert not types.is_time(case)
        assert not types.is_duration(case)
        assert not types.is_interval(case)

    for case in duration_types:
        assert types.is_duration(case)
        assert not types.is_date(case)
        assert not types.is_time(case)
        assert not types.is_timestamp(case)
        assert not types.is_interval(case)

    for case in interval_types:
        assert types.is_interval(case)
        assert not types.is_date(case)
        assert not types.is_time(case)
        assert not types.is_timestamp(case)
        # Intervals get the same duration check as every other kind.
        assert not types.is_duration(case)

    assert not types.is_temporal(pa.int32())


def test_is_primitive():
    """is_primitive accepts int32 and rejects a list of int32."""
    int_type = pa.int32()

    assert types.is_primitive(int_type)
    assert not types.is_primitive(pa.list_(int_type))


@pytest.mark.parametrize(
    'tz, expected',
    [
        (datetime.timezone.utc, 'UTC'),
        (datetime.timezone(datetime.timedelta(hours=1, minutes=30)),
         '+01:30'),
    ],
)
def test_tzinfo_to_string(tz, expected):
    """tzinfo_to_string renders stdlib timezone objects as expected."""
    rendered = pa.lib.tzinfo_to_string(tz)
    assert rendered == expected


def test_pytz_tzinfo_to_string():
    """tzinfo_to_string renders pytz timezone objects as expected.

    Skipped when pytz is not installed.
    """
    pytz = pytest.importorskip("pytz")

    cases = [
        (pytz.utc, 'UTC'),
        (pytz.timezone('Europe/Paris'), 'Europe/Paris'),
        # StaticTzInfo.tzname returns with '-09' so we need to infer the
        # timezone's name from the tzinfo.zone attribute
        (pytz.timezone('Etc/GMT-9'), 'Etc/GMT-9'),
        (pytz.FixedOffset(180), '+03:00'),
    ]

    for tzinfo, expected_name in cases:
        assert pa.lib.tzinfo_to_string(tzinfo) == expected_name
Loading ...