Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

/ tests / conftest.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import functools
import os
import pathlib
import subprocess
import sys
import time
import urllib.request

import pytest
import hypothesis as h
from ..conftest import groups, defaults

from pyarrow import set_timezone_db_path
from pyarrow.util import find_free_port


# setup hypothesis profiles
h.settings.register_profile('ci', max_examples=1000)
h.settings.register_profile('dev', max_examples=50)
h.settings.register_profile('debug', max_examples=10,
                            verbosity=h.Verbosity.verbose)

# load default hypothesis profile, either set HYPOTHESIS_PROFILE environment
# variable or pass --hypothesis-profile option to pytest, to see the generated
# examples try:
# pytest pyarrow -sv --enable-hypothesis --hypothesis-profile=debug
h.settings.load_profile(os.environ.get('HYPOTHESIS_PROFILE', 'dev'))

# Set this at the beginning before the AWS SDK was loaded to avoid reading in
# user configuration values.
os.environ['AWS_CONFIG_FILE'] = "/dev/null"


if sys.platform == 'win32':
    tzdata_set_path = os.environ.get('PYARROW_TZDATA_PATH', None)
    if tzdata_set_path:
        set_timezone_db_path(tzdata_set_path)


def pytest_addoption(parser):
    # Create options to selectively enable test groups
    def bool_env(name, default=None):
        value = os.environ.get(name.upper())
        if not value:  # missing or empty
            return default
        value = value.lower()
        if value in {'1', 'true', 'on', 'yes', 'y'}:
            return True
        elif value in {'0', 'false', 'off', 'no', 'n'}:
            return False
        else:
            raise ValueError('{}={} is not parsable as boolean'
                             .format(name.upper(), value))

    for group in groups:
        default = bool_env('PYARROW_TEST_{}'.format(group), defaults[group])
        parser.addoption('--enable-{}'.format(group),
                         action='store_true', default=default,
                         help=('Enable the {} test group'.format(group)))
        parser.addoption('--disable-{}'.format(group),
                         action='store_true', default=False,
                         help=('Disable the {} test group'.format(group)))


class PyArrowConfig:
    def __init__(self):
        self.is_enabled = {}

    def apply_mark(self, mark):
        group = mark.name
        if group in groups:
            self.requires(group)

    def requires(self, group):
        if not self.is_enabled[group]:
            pytest.skip('{} NOT enabled'.format(group))


def pytest_configure(config):
    # Apply command-line options to initialize PyArrow-specific config object
    config.pyarrow = PyArrowConfig()

    for mark in groups:
        config.addinivalue_line(
            "markers", mark,
        )

        enable_flag = '--enable-{}'.format(mark)
        disable_flag = '--disable-{}'.format(mark)

        is_enabled = (config.getoption(enable_flag) and not
                      config.getoption(disable_flag))
        config.pyarrow.is_enabled[mark] = is_enabled


def pytest_runtest_setup(item):
    # Apply test markers to skip tests selectively
    for mark in item.iter_markers():
        item.config.pyarrow.apply_mark(mark)


@pytest.fixture
def tempdir(tmpdir):
    # convert pytest's LocalPath to pathlib.Path
    return pathlib.Path(tmpdir.strpath)


@pytest.fixture(scope='session')
def base_datadir():
    return pathlib.Path(__file__).parent / 'data'


@pytest.fixture(autouse=True)
def disable_aws_metadata(monkeypatch):
    """Stop the AWS SDK from trying to contact the EC2 metadata server.

    Otherwise, this causes a 5 second delay in tests that exercise the
    S3 filesystem.
    """
    monkeypatch.setenv("AWS_EC2_METADATA_DISABLED", "true")


# TODO(kszucs): move the following fixtures to test_fs.py once the previous
# parquet dataset implementation and hdfs implementation are removed.

@pytest.fixture(scope='session')
def hdfs_connection():
    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'default')
    port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
    user = os.environ.get('ARROW_HDFS_TEST_USER', 'hdfs')
    return host, port, user


@pytest.fixture(scope='session')
def s3_connection():
    host, port = 'localhost', find_free_port()
    access_key, secret_key = 'arrow', 'apachearrow'
    return host, port, access_key, secret_key


def retry(attempts=3, delay=1.0, max_delay=None, backoff=1):
    """
    Retry decorator

    Parameters
    ----------
    attempts : int, default 3
        The number of attempts.
    delay : float, default 1
        Initial delay in seconds.
    max_delay : float, optional
        The max delay between attempts.
    backoff : float, default 1
        The multiplier to delay after each attempt.
    """
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            remaining_attempts = attempts
            curr_delay = delay
            while remaining_attempts > 0:
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    remaining_attempts -= 1
                    last_exception = err
                    curr_delay *= backoff
                    if max_delay:
                        curr_delay = min(curr_delay, max_delay)
                    time.sleep(curr_delay)
            raise last_exception
        return wrapper
    return decorate


@pytest.fixture(scope='session')
def s3_server(s3_connection, tmpdir_factory):
    @retry(attempts=5, delay=0.1, backoff=2)
    def minio_server_health_check(address):
        resp = urllib.request.urlopen(f"http://{address}/minio/health/cluster")
        assert resp.getcode() == 200

    tmpdir = tmpdir_factory.getbasetemp()
    host, port, access_key, secret_key = s3_connection

    address = '{}:{}'.format(host, port)
    env = os.environ.copy()
    env.update({
        'MINIO_ACCESS_KEY': access_key,
        'MINIO_SECRET_KEY': secret_key
    })

    args = ['minio', '--compat', 'server', '--quiet', '--address',
            address, tmpdir]
    proc = None
    try:
        proc = subprocess.Popen(args, env=env)
    except OSError:
        pytest.skip('`minio` command cannot be located')
    else:
        # Wait for the server to startup before yielding
        minio_server_health_check(address)

        yield {
            'connection': s3_connection,
            'process': proc,
            'tempdir': tmpdir
        }
    finally:
        if proc is not None:
            proc.kill()
            proc.wait()


@pytest.fixture(scope='session')
def gcs_server():
    port = find_free_port()
    env = os.environ.copy()
    args = [sys.executable, '-m', 'testbench', '--port', str(port)]
    proc = None
    try:
        # check first if testbench module is available
        import testbench  # noqa:F401
        # start server
        proc = subprocess.Popen(args, env=env)
        # Make sure the server is alive.
        if proc.poll() is not None:
            pytest.skip(f"Command {args} did not start server successfully!")
    except (ModuleNotFoundError, OSError) as e:
        pytest.skip(f"Command {args} failed to execute: {e}")
    else:
        yield {
            'connection': ('localhost', port),
            'process': proc,
        }
    finally:
        if proc is not None:
            proc.kill()
            proc.wait()


@pytest.fixture(scope='session')
def azure_server(tmpdir_factory):
    port = find_free_port()
    env = os.environ.copy()
    tmpdir = tmpdir_factory.getbasetemp()
    # We only need blob service emulator, not queue or table.
    args = ['azurite-blob', "--location", tmpdir, "--blobPort", str(port)]
    proc = None
    try:
        proc = subprocess.Popen(args, env=env)
        # Make sure the server is alive.
        if proc.poll() is not None:
            pytest.skip(f"Command {args} did not start server successfully!")
    except (ModuleNotFoundError, OSError) as e:
        pytest.skip(f"Command {args} failed to execute: {e}")
    else:
        yield {
            # Use the standard azurite account_name and account_key.
            # https://learn.microsoft.com/en-us/azure/storage/common/storage-use-emulator#authorize-with-shared-key-credentials
            'connection': ('127.0.0.1', port, 'devstoreaccount1',
                           'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2'
                           'UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=='),
            'process': proc,
            'tempdir': tmpdir,
        }
    finally:
        if proc is not None:
            proc.kill()
            proc.wait()


@pytest.fixture(
    params=[
        'builtin_pickle',
        'cloudpickle'
    ],
    scope='session'
)
def pickle_module(request):
    return request.getfixturevalue(request.param)


@pytest.fixture(scope='session')
def builtin_pickle():
    import pickle
    return pickle


@pytest.fixture(scope='session')
def cloudpickle():
    cp = pytest.importorskip('cloudpickle')
    if 'HIGHEST_PROTOCOL' not in cp.__dict__:
        cp.HIGHEST_PROTOCOL = cp.DEFAULT_PROTOCOL
    return cp