Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pyarrow / _compute.pyx
Size: Mime:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

import sys

from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, Py_NE, Py_GE
from cython.operator cimport dereference as deref

from collections import namedtuple

from pyarrow.lib import frombytes, tobytes, ArrowInvalid
from pyarrow.lib cimport *
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
import pyarrow.lib as lib
from pyarrow.util import _DEPR_MSG
from libcpp cimport bool as c_bool

import inspect
import numpy as np
import warnings


__pas = None
_substrait_msg = (
    "The pyarrow installation is not built with support for Substrait."
)


def _pas():
    global __pas
    if __pas is None:
        try:
            import pyarrow.substrait as pas
            __pas = pas
        except ImportError:
            raise ImportError(_substrait_msg)
    return __pas


def _forbid_instantiation(klass, subclasses_instead=True):
    msg = '{} is an abstract class thus cannot be initialized.'.format(
        klass.__name__
    )
    if subclasses_instead:
        subclasses = [cls.__name__ for cls in klass.__subclasses__]
        msg += ' Use one of the subclasses instead: {}'.format(
            ', '.join(subclasses)
        )
    raise TypeError(msg)


cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func):
    """
    Wrap a C++ scalar Function in a ScalarFunction object.
    """
    cdef ScalarFunction func = ScalarFunction.__new__(ScalarFunction)
    func.init(sp_func)
    return func


cdef wrap_vector_function(const shared_ptr[CFunction]& sp_func):
    """
    Wrap a C++ vector Function in a VectorFunction object.
    """
    cdef VectorFunction func = VectorFunction.__new__(VectorFunction)
    func.init(sp_func)
    return func


cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func):
    """
    Wrap a C++ aggregate Function in a ScalarAggregateFunction object.
    """
    cdef ScalarAggregateFunction func = \
        ScalarAggregateFunction.__new__(ScalarAggregateFunction)
    func.init(sp_func)
    return func


cdef wrap_hash_aggregate_function(const shared_ptr[CFunction]& sp_func):
    """
    Wrap a C++ aggregate Function in a HashAggregateFunction object.
    """
    cdef HashAggregateFunction func = \
        HashAggregateFunction.__new__(HashAggregateFunction)
    func.init(sp_func)
    return func


cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func):
    """
    Wrap a C++ meta Function in a MetaFunction object.
    """
    cdef MetaFunction func = MetaFunction.__new__(MetaFunction)
    func.init(sp_func)
    return func


cdef wrap_function(const shared_ptr[CFunction]& sp_func):
    """
    Wrap a C++ Function in a Function object.

    This dispatches to specialized wrappers depending on the function kind.
    """
    if sp_func.get() == NULL:
        raise ValueError("Function was NULL")

    cdef FunctionKind c_kind = sp_func.get().kind()
    if c_kind == FunctionKind_SCALAR:
        return wrap_scalar_function(sp_func)
    elif c_kind == FunctionKind_VECTOR:
        return wrap_vector_function(sp_func)
    elif c_kind == FunctionKind_SCALAR_AGGREGATE:
        return wrap_scalar_aggregate_function(sp_func)
    elif c_kind == FunctionKind_HASH_AGGREGATE:
        return wrap_hash_aggregate_function(sp_func)
    elif c_kind == FunctionKind_META:
        return wrap_meta_function(sp_func)
    else:
        raise NotImplementedError("Unknown Function::Kind")


cdef wrap_scalar_kernel(const CScalarKernel* c_kernel):
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    cdef ScalarKernel kernel = ScalarKernel.__new__(ScalarKernel)
    kernel.init(c_kernel)
    return kernel


cdef wrap_vector_kernel(const CVectorKernel* c_kernel):
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    cdef VectorKernel kernel = VectorKernel.__new__(VectorKernel)
    kernel.init(c_kernel)
    return kernel


cdef wrap_scalar_aggregate_kernel(const CScalarAggregateKernel* c_kernel):
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    cdef ScalarAggregateKernel kernel = \
        ScalarAggregateKernel.__new__(ScalarAggregateKernel)
    kernel.init(c_kernel)
    return kernel


cdef wrap_hash_aggregate_kernel(const CHashAggregateKernel* c_kernel):
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    cdef HashAggregateKernel kernel = \
        HashAggregateKernel.__new__(HashAggregateKernel)
    kernel.init(c_kernel)
    return kernel


cdef class Kernel(_Weakrefable):
    """
    A kernel object.

    Kernels handle the execution of a Function for a certain signature.
    """

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly"
                        .format(self.__class__.__name__))


cdef class ScalarKernel(Kernel):
    cdef const CScalarKernel* kernel

    cdef void init(self, const CScalarKernel* kernel) except *:
        self.kernel = kernel

    def __repr__(self):
        return ("ScalarKernel<{}>"
                .format(frombytes(self.kernel.signature.get().ToString())))


cdef class VectorKernel(Kernel):
    cdef const CVectorKernel* kernel

    cdef void init(self, const CVectorKernel* kernel) except *:
        self.kernel = kernel

    def __repr__(self):
        return ("VectorKernel<{}>"
                .format(frombytes(self.kernel.signature.get().ToString())))


cdef class ScalarAggregateKernel(Kernel):
    cdef const CScalarAggregateKernel* kernel

    cdef void init(self, const CScalarAggregateKernel* kernel) except *:
        self.kernel = kernel

    def __repr__(self):
        return ("ScalarAggregateKernel<{}>"
                .format(frombytes(self.kernel.signature.get().ToString())))


cdef class HashAggregateKernel(Kernel):
    cdef const CHashAggregateKernel* kernel

    cdef void init(self, const CHashAggregateKernel* kernel) except *:
        self.kernel = kernel

    def __repr__(self):
        return ("HashAggregateKernel<{}>"
                .format(frombytes(self.kernel.signature.get().ToString())))


FunctionDoc = namedtuple(
    "FunctionDoc",
    ("summary", "description", "arg_names", "options_class",
     "options_required"))


cdef class Function(_Weakrefable):
    """
    A compute function.

    A function implements a certain logical computation over a range of
    possible input signatures.  Each signature accepts a range of input
    types and is implemented by a given Kernel.

    Functions can be of different kinds:

    * "scalar" functions apply an item-wise computation over all items
      of their inputs.  Each item in the output only depends on the values
      of the inputs at the same position.  Examples: addition, comparisons,
      string predicates...

    * "vector" functions apply a collection-wise computation, such that
      each item in the output may depend on the values of several items
      in each input.  Examples: dictionary encoding, sorting, extracting
      unique values...

    * "scalar_aggregate" functions reduce the dimensionality of the inputs by
      applying a reduction function.  Examples: sum, min_max, mode...

    * "hash_aggregate" functions apply a reduction function to an input
      subdivided by grouping criteria.  They may not be directly called.
      Examples: hash_sum, hash_min_max...

    * "meta" functions dispatch to other functions.
    """

    cdef:
        shared_ptr[CFunction] sp_func
        CFunction* base_func

    _kind_map = {
        FunctionKind_SCALAR: "scalar",
        FunctionKind_VECTOR: "vector",
        FunctionKind_SCALAR_AGGREGATE: "scalar_aggregate",
        FunctionKind_HASH_AGGREGATE: "hash_aggregate",
        FunctionKind_META: "meta",
    }

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly"
                        .format(self.__class__.__name__))

    cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
        self.sp_func = sp_func
        self.base_func = sp_func.get()

    def __repr__(self):
        return ("arrow.compute.Function<name={}, kind={}, "
                "arity={}, num_kernels={}>"
                .format(self.name, self.kind, self.arity, self.num_kernels))

    def __reduce__(self):
        # Reduction uses the global registry
        return get_function, (self.name,)

    @property
    def name(self):
        """
        The function name.
        """
        return frombytes(self.base_func.name())

    @property
    def arity(self):
        """
        The function arity.

        If Ellipsis (i.e. `...`) is returned, the function takes a variable
        number of arguments.
        """
        cdef CArity arity = self.base_func.arity()
        if arity.is_varargs:
            return ...
        else:
            return arity.num_args

    @property
    def kind(self):
        """
        The function kind.
        """
        cdef FunctionKind c_kind = self.base_func.kind()
        try:
            return self._kind_map[c_kind]
        except KeyError:
            raise NotImplementedError("Unknown Function::Kind")

    @property
    def _doc(self):
        """
        The C++-like function documentation (for internal use).
        """
        cdef CFunctionDoc c_doc = self.base_func.doc()
        return FunctionDoc(frombytes(c_doc.summary),
                           frombytes(c_doc.description),
                           [frombytes(s) for s in c_doc.arg_names],
                           frombytes(c_doc.options_class),
                           c_doc.options_required)

    @property
    def num_kernels(self):
        """
        The number of kernels implementing this function.
        """
        return self.base_func.num_kernels()

    def call(self, args, FunctionOptions options=None,
             MemoryPool memory_pool=None, length=None):
        """
        Call the function on the given arguments.

        Parameters
        ----------
        args : iterable
            The arguments to pass to the function.  Accepted types depend
            on the specific function.
        options : FunctionOptions, optional
            Options instance for executing this function.  This should have
            the right concrete options type.
        memory_pool : pyarrow.MemoryPool, optional
            If not passed, will allocate memory from the default memory pool.
        length : int, optional
            Batch size for execution, for nullary (no argument) functions. If
            not passed, will be inferred from passed data.
        """
        cdef:
            const CFunctionOptions* c_options = NULL
            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
            CExecContext c_exec_ctx = CExecContext(pool)
            CExecBatch c_batch
            CDatum result

        _pack_compute_args(args, &c_batch.values)

        if options is not None:
            c_options = options.get_options()

        if length is not None:
            c_batch.length = length
            with nogil:
                result = GetResultValue(
                    self.base_func.Execute(c_batch, c_options, &c_exec_ctx)
                )
        else:
            with nogil:
                result = GetResultValue(
                    self.base_func.Execute(c_batch.values, c_options,
                                           &c_exec_ctx)
                )

        return wrap_datum(result)


cdef class ScalarFunction(Function):
    cdef const CScalarFunction* func

    cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
        Function.init(self, sp_func)
        self.func = <const CScalarFunction*> sp_func.get()

    @property
    def kernels(self):
        """
        The kernels implementing this function.
        """
        cdef vector[const CScalarKernel*] kernels = self.func.kernels()
        return [wrap_scalar_kernel(k) for k in kernels]


cdef class VectorFunction(Function):
    cdef const CVectorFunction* func

    cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
        Function.init(self, sp_func)
        self.func = <const CVectorFunction*> sp_func.get()

    @property
    def kernels(self):
        """
        The kernels implementing this function.
        """
        cdef vector[const CVectorKernel*] kernels = self.func.kernels()
        return [wrap_vector_kernel(k) for k in kernels]


cdef class ScalarAggregateFunction(Function):
    cdef const CScalarAggregateFunction* func

    cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
        Function.init(self, sp_func)
        self.func = <const CScalarAggregateFunction*> sp_func.get()

    @property
    def kernels(self):
        """
        The kernels implementing this function.
        """
        cdef vector[const CScalarAggregateKernel*] kernels = \
            self.func.kernels()
        return [wrap_scalar_aggregate_kernel(k) for k in kernels]


cdef class HashAggregateFunction(Function):
    cdef const CHashAggregateFunction* func

    cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
        Function.init(self, sp_func)
        self.func = <const CHashAggregateFunction*> sp_func.get()

    @property
    def kernels(self):
        """
        The kernels implementing this function.
        """
        cdef vector[const CHashAggregateKernel*] kernels = self.func.kernels()
        return [wrap_hash_aggregate_kernel(k) for k in kernels]


cdef class MetaFunction(Function):
    cdef const CMetaFunction* func

    cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
        Function.init(self, sp_func)
        self.func = <const CMetaFunction*> sp_func.get()

    # Since num_kernels is exposed, also expose a kernels property
    @property
    def kernels(self):
        """
        The kernels implementing this function.
        """
        return []


cdef _pack_compute_args(object values, vector[CDatum]* out):
    for val in values:
        if isinstance(val, (list, np.ndarray)):
            val = lib.asarray(val)

        if isinstance(val, Array):
            out.push_back(CDatum((<Array> val).sp_array))
            continue
        elif isinstance(val, ChunkedArray):
            out.push_back(CDatum((<ChunkedArray> val).sp_chunked_array))
            continue
        elif isinstance(val, Scalar):
            out.push_back(CDatum((<Scalar> val).unwrap()))
            continue
        elif isinstance(val, RecordBatch):
            out.push_back(CDatum((<RecordBatch> val).sp_batch))
            continue
        elif isinstance(val, Table):
            out.push_back(CDatum((<Table> val).sp_table))
            continue
        else:
            # Is it a Python scalar?
            try:
                scal = lib.scalar(val)
            except Exception:
                # Raise dedicated error below
                pass
            else:
                out.push_back(CDatum((<Scalar> scal).unwrap()))
                continue

        raise TypeError(f"Got unexpected argument type {type(val)} "
                        "for compute function")


cdef class FunctionRegistry(_Weakrefable):
    cdef CFunctionRegistry* registry

    def __init__(self):
        self.registry = GetFunctionRegistry()

    def list_functions(self):
        """
        Return all function names in the registry.
        """
        cdef vector[c_string] names = self.registry.GetFunctionNames()
        return [frombytes(name) for name in names]

    def get_function(self, name):
        """
        Look up a function by name in the registry.

        Parameters
        ----------
        name : str
            The name of the function to lookup
        """
        cdef:
            c_string c_name = tobytes(name)
            shared_ptr[CFunction] func
        with nogil:
            func = GetResultValue(self.registry.GetFunction(c_name))
        return wrap_function(func)


cdef FunctionRegistry _global_func_registry = FunctionRegistry()


def function_registry():
    return _global_func_registry


def get_function(name):
    """
    Get a function by name.

    The function is looked up in the global registry
    (as returned by `function_registry()`).

    Parameters
    ----------
    name : str
        The name of the function to lookup
    """
    return _global_func_registry.get_function(name)


def list_functions():
    """
    Return all function names in the global registry.
    """
    return _global_func_registry.list_functions()


def call_function(name, args, options=None, memory_pool=None, length=None):
    """
    Call a named function.

    The function is looked up in the global registry
    (as returned by `function_registry()`).

    Parameters
    ----------
    name : str
        The name of the function to call.
    args : list
        The arguments to the function.
    options : optional
        options provided to the function.
    memory_pool : MemoryPool, optional
        memory pool to use for allocations during function execution.
    length : int, optional
        Batch size for execution, for nullary (no argument) functions. If not
        passed, inferred from data.
    """
    func = _global_func_registry.get_function(name)
    return func.call(args, options=options, memory_pool=memory_pool,
                     length=length)


cdef class FunctionOptions(_Weakrefable):
    __slots__ = ()  # avoid mistakingly creating attributes

    cdef const CFunctionOptions* get_options(self) except NULL:
        return self.wrapped.get()

    cdef void init(self, const shared_ptr[CFunctionOptions]& sp):
        self.wrapped = sp

    cdef inline shared_ptr[CFunctionOptions] unwrap(self):
        return self.wrapped

    def serialize(self):
        cdef:
            CResult[shared_ptr[CBuffer]] res = self.get_options().Serialize()
            shared_ptr[CBuffer] c_buf = GetResultValue(res)
        return pyarrow_wrap_buffer(c_buf)

    @staticmethod
    def deserialize(buf):
        """
        Deserialize options for a function.

        Parameters
        ----------
        buf : Buffer
            The buffer containing the data to deserialize.
        """
        cdef:
            shared_ptr[CBuffer] c_buf = pyarrow_unwrap_buffer(buf)
            CResult[unique_ptr[CFunctionOptions]] maybe_options = \
                DeserializeFunctionOptions(deref(c_buf))
            shared_ptr[CFunctionOptions] c_options
        c_options = to_shared(GetResultValue(move(maybe_options)))
        type_name = frombytes(c_options.get().options_type().type_name())
        module = globals()
        if type_name not in module:
            raise ValueError(f'Cannot deserialize "{type_name}"')
        klass = module[type_name]
        options = klass.__new__(klass)
        (<FunctionOptions> options).init(c_options)
        return options

    def __repr__(self):
        type_name = self.__class__.__name__
        # Remove {} so we can use our own braces
        string_repr = frombytes(self.get_options().ToString())[1:-1]
        return f"{type_name}({string_repr})"

    def __eq__(self, FunctionOptions other):
        return self.get_options().Equals(deref(other.get_options()))


def _raise_invalid_function_option(value, description, *,
                                   exception_class=ValueError):
    raise exception_class(f"\"{value}\" is not a valid {description}")


# NOTE:
# To properly expose the constructor signature of FunctionOptions
# subclasses, we use a two-level inheritance:
# 1. a C extension class that implements option validation and setting
#    (won't expose function signatures because of
#     https://github.com/cython/cython/issues/3873)
# 2. a Python derived class that implements the constructor

cdef class _CastOptions(FunctionOptions):
    cdef CCastOptions* options

    cdef void init(self, const shared_ptr[CFunctionOptions]& sp):
        FunctionOptions.init(self, sp)
        self.options = <CCastOptions*> self.wrapped.get()

    def _set_options(self, DataType target_type, allow_int_overflow,
                     allow_time_truncate, allow_time_overflow,
                     allow_decimal_truncate, allow_float_truncate,
                     allow_invalid_utf8):
        cdef:
            shared_ptr[CCastOptions] wrapped = make_shared[CCastOptions]()
        self.init(<shared_ptr[CFunctionOptions]> wrapped)
        self._set_type(target_type)
        if allow_int_overflow is not None:
            self.allow_int_overflow = allow_int_overflow
        if allow_time_truncate is not None:
            self.allow_time_truncate = allow_time_truncate
        if allow_time_overflow is not None:
            self.allow_time_overflow = allow_time_overflow
        if allow_decimal_truncate is not None:
            self.allow_decimal_truncate = allow_decimal_truncate
        if allow_float_truncate is not None:
            self.allow_float_truncate = allow_float_truncate
        if allow_invalid_utf8 is not None:
            self.allow_invalid_utf8 = allow_invalid_utf8

    def _set_type(self, target_type=None):
        if target_type is not None:
            deref(self.options).to_type = \
                (<DataType> ensure_type(target_type)).sp_type

    def _set_safe(self):
        self.init(shared_ptr[CFunctionOptions](
            new CCastOptions(CCastOptions.Safe())))

    def _set_unsafe(self):
        self.init(shared_ptr[CFunctionOptions](
            new CCastOptions(CCastOptions.Unsafe())))

    def is_safe(self):
        return not (deref(self.options).allow_int_overflow or
                    deref(self.options).allow_time_truncate or
                    deref(self.options).allow_time_overflow or
                    deref(self.options).allow_decimal_truncate or
                    deref(self.options).allow_float_truncate or
                    deref(self.options).allow_invalid_utf8)

    @property
    def allow_int_overflow(self):
        return deref(self.options).allow_int_overflow

    @allow_int_overflow.setter
    def allow_int_overflow(self, c_bool flag):
        deref(self.options).allow_int_overflow = flag

    @property
    def allow_time_truncate(self):
        return deref(self.options).allow_time_truncate

    @allow_time_truncate.setter
    def allow_time_truncate(self, c_bool flag):
        deref(self.options).allow_time_truncate = flag

    @property
    def allow_time_overflow(self):
        return deref(self.options).allow_time_overflow

    @allow_time_overflow.setter
    def allow_time_overflow(self, c_bool flag):
        deref(self.options).allow_time_overflow = flag

    @property
    def allow_decimal_truncate(self):
        return deref(self.options).allow_decimal_truncate

    @allow_decimal_truncate.setter
    def allow_decimal_truncate(self, c_bool flag):
        deref(self.options).allow_decimal_truncate = flag

    @property
    def allow_float_truncate(self):
        return deref(self.options).allow_float_truncate

    @allow_float_truncate.setter
    def allow_float_truncate(self, c_bool flag):
        deref(self.options).allow_float_truncate = flag

    @property
    def allow_invalid_utf8(self):
        return deref(self.options).allow_invalid_utf8

    @allow_invalid_utf8.setter
    def allow_invalid_utf8(self, c_bool flag):
        deref(self.options).allow_invalid_utf8 = flag


class CastOptions(_CastOptions):
    """
    Options for the `cast` function.

    Parameters
    ----------
    target_type : DataType, optional
        The PyArrow type to cast to.
    allow_int_overflow : bool, default False
        Whether integer overflow is allowed when casting.
    allow_time_truncate : bool, default False
        Whether time precision truncation is allowed when casting.
    allow_time_overflow : bool, default False
        Whether date/time range overflow is allowed when casting.
    allow_decimal_truncate : bool, default False
        Whether decimal precision truncation is allowed when casting.
    allow_float_truncate : bool, default False
        Whether floating-point precision truncation is allowed when casting.
    allow_invalid_utf8 : bool, default False
        Whether producing invalid utf8 data is allowed when casting.
    """

    def __init__(self, target_type=None, *, allow_int_overflow=None,
                 allow_time_truncate=None, allow_time_overflow=None,
                 allow_decimal_truncate=None, allow_float_truncate=None,
                 allow_invalid_utf8=None):
        self._set_options(target_type, allow_int_overflow, allow_time_truncate,
                          allow_time_overflow, allow_decimal_truncate,
                          allow_float_truncate, allow_invalid_utf8)

    @staticmethod
    def safe(target_type=None):
        """"
        Create a CastOptions for a safe cast.

        Parameters
        ----------
        target_type : optional
            Target cast type for the safe cast.
        """
        self = CastOptions()
        self._set_safe()
        self._set_type(target_type)
        return self

    @staticmethod
    def unsafe(target_type=None):
        """"
        Create a CastOptions for an unsafe cast.

        Parameters
        ----------
        target_type : optional
            Target cast type for the unsafe cast.
        """
        self = CastOptions()
        self._set_unsafe()
        self._set_type(target_type)
        return self


def _skip_nulls_doc():
    # (note the weird indent because of how the string is inserted
    #  by callers)
    return """skip_nulls : bool, default True
        Whether to skip (ignore) nulls in the input.
        If False, any null in the input forces the output to null.
"""


def _min_count_doc(*, default):
    return f"""min_count : int, default {default}
        Minimum number of non-null values in the input.  If the number
        of non-null values is below `min_count`, the output is null.
"""


cdef class _ElementWiseAggregateOptions(FunctionOptions):
    def _set_options(self, skip_nulls):
        self.wrapped.reset(new CElementWiseAggregateOptions(skip_nulls))


class ElementWiseAggregateOptions(_ElementWiseAggregateOptions):
    __doc__ = f"""
    Options for element-wise aggregate functions.

    Parameters
    ----------
    {_skip_nulls_doc()}
    """

    def __init__(self, *, skip_nulls=True):
        self._set_options(skip_nulls)


cdef CRoundMode unwrap_round_mode(round_mode) except *:
    if round_mode == "down":
        return CRoundMode_DOWN
    elif round_mode == "up":
        return CRoundMode_UP
    elif round_mode == "towards_zero":
        return CRoundMode_TOWARDS_ZERO
    elif round_mode == "towards_infinity":
        return CRoundMode_TOWARDS_INFINITY
    elif round_mode == "half_down":
        return CRoundMode_HALF_DOWN
    elif round_mode == "half_up":
        return CRoundMode_HALF_UP
    elif round_mode == "half_towards_zero":
        return CRoundMode_HALF_TOWARDS_ZERO
    elif round_mode == "half_towards_infinity":
        return CRoundMode_HALF_TOWARDS_INFINITY
    elif round_mode == "half_to_even":
        return CRoundMode_HALF_TO_EVEN
    elif round_mode == "half_to_odd":
        return CRoundMode_HALF_TO_ODD
    _raise_invalid_function_option(round_mode, "round mode")


cdef class _RoundOptions(FunctionOptions):
    def _set_options(self, ndigits, round_mode):
        self.wrapped.reset(
            new CRoundOptions(ndigits, unwrap_round_mode(round_mode))
        )


class RoundOptions(_RoundOptions):
    """
    Options for rounding numbers.

    Parameters
    ----------
    ndigits : int, default 0
        Number of fractional digits to round to.
    round_mode : str, default "half_to_even"
        Rounding and tie-breaking mode.
        Accepted values are "down", "up", "towards_zero", "towards_infinity",
        "half_down", "half_up", "half_towards_zero", "half_towards_infinity",
        "half_to_even", "half_to_odd".
    """

    def __init__(self, ndigits=0, round_mode="half_to_even"):
        self._set_options(ndigits, round_mode)


cdef class _RoundBinaryOptions(FunctionOptions):
    def _set_options(self, round_mode):
        self.wrapped.reset(
            new CRoundBinaryOptions(unwrap_round_mode(round_mode))
        )


class RoundBinaryOptions(_RoundBinaryOptions):
    """
    Options for rounding numbers when ndigits is provided by a second array

    Parameters
    ----------
    round_mode : str, default "half_to_even"
        Rounding and tie-breaking mode.
        Accepted values are "down", "up", "towards_zero", "towards_infinity",
        "half_down", "half_up", "half_towards_zero", "half_towards_infinity",
        "half_to_even", "half_to_odd".
    """

    def __init__(self, round_mode="half_to_even"):
        self._set_options(round_mode)


cdef CCalendarUnit unwrap_round_temporal_unit(unit) except *:
    if unit == "nanosecond":
        return CCalendarUnit_NANOSECOND
    elif unit == "microsecond":
        return CCalendarUnit_MICROSECOND
    elif unit == "millisecond":
        return CCalendarUnit_MILLISECOND
    elif unit == "second":
        return CCalendarUnit_SECOND
    elif unit == "minute":
        return CCalendarUnit_MINUTE
    elif unit == "hour":
        return CCalendarUnit_HOUR
    elif unit == "day":
        return CCalendarUnit_DAY
    elif unit == "week":
        return CCalendarUnit_WEEK
    elif unit == "month":
        return CCalendarUnit_MONTH
    elif unit == "quarter":
        return CCalendarUnit_QUARTER
    elif unit == "year":
        return CCalendarUnit_YEAR
    _raise_invalid_function_option(unit, "Calendar unit")


cdef class _RoundTemporalOptions(FunctionOptions):
    def _set_options(self, multiple, unit, week_starts_monday,
                     ceil_is_strictly_greater, calendar_based_origin):
        self.wrapped.reset(
            new CRoundTemporalOptions(
                multiple, unwrap_round_temporal_unit(unit),
                week_starts_monday, ceil_is_strictly_greater,
                calendar_based_origin)
        )


class RoundTemporalOptions(_RoundTemporalOptions):
    """
    Options for rounding temporal values.

    Parameters
    ----------
    multiple : int, default 1
        Number of units to round to.
    unit : str, default "day"
        The unit in which `multiple` is expressed.
        Accepted values are "year", "quarter", "month", "week", "day",
        "hour", "minute", "second", "millisecond", "microsecond",
        "nanosecond".
    week_starts_monday : bool, default True
        If True, weeks start on Monday; if False, on Sunday.
    ceil_is_strictly_greater : bool, default False
        If True, ceil returns a rounded value that is strictly greater than the
        input. For example: ceiling 1970-01-01T00:00:00 to 3 hours would
        yield 1970-01-01T03:00:00 if set to True and 1970-01-01T00:00:00
        if set to False.
        This applies to the ceil_temporal function only.
    calendar_based_origin : bool, default False
        By default, the origin is 1970-01-01T00:00:00. By setting this to True,
        rounding origin will be beginning of one less precise calendar unit.
        E.g.: rounding to hours will use beginning of day as origin.

        By default time is rounded to a multiple of units since
        1970-01-01T00:00:00. By setting calendar_based_origin to true,
        time will be rounded to number of units since the last greater
        calendar unit.
        For example: rounding to multiple of days since the beginning of the
        month or to hours since the beginning of the day.
        Exceptions: week and quarter are not used as greater units,
        therefore days will be rounded to the beginning of the month not
        week. Greater unit of week is a year.
        Note that ceiling and rounding might change sorting order of an array
        near greater unit change. For example rounding YYYY-mm-dd 23:00:00 to
        5 hours will ceil and round to YYYY-mm-dd+1 01:00:00 and floor to
        YYYY-mm-dd 20:00:00. On the other hand YYYY-mm-dd+1 00:00:00 will
        ceil, round and floor to YYYY-mm-dd+1 00:00:00. This can break the
        order of an already ordered array.

    """

    def __init__(self, multiple=1, unit="day", *, week_starts_monday=True,
                 ceil_is_strictly_greater=False,
                 calendar_based_origin=False):
        self._set_options(multiple, unit, week_starts_monday,
                          ceil_is_strictly_greater,
                          calendar_based_origin)


cdef class _RoundToMultipleOptions(FunctionOptions):
    def _set_options(self, multiple, round_mode):
        if not isinstance(multiple, Scalar):
            try:
                multiple = lib.scalar(multiple)
            except Exception:
                _raise_invalid_function_option(
                    multiple, "multiple type for RoundToMultipleOptions",
                    exception_class=TypeError)

        self.wrapped.reset(
            new CRoundToMultipleOptions(
                pyarrow_unwrap_scalar(multiple), unwrap_round_mode(round_mode))
        )


class RoundToMultipleOptions(_RoundToMultipleOptions):
    """
    Options for rounding numbers to a multiple.

    Parameters
    ----------
    multiple : numeric scalar, default 1.0
        Multiple to round to. Should be a scalar of a type compatible
        with the argument to be rounded.
    round_mode : str, default "half_to_even"
        Rounding and tie-breaking mode.
        Accepted values are "down", "up", "towards_zero", "towards_infinity",
        "half_down", "half_up", "half_towards_zero", "half_towards_infinity",
        "half_to_even", "half_to_odd".
    """

    def __init__(self, multiple=1.0, round_mode="half_to_even"):
        self._set_options(multiple, round_mode)


cdef class _JoinOptions(FunctionOptions):
    _null_handling_map = {
        "emit_null": CJoinNullHandlingBehavior_EMIT_NULL,
        "skip": CJoinNullHandlingBehavior_SKIP,
        "replace": CJoinNullHandlingBehavior_REPLACE,
    }

    def _set_options(self, null_handling, null_replacement):
        try:
            self.wrapped.reset(
                new CJoinOptions(self._null_handling_map[null_handling],
                                 tobytes(null_replacement))
            )
        except KeyError:
            _raise_invalid_function_option(null_handling, "null handling")


class JoinOptions(_JoinOptions):
    """
    Options for the `binary_join_element_wise` function.

    Parameters
    ----------
    null_handling : str, default "emit_null"
        How to handle null values in the inputs.
        Accepted values are "emit_null", "skip", "replace".
    null_replacement : str, default ""
        Replacement string to emit for null inputs if `null_handling`
        is "replace".
    """

    def __init__(self, null_handling="emit_null", null_replacement=""):
        self._set_options(null_handling, null_replacement)


cdef class _MatchSubstringOptions(FunctionOptions):
    def _set_options(self, pattern, ignore_case):
        self.wrapped.reset(
            new CMatchSubstringOptions(tobytes(pattern), ignore_case)
        )


class MatchSubstringOptions(_MatchSubstringOptions):
    """
    Options for looking for a substring.

    Parameters
    ----------
    pattern : str
        Substring pattern to look for inside input values.
    ignore_case : bool, default False
        Whether to perform a case-insensitive match.
    """

    def __init__(self, pattern, *, ignore_case=False):
        self._set_options(pattern, ignore_case)


cdef class _PadOptions(FunctionOptions):
    def _set_options(self, width, padding, lean_left_on_odd_padding):
        self.wrapped.reset(new CPadOptions(width, tobytes(padding), lean_left_on_odd_padding))


class PadOptions(_PadOptions):
    """
    Options for padding strings.

    Parameters
    ----------
    width : int
        Desired string length.
    padding : str, default " "
        What to pad the string with. Should be one byte or codepoint.
    lean_left_on_odd_padding : bool, default True
        What to do if there is an odd number of padding characters (in case
        of centered padding). Defaults to aligning on the left (i.e. adding
        the extra padding character on the right).
    """

    def __init__(self, width, padding=' ', lean_left_on_odd_padding=True):
        self._set_options(width, padding, lean_left_on_odd_padding)


cdef class _TrimOptions(FunctionOptions):
    def _set_options(self, characters):
        self.wrapped.reset(new CTrimOptions(tobytes(characters)))


class TrimOptions(_TrimOptions):
    """
    Options for trimming characters from strings.

    Parameters
    ----------
    characters : str
        Individual characters to be trimmed from the string.
    """

    def __init__(self, characters):
        self._set_options(tobytes(characters))


cdef class _ReplaceSubstringOptions(FunctionOptions):
    def _set_options(self, pattern, replacement, max_replacements):
        self.wrapped.reset(
            new CReplaceSubstringOptions(tobytes(pattern),
                                         tobytes(replacement),
                                         max_replacements)
        )


class ReplaceSubstringOptions(_ReplaceSubstringOptions):
    """
    Options for replacing matched substrings.

    Parameters
    ----------
    pattern : str
        Substring pattern to look for inside input values.
    replacement : str
        What to replace the pattern with.
    max_replacements : int or None, default None
        The maximum number of strings to replace in each
        input value (unlimited if None).
    """

    def __init__(self, pattern, replacement, *, max_replacements=None):
        if max_replacements is None:
            max_replacements = -1
        self._set_options(pattern, replacement, max_replacements)


cdef class _ExtractRegexOptions(FunctionOptions):
    def _set_options(self, pattern):
        self.wrapped.reset(new CExtractRegexOptions(tobytes(pattern)))


class ExtractRegexOptions(_ExtractRegexOptions):
    """
    Options for the `extract_regex` function.

    Parameters
    ----------
    pattern : str
        Regular expression with named capture fields.
    """

    def __init__(self, pattern):
        self._set_options(pattern)


cdef class _SliceOptions(FunctionOptions):
    def _set_options(self, start, stop, step):
        self.wrapped.reset(new CSliceOptions(start, stop, step))


class SliceOptions(_SliceOptions):
    """
    Options for slicing.

    Parameters
    ----------
    start : int
        Index to start slicing at (inclusive).
    stop : int or None, default None
        If given, index to stop slicing at (exclusive).
        If not given, slicing will stop at the end.
    step : int, default 1
        Slice step.
    """

    def __init__(self, start, stop=None, step=1):
        if stop is None:
            stop = sys.maxsize
            if step < 0:
                stop = -stop
        self._set_options(start, stop, step)


cdef class _ListSliceOptions(FunctionOptions):
    cpdef _set_options(self, start, stop=None, step=1, return_fixed_size_list=None):
        cdef:
            CListSliceOptions* opts
        opts = new CListSliceOptions(
            start,
            <optional[int64_t]>nullopt if stop is None
            else <optional[int64_t]>(<int64_t>stop),
            step,
            <optional[c_bool]>nullopt if return_fixed_size_list is None
            else <optional[c_bool]>(<c_bool>return_fixed_size_list)
        )
        self.wrapped.reset(opts)


class ListSliceOptions(_ListSliceOptions):
    """
    Options for list array slicing.

    Parameters
    ----------
    start : int
        Index to start slicing inner list elements (inclusive).
    stop : Optional[int], default None
        If given, index to stop slicing at (exclusive).
        If not given, slicing will stop at the end. (NotImplemented)
    step : int, default 1
        Slice step.
    return_fixed_size_list : Optional[bool], default None
        Whether to return a FixedSizeListArray. If true _and_ stop is after
        a list element's length, nulls will be appended to create the
        requested slice size. The default of `None` will return the same
        type which was passed in.
    """

    def __init__(self, start, stop=None, step=1, return_fixed_size_list=None):
        self._set_options(start, stop, step, return_fixed_size_list)


cdef class _ReplaceSliceOptions(FunctionOptions):
    def _set_options(self, start, stop, replacement):
        self.wrapped.reset(
            new CReplaceSliceOptions(start, stop, tobytes(replacement))
        )


class ReplaceSliceOptions(_ReplaceSliceOptions):
    """
    Options for replacing slices.

    Parameters
    ----------
    start : int
        Index to start slicing at (inclusive).
    stop : int
        Index to stop slicing at (exclusive).
    replacement : str
        What to replace the slice with.
    """

    def __init__(self, start, stop, replacement):
        self._set_options(start, stop, replacement)


cdef class _FilterOptions(FunctionOptions):
    _null_selection_map = {
        "drop": CFilterNullSelectionBehavior_DROP,
        "emit_null": CFilterNullSelectionBehavior_EMIT_NULL,
    }

    def _set_options(self, null_selection_behavior):
        try:
            self.wrapped.reset(
                new CFilterOptions(
                    self._null_selection_map[null_selection_behavior]
                )
            )
        except KeyError:
            _raise_invalid_function_option(null_selection_behavior,
                                           "null selection behavior")


class FilterOptions(_FilterOptions):
    """
    Options for selecting with a boolean filter.

    Parameters
    ----------
    null_selection_behavior : str, default "drop"
        How to handle nulls in the selection filter.
        Accepted values are "drop", "emit_null".
    """

    def __init__(self, null_selection_behavior="drop"):
        self._set_options(null_selection_behavior)


cdef class _DictionaryEncodeOptions(FunctionOptions):
    _null_encoding_map = {
        "encode": CDictionaryEncodeNullEncodingBehavior_ENCODE,
        "mask": CDictionaryEncodeNullEncodingBehavior_MASK,
    }

    def _set_options(self, null_encoding):
        try:
            self.wrapped.reset(
                new CDictionaryEncodeOptions(
                    self._null_encoding_map[null_encoding]
                )
            )
        except KeyError:
            _raise_invalid_function_option(null_encoding, "null encoding")


class DictionaryEncodeOptions(_DictionaryEncodeOptions):
    """
    Options for dictionary encoding.

    Parameters
    ----------
    null_encoding : str, default "mask"
        How to encode nulls in the input.
        Accepted values are "mask" (null inputs emit a null in the indices
        array), "encode" (null inputs emit a non-null index pointing to
        a null value in the dictionary array).
    """

    def __init__(self, null_encoding="mask"):
        self._set_options(null_encoding)


cdef class _RunEndEncodeOptions(FunctionOptions):
    def _set_options(self, run_end_type):
        run_end_ty = ensure_type(run_end_type)
        self.wrapped.reset(new CRunEndEncodeOptions(pyarrow_unwrap_data_type(run_end_ty)))


class RunEndEncodeOptions(_RunEndEncodeOptions):
    """
    Options for run-end encoding.

    Parameters
    ----------
    run_end_type : DataType, default pyarrow.int32()
        The data type of the run_ends array.

        Accepted values are pyarrow.{int16(), int32(), int64()}.
    """

    def __init__(self, run_end_type=lib.int32()):
        self._set_options(run_end_type)


cdef class _TakeOptions(FunctionOptions):
    def _set_options(self, boundscheck):
        self.wrapped.reset(new CTakeOptions(boundscheck))


class TakeOptions(_TakeOptions):
    """
    Options for the `take` and `array_take` functions.

    Parameters
    ----------
    boundscheck : boolean, default True
        Whether to check indices are within bounds. If False and an
        index is out of bounds, behavior is undefined (the process
        may crash).
    """

    def __init__(self, *, boundscheck=True):
        self._set_options(boundscheck)


cdef class _MakeStructOptions(FunctionOptions):
    def _set_options(self, field_names, field_nullability, field_metadata):
        cdef:
            vector[c_string] c_field_names
            vector[shared_ptr[const CKeyValueMetadata]] c_field_metadata
        for name in field_names:
            c_field_names.push_back(tobytes(name))
        for metadata in field_metadata:
            c_field_metadata.push_back(pyarrow_unwrap_metadata(metadata))
        self.wrapped.reset(
            new CMakeStructOptions(c_field_names, field_nullability,
                                   c_field_metadata)
        )


class MakeStructOptions(_MakeStructOptions):
    """
    Options for the `make_struct` function.

    Parameters
    ----------
    field_names : sequence of str
        Names of the struct fields to create.
    field_nullability : sequence of bool, optional
        Nullability information for each struct field.
        If omitted, all fields are nullable.
    field_metadata : sequence of KeyValueMetadata, optional
        Metadata for each struct field.
    """

    def __init__(self, field_names=(), *, field_nullability=None,
                 field_metadata=None):
        if field_nullability is None:
            field_nullability = [True] * len(field_names)
        if field_metadata is None:
            field_metadata = [None] * len(field_names)
        self._set_options(field_names, field_nullability, field_metadata)


cdef CFieldRef _ensure_field_ref(value) except *:
    cdef:
        CFieldRef field_ref
        const CFieldRef* field_ref_ptr

    if isinstance(value, (list, tuple)):
        value = Expression._nested_field(tuple(value))

    if isinstance(value, Expression):
        field_ref_ptr = (<Expression>value).unwrap().field_ref()
        if field_ref_ptr is NULL:
            raise ValueError("Unable to get FieldRef from Expression")
        field_ref = <CFieldRef>deref(field_ref_ptr)
    elif isinstance(value, (bytes, str)):
        if value.startswith(b'.' if isinstance(value, bytes) else '.'):
            field_ref = GetResultValue(
                CFieldRef.FromDotPath(<c_string>tobytes(value)))
        else:
            field_ref = CFieldRef(<c_string>tobytes(value))
    elif isinstance(value, int):
        field_ref = CFieldRef(<int> value)
    else:
        raise TypeError("Expected a field reference as a str or int, list of "
                        f"str or int, or Expression. Got {type(value)} instead.")
    return field_ref


cdef class _StructFieldOptions(FunctionOptions):
    def _set_options(self, indices):

        if isinstance(indices, (list, tuple)) and not len(indices):
            # Allow empty indices; effectively return same array
            self.wrapped.reset(
                new CStructFieldOptions(<vector[int]>indices))
            return

        cdef CFieldRef field_ref = _ensure_field_ref(indices)
        self.wrapped.reset(new CStructFieldOptions(field_ref))


class StructFieldOptions(_StructFieldOptions):
    """
    Options for the `struct_field` function.

    Parameters
    ----------
    indices : List[str], List[bytes], List[int], Expression, bytes, str, or int
        List of indices for chained field lookup, for example `[4, 1]`
        will look up the second nested field in the fifth outer field.
    """

    def __init__(self, indices):
        self._set_options(indices)


cdef class _ScalarAggregateOptions(FunctionOptions):
    def _set_options(self, skip_nulls, min_count):
        self.wrapped.reset(new CScalarAggregateOptions(skip_nulls, min_count))


class ScalarAggregateOptions(_ScalarAggregateOptions):
    __doc__ = f"""
    Options for scalar aggregations.

    Parameters
    ----------
    {_skip_nulls_doc()}
    {_min_count_doc(default=1)}
    """

    def __init__(self, *, skip_nulls=True, min_count=1):
        self._set_options(skip_nulls, min_count)


cdef class _CountOptions(FunctionOptions):
    _mode_map = {
        "only_valid": CCountMode_ONLY_VALID,
        "only_null": CCountMode_ONLY_NULL,
        "all": CCountMode_ALL,
    }

    def _set_options(self, mode):
        try:
            self.wrapped.reset(new CCountOptions(self._mode_map[mode]))
        except KeyError:
            _raise_invalid_function_option(mode, "count mode")


class CountOptions(_CountOptions):
    """
    Options for the `count` function.

    Parameters
    ----------
    mode : str, default "only_valid"
        Which values to count in the input.
        Accepted values are "only_valid", "only_null", "all".
    """

    def __init__(self, mode="only_valid"):
        self._set_options(mode)


cdef class _IndexOptions(FunctionOptions):
    def _set_options(self, scalar):
        self.wrapped.reset(new CIndexOptions(pyarrow_unwrap_scalar(scalar)))


class IndexOptions(_IndexOptions):
    """
    Options for the `index` function.

    Parameters
    ----------
    value : Scalar
        The value to search for.
    """

    def __init__(self, value):
        self._set_options(value)


cdef class _MapLookupOptions(FunctionOptions):
    _occurrence_map = {
        "all": CMapLookupOccurrence_ALL,
        "first": CMapLookupOccurrence_FIRST,
        "last": CMapLookupOccurrence_LAST,
    }

    def _set_options(self, query_key, occurrence):
        try:
            self.wrapped.reset(
                new CMapLookupOptions(
                    pyarrow_unwrap_scalar(query_key),
                    self._occurrence_map[occurrence]
                )
            )
        except KeyError:
            _raise_invalid_function_option(occurrence,
                                           "Should either be first, last, or all")


class MapLookupOptions(_MapLookupOptions):
    """
    Options for the `map_lookup` function.

    Parameters
    ----------
    query_key : Scalar or Object can be converted to Scalar
        The key to search for.
    occurrence : str
        The occurrence(s) to return from the Map
        Accepted values are "first", "last", or "all".
    """

    def __init__(self, query_key, occurrence):
        if not isinstance(query_key, lib.Scalar):
            query_key = lib.scalar(query_key)

        self._set_options(query_key, occurrence)


cdef class _ModeOptions(FunctionOptions):
    def _set_options(self, n, skip_nulls, min_count):
        self.wrapped.reset(new CModeOptions(n, skip_nulls, min_count))


class ModeOptions(_ModeOptions):
    __doc__ = f"""
    Options for the `mode` function.

    Parameters
    ----------
    n : int, default 1
        Number of distinct most-common values to return.
    {_skip_nulls_doc()}
    {_min_count_doc(default=0)}
    """

    def __init__(self, n=1, *, skip_nulls=True, min_count=0):
        self._set_options(n, skip_nulls, min_count)


cdef class _SetLookupOptions(FunctionOptions):
    def _set_options(self, value_set, c_bool skip_nulls):
        cdef unique_ptr[CDatum] valset
        if isinstance(value_set, Array):
            valset.reset(new CDatum((<Array> value_set).sp_array))
        elif isinstance(value_set, ChunkedArray):
            valset.reset(
                new CDatum((<ChunkedArray> value_set).sp_chunked_array)
            )
        elif isinstance(value_set, Scalar):
            valset.reset(new CDatum((<Scalar> value_set).unwrap()))
        else:
            _raise_invalid_function_option(value_set, "value set",
                                           exception_class=TypeError)

        self.wrapped.reset(new CSetLookupOptions(deref(valset), skip_nulls))


class SetLookupOptions(_SetLookupOptions):
    """
    Options for the `is_in` and `index_in` functions.

    Parameters
    ----------
    value_set : Array
        Set of values to look for in the input.
    skip_nulls : bool, default False
        If False, nulls in the input are matched in the value_set just
        like regular values.
        If True, nulls in the input always fail matching.
    """

    def __init__(self, value_set, *, skip_nulls=False):
        self._set_options(value_set, skip_nulls)


cdef class _StrptimeOptions(FunctionOptions):
    _unit_map = {
        "s": TimeUnit_SECOND,
        "ms": TimeUnit_MILLI,
        "us": TimeUnit_MICRO,
        "ns": TimeUnit_NANO,
    }

    def _set_options(self, format, unit, error_is_null):
        try:
            self.wrapped.reset(
                new CStrptimeOptions(tobytes(format), self._unit_map[unit],
                                     error_is_null)
            )
        except KeyError:
            _raise_invalid_function_option(unit, "time unit")


class StrptimeOptions(_StrptimeOptions):
    """
    Options for the `strptime` function.

    Parameters
    ----------
    format : str
        Pattern for parsing input strings as timestamps, such as "%Y/%m/%d".
        Note that the semantics of the format follow the C/C++ strptime, not the Python one.
        There are differences in behavior, for example how the "%y" placeholder
        handles years with less than four digits.
    unit : str
        Timestamp unit of the output.
        Accepted values are "s", "ms", "us", "ns".
    error_is_null : boolean, default False
        Return null on parsing errors if true or raise if false.
    """

    def __init__(self, format, unit, error_is_null=False):
        self._set_options(format, unit, error_is_null)


cdef class _StrftimeOptions(FunctionOptions):
    def _set_options(self, format, locale):
        self.wrapped.reset(
            new CStrftimeOptions(tobytes(format), tobytes(locale))
        )


class StrftimeOptions(_StrftimeOptions):
    """
    Options for the `strftime` function.

    Parameters
    ----------
    format : str, default "%Y-%m-%dT%H:%M:%S"
        Pattern for formatting input values.
    locale : str, default "C"
        Locale to use for locale-specific format specifiers.
    """

    def __init__(self, format="%Y-%m-%dT%H:%M:%S", locale="C"):
        self._set_options(format, locale)


cdef class _DayOfWeekOptions(FunctionOptions):
    def _set_options(self, count_from_zero, week_start):
        self.wrapped.reset(
            new CDayOfWeekOptions(count_from_zero, week_start)
        )


class DayOfWeekOptions(_DayOfWeekOptions):
    """
    Options for the `day_of_week` function.

    Parameters
    ----------
    count_from_zero : bool, default True
        If True, number days from 0, otherwise from 1.
    week_start : int, default 1
        Which day does the week start with (Monday=1, Sunday=7).
        How this value is numbered is unaffected by `count_from_zero`.
    """

    def __init__(self, *, count_from_zero=True, week_start=1):
        self._set_options(count_from_zero, week_start)


cdef class _WeekOptions(FunctionOptions):
    def _set_options(self, week_starts_monday, count_from_zero,
                     first_week_is_fully_in_year):
        self.wrapped.reset(
            new CWeekOptions(week_starts_monday, count_from_zero,
                             first_week_is_fully_in_year)
        )


class WeekOptions(_WeekOptions):
    """
    Options for the `week` function.

    Parameters
    ----------
    week_starts_monday : bool, default True
        If True, weeks start on Monday; if False, on Sunday.
    count_from_zero : bool, default False
        If True, dates at the start of a year that fall into the last week
        of the previous year emit 0.
        If False, they emit 52 or 53 (the week number of the last week
        of the previous year).
    first_week_is_fully_in_year : bool, default False
        If True, week number 0 is fully in January.
        If False, a week that begins on December 29, 30 or 31 is considered
        to be week number 0 of the following year.
    """

    def __init__(self, *, week_starts_monday=True, count_from_zero=False,
                 first_week_is_fully_in_year=False):
        self._set_options(week_starts_monday,
                          count_from_zero, first_week_is_fully_in_year)


cdef class _AssumeTimezoneOptions(FunctionOptions):
    _ambiguous_map = {
        "raise": CAssumeTimezoneAmbiguous_AMBIGUOUS_RAISE,
        "earliest": CAssumeTimezoneAmbiguous_AMBIGUOUS_EARLIEST,
        "latest": CAssumeTimezoneAmbiguous_AMBIGUOUS_LATEST,
    }
    _nonexistent_map = {
        "raise": CAssumeTimezoneNonexistent_NONEXISTENT_RAISE,
        "earliest": CAssumeTimezoneNonexistent_NONEXISTENT_EARLIEST,
        "latest": CAssumeTimezoneNonexistent_NONEXISTENT_LATEST,
    }

    def _set_options(self, timezone, ambiguous, nonexistent):
        if ambiguous not in self._ambiguous_map:
            _raise_invalid_function_option(ambiguous,
                                           "'ambiguous' timestamp handling")
        if nonexistent not in self._nonexistent_map:
            _raise_invalid_function_option(nonexistent,
                                           "'nonexistent' timestamp handling")
        self.wrapped.reset(
            new CAssumeTimezoneOptions(tobytes(timezone),
                                       self._ambiguous_map[ambiguous],
                                       self._nonexistent_map[nonexistent])
        )


class AssumeTimezoneOptions(_AssumeTimezoneOptions):
    """
    Options for the `assume_timezone` function.

    Parameters
    ----------
    timezone : str
        Timezone to assume for the input.
    ambiguous : str, default "raise"
        How to handle timestamps that are ambiguous in the assumed timezone.
        Accepted values are "raise", "earliest", "latest".
    nonexistent : str, default "raise"
        How to handle timestamps that don't exist in the assumed timezone.
        Accepted values are "raise", "earliest", "latest".
    """

    def __init__(self, timezone, *, ambiguous="raise", nonexistent="raise"):
        self._set_options(timezone, ambiguous, nonexistent)


cdef class _NullOptions(FunctionOptions):
    def _set_options(self, nan_is_null):
        self.wrapped.reset(new CNullOptions(nan_is_null))


class NullOptions(_NullOptions):
    """
    Options for the `is_null` function.

    Parameters
    ----------
    nan_is_null : bool, default False
        Whether floating-point NaN values are considered null.
    """

    def __init__(self, *, nan_is_null=False):
        self._set_options(nan_is_null)


cdef class _VarianceOptions(FunctionOptions):
    def _set_options(self, ddof, skip_nulls, min_count):
        self.wrapped.reset(new CVarianceOptions(ddof, skip_nulls, min_count))


class VarianceOptions(_VarianceOptions):
    __doc__ = f"""
    Options for the `variance` and `stddev` functions.

    Parameters
    ----------
    ddof : int, default 0
        Number of degrees of freedom.
    {_skip_nulls_doc()}
    {_min_count_doc(default=0)}
    """

    def __init__(self, *, ddof=0, skip_nulls=True, min_count=0):
        self._set_options(ddof, skip_nulls, min_count)


cdef class _SplitOptions(FunctionOptions):
    def _set_options(self, max_splits, reverse):
        self.wrapped.reset(new CSplitOptions(max_splits, reverse))


class SplitOptions(_SplitOptions):
    """
    Options for splitting on whitespace.

    Parameters
    ----------
    max_splits : int or None, default None
        Maximum number of splits for each input value (unlimited if None).
    reverse : bool, default False
        Whether to start splitting from the end of each input value.
        This only has an effect if `max_splits` is not None.
    """

    def __init__(self, *, max_splits=None, reverse=False):
        if max_splits is None:
            max_splits = -1
        self._set_options(max_splits, reverse)


cdef class _SplitPatternOptions(FunctionOptions):
    def _set_options(self, pattern, max_splits, reverse):
        self.wrapped.reset(
            new CSplitPatternOptions(tobytes(pattern), max_splits, reverse)
        )


class SplitPatternOptions(_SplitPatternOptions):
    """
    Options for splitting on a string pattern.

    Parameters
    ----------
    pattern : str
        String pattern to split on.
    max_splits : int or None, default None
        Maximum number of splits for each input value (unlimited if None).
    reverse : bool, default False
        Whether to start splitting from the end of each input value.
        This only has an effect if `max_splits` is not None.
    """

    def __init__(self, pattern, *, max_splits=None, reverse=False):
        if max_splits is None:
            max_splits = -1
        self._set_options(pattern, max_splits, reverse)


cdef CSortOrder unwrap_sort_order(order) except *:
    if order == "ascending":
        return CSortOrder_Ascending
    elif order == "descending":
        return CSortOrder_Descending
    _raise_invalid_function_option(order, "sort order")


cdef CNullPlacement unwrap_null_placement(null_placement) except *:
    if null_placement == "at_start":
        return CNullPlacement_AtStart
    elif null_placement == "at_end":
        return CNullPlacement_AtEnd
    _raise_invalid_function_option(null_placement, "null placement")


cdef class _PartitionNthOptions(FunctionOptions):
    def _set_options(self, pivot, null_placement):
        self.wrapped.reset(new CPartitionNthOptions(
            pivot, unwrap_null_placement(null_placement)))


class PartitionNthOptions(_PartitionNthOptions):
    """
    Options for the `partition_nth_indices` function.

    Parameters
    ----------
    pivot : int
        Index into the equivalent sorted array of the pivot element.
    null_placement : str, default "at_end"
        Where nulls in the input should be partitioned.
        Accepted values are "at_start", "at_end".
    """

    def __init__(self, pivot, *, null_placement="at_end"):
        self._set_options(pivot, null_placement)


cdef class _CumulativeOptions(FunctionOptions):
    def _set_options(self, start, skip_nulls):
        if start is None:
            self.wrapped.reset(new CCumulativeOptions(skip_nulls))
        elif isinstance(start, Scalar):
            self.wrapped.reset(new CCumulativeOptions(
                pyarrow_unwrap_scalar(start), skip_nulls))
        else:
            try:
                start = lib.scalar(start)
                self.wrapped.reset(new CCumulativeOptions(
                    pyarrow_unwrap_scalar(start), skip_nulls))
            except Exception:
                _raise_invalid_function_option(
                    start, "`start` type for CumulativeOptions", TypeError)


class CumulativeOptions(_CumulativeOptions):
    """
    Options for `cumulative_*` functions.

    - cumulative_sum
    - cumulative_sum_checked
    - cumulative_prod
    - cumulative_prod_checked
    - cumulative_max
    - cumulative_min

    Parameters
    ----------
    start : Scalar, default None
        Starting value for the cumulative operation. If none is given,
        a default value depending on the operation and input type is used.
    skip_nulls : bool, default False
        When false, the first encountered null is propagated.
    """

    def __init__(self, start=None, *, skip_nulls=False):
        self._set_options(start, skip_nulls)


class CumulativeSumOptions(_CumulativeOptions):
    """
    Options for `cumulative_sum` function.

    Parameters
    ----------
    start : Scalar, default None
        Starting value for sum computation
    skip_nulls : bool, default False
        When false, the first encountered null is propagated.
    """

    def __init__(self, start=None, *, skip_nulls=False):
        warnings.warn(
            _DEPR_MSG.format("CumulativeSumOptions", "14.0", "CumulativeOptions"),
            FutureWarning,
            stacklevel=2
        )
        self._set_options(start, skip_nulls)


cdef class _PairwiseOptions(FunctionOptions):
    def _set_options(self, period):
        self.wrapped.reset(new CPairwiseOptions(period))


class PairwiseOptions(_PairwiseOptions):
    """
    Options for `pairwise` functions.

    Parameters
    ----------
    period : int, default 1
        Period for applying the period function.
    """

    def __init__(self, period=1):
        self._set_options(period)


cdef class _ListFlattenOptions(FunctionOptions):
    def _set_options(self, recursive):
        self.wrapped.reset(new CListFlattenOptions(recursive))


class ListFlattenOptions(_ListFlattenOptions):
    """
    Options for `list_flatten` function

    Parameters
    ----------
    recursive : bool, default False
        When True, the list array is flattened recursively until an array
        of non-list values is formed.
    """

    def __init__(self, recursive=False):
        self._set_options(recursive)


cdef class _ArraySortOptions(FunctionOptions):
    def _set_options(self, order, null_placement):
        self.wrapped.reset(new CArraySortOptions(
            unwrap_sort_order(order), unwrap_null_placement(null_placement)))


class ArraySortOptions(_ArraySortOptions):
    """
    Options for the `array_sort_indices` function.

    Parameters
    ----------
    order : str, default "ascending"
        Which order to sort values in.
        Accepted values are "ascending", "descending".
    null_placement : str, default "at_end"
        Where nulls in the input should be sorted.
        Accepted values are "at_start", "at_end".
    """

    def __init__(self, order="ascending", *, null_placement="at_end"):
        self._set_options(order, null_placement)


cdef class _SortOptions(FunctionOptions):
    def _set_options(self, sort_keys, null_placement):
        cdef vector[CSortKey] c_sort_keys
        for name, order in sort_keys:
            c_sort_keys.push_back(
                CSortKey(_ensure_field_ref(name), unwrap_sort_order(order))
            )
        self.wrapped.reset(new CSortOptions(
            c_sort_keys, unwrap_null_placement(null_placement)))


class SortOptions(_SortOptions):
    """
    Options for the `sort_indices` function.

    Parameters
    ----------
    sort_keys : sequence of (name, order) tuples
        Names of field/column keys to sort the input on,
        along with the order each field/column is sorted in.
        Accepted values for `order` are "ascending", "descending".
        The field name can be a string column name or expression.
    null_placement : str, default "at_end"
        Where nulls in input should be sorted, only applying to
        columns/fields mentioned in `sort_keys`.
        Accepted values are "at_start", "at_end".
    """

    def __init__(self, sort_keys=(), *, null_placement="at_end"):
        self._set_options(sort_keys, null_placement)


cdef class _SelectKOptions(FunctionOptions):
    def _set_options(self, k, sort_keys):
        cdef vector[CSortKey] c_sort_keys
        for name, order in sort_keys:
            c_sort_keys.push_back(
                CSortKey(_ensure_field_ref(name), unwrap_sort_order(order))
            )
        self.wrapped.reset(new CSelectKOptions(k, c_sort_keys))


class SelectKOptions(_SelectKOptions):
    """
    Options for top/bottom k-selection.

    Parameters
    ----------
    k : int
        Number of leading values to select in sorted order
        (i.e. the largest values if sort order is "descending",
        the smallest otherwise).
    sort_keys : sequence of (name, order) tuples
        Names of field/column keys to sort the input on,
        along with the order each field/column is sorted in.
        Accepted values for `order` are "ascending", "descending".
        The field name can be a string column name or expression.
    """

    def __init__(self, k, sort_keys):
        self._set_options(k, sort_keys)


cdef class _QuantileOptions(FunctionOptions):
    _interp_map = {
        "linear": CQuantileInterp_LINEAR,
        "lower": CQuantileInterp_LOWER,
        "higher": CQuantileInterp_HIGHER,
        "nearest": CQuantileInterp_NEAREST,
        "midpoint": CQuantileInterp_MIDPOINT,
    }

    def _set_options(self, quantiles, interp, skip_nulls, min_count):
        try:
            self.wrapped.reset(
                new CQuantileOptions(quantiles, self._interp_map[interp],
                                     skip_nulls, min_count)
            )
        except KeyError:
            _raise_invalid_function_option(interp, "quantile interpolation")


class QuantileOptions(_QuantileOptions):
    __doc__ = f"""
    Options for the `quantile` function.

    Parameters
    ----------
    q : double or sequence of double, default 0.5
        Probability levels of the quantiles to compute. All values must be in
        [0, 1].
    interpolation : str, default "linear"
        How to break ties between competing data points for a given quantile.
        Accepted values are:

        - "linear": compute an interpolation
        - "lower": always use the smallest of the two data points
        - "higher": always use the largest of the two data points
        - "nearest": select the data point that is closest to the quantile
        - "midpoint": compute the (unweighted) mean of the two data points
    {_skip_nulls_doc()}
    {_min_count_doc(default=0)}
    """

    def __init__(self, q=0.5, *, interpolation="linear", skip_nulls=True,
                 min_count=0):
        if not isinstance(q, (list, tuple, np.ndarray)):
            q = [q]
        self._set_options(q, interpolation, skip_nulls, min_count)


cdef class _TDigestOptions(FunctionOptions):
    def _set_options(self, quantiles, delta, buffer_size, skip_nulls,
                     min_count):
        self.wrapped.reset(
            new CTDigestOptions(quantiles, delta, buffer_size, skip_nulls,
                                min_count)
        )


class TDigestOptions(_TDigestOptions):
    __doc__ = f"""
    Options for the `tdigest` function.

    Parameters
    ----------
    q : double or sequence of double, default 0.5
        Probability levels of the quantiles to approximate. All values must be
        in [0, 1].
    delta : int, default 100
        Compression parameter for the T-digest algorithm.
    buffer_size : int, default 500
        Buffer size for the T-digest algorithm.
    {_skip_nulls_doc()}
    {_min_count_doc(default=0)}
    """

    def __init__(self, q=0.5, *, delta=100, buffer_size=500, skip_nulls=True,
                 min_count=0):
        if not isinstance(q, (list, tuple, np.ndarray)):
            q = [q]
        self._set_options(q, delta, buffer_size, skip_nulls, min_count)


cdef class _Utf8NormalizeOptions(FunctionOptions):
    _form_map = {
        "NFC": CUtf8NormalizeForm_NFC,
        "NFKC": CUtf8NormalizeForm_NFKC,
        "NFD": CUtf8NormalizeForm_NFD,
        "NFKD": CUtf8NormalizeForm_NFKD,
    }

    def _set_options(self, form):
        try:
            self.wrapped.reset(
                new CUtf8NormalizeOptions(self._form_map[form])
            )
        except KeyError:
            _raise_invalid_function_option(form,
                                           "Unicode normalization form")


class Utf8NormalizeOptions(_Utf8NormalizeOptions):
    """
    Options for the `utf8_normalize` function.

    Parameters
    ----------
    form : str
        Unicode normalization form.
        Accepted values are "NFC", "NFKC", "NFD", NFKD".
    """

    def __init__(self, form):
        self._set_options(form)


cdef class _RandomOptions(FunctionOptions):
    def _set_options(self, initializer):
        if initializer == 'system':
            self.wrapped.reset(new CRandomOptions(
                CRandomOptions.FromSystemRandom()))
            return

        if not isinstance(initializer, int):
            try:
                initializer = hash(initializer)
            except TypeError:
                raise TypeError(
                    f"initializer should be 'system', an integer, "
                    f"or a hashable object; got {initializer!r}")

        if initializer < 0:
            initializer += 2**64
        self.wrapped.reset(new CRandomOptions(
            CRandomOptions.FromSeed(initializer)))


class RandomOptions(_RandomOptions):
    """
    Options for random generation.

    Parameters
    ----------
    initializer : int or str
        How to initialize the underlying random generator.
        If an integer is given, it is used as a seed.
        If "system" is given, the random generator is initialized with
        a system-specific source of (hopefully true) randomness.
        Other values are invalid.
    """

    def __init__(self, *, initializer='system'):
        self._set_options(initializer)


cdef class _RankOptions(FunctionOptions):

    _tiebreaker_map = {
        "min": CRankOptionsTiebreaker_Min,
        "max": CRankOptionsTiebreaker_Max,
        "first": CRankOptionsTiebreaker_First,
        "dense": CRankOptionsTiebreaker_Dense,
    }

    def _set_options(self, sort_keys, null_placement, tiebreaker):
        cdef vector[CSortKey] c_sort_keys
        if isinstance(sort_keys, str):
            c_sort_keys.push_back(
                CSortKey(_ensure_field_ref(""), unwrap_sort_order(sort_keys))
            )
        else:
            for name, order in sort_keys:
                c_sort_keys.push_back(
                    CSortKey(_ensure_field_ref(name), unwrap_sort_order(order))
                )
        try:
            self.wrapped.reset(
                new CRankOptions(c_sort_keys,
                                 unwrap_null_placement(null_placement),
                                 self._tiebreaker_map[tiebreaker])
            )
        except KeyError:
            _raise_invalid_function_option(tiebreaker, "tiebreaker")


class RankOptions(_RankOptions):
    """
    Options for the `rank` function.

    Parameters
    ----------
    sort_keys : sequence of (name, order) tuples or str, default "ascending"
        Names of field/column keys to sort the input on,
        along with the order each field/column is sorted in.
        Accepted values for `order` are "ascending", "descending".
        The field name can be a string column name or expression.
        Alternatively, one can simply pass "ascending" or "descending" as a string
        if the input is array-like.
    null_placement : str, default "at_end"
        Where nulls in input should be sorted.
        Accepted values are "at_start", "at_end".
    tiebreaker : str, default "first"
        Configure how ties between equal values are handled.
        Accepted values are:

        - "min": Ties get the smallest possible rank in sorted order.
        - "max": Ties get the largest possible rank in sorted order.
        - "first": Ranks are assigned in order of when ties appear in the
                   input. This ensures the ranks are a stable permutation
                   of the input.
        - "dense": The ranks span a dense [1, M] interval where M is the
                   number of distinct values in the input.
    """

    def __init__(self, sort_keys="ascending", *, null_placement="at_end", tiebreaker="first"):
        self._set_options(sort_keys, null_placement, tiebreaker)


cdef class Expression(_Weakrefable):
    """
    A logical expression to be evaluated against some input.

    To create an expression:

    - Use the factory function ``pyarrow.compute.scalar()`` to create a
      scalar (not necessary when combined, see example below).
    - Use the factory function ``pyarrow.compute.field()`` to reference
      a field (column in table).
    - Compare fields and scalars with ``<``, ``<=``, ``==``, ``>=``, ``>``.
    - Combine expressions using python operators ``&`` (logical and),
      ``|`` (logical or) and ``~`` (logical not).
      Note: python keywords ``and``, ``or`` and ``not`` cannot be used
      to combine expressions.
    - Create expression predicates using Expression methods such as
      ``pyarrow.compute.Expression.isin()``.

    Examples
    --------

    >>> import pyarrow.compute as pc
    >>> (pc.field("a") < pc.scalar(3)) | (pc.field("b") > 7)
    <pyarrow.compute.Expression ((a < 3) or (b > 7))>
    >>> pc.field('a') != 3
    <pyarrow.compute.Expression (a != 3)>
    >>> pc.field('a').isin([1, 2, 3])
    <pyarrow.compute.Expression is_in(a, {value_set=int64:[
      1,
      2,
      3
    ], null_matching_behavior=MATCH})>
    """

    def __init__(self):
        msg = 'Expression is an abstract class thus cannot be initialized.'
        raise TypeError(msg)

    cdef void init(self, const CExpression& sp):
        self.expr = sp

    @staticmethod
    cdef wrap(const CExpression& sp):
        cdef Expression self = Expression.__new__(Expression)
        self.init(sp)
        return self

    cdef inline CExpression unwrap(self):
        return self.expr

    def equals(self, Expression other):
        """
        Parameters
        ----------
        other : pyarrow.dataset.Expression

        Returns
        -------
        bool
        """
        return self.expr.Equals(other.unwrap())

    def __str__(self):
        return frombytes(self.expr.ToString())

    def __repr__(self):
        return "<pyarrow.compute.{0} {1}>".format(
            self.__class__.__name__, str(self)
        )

    @staticmethod
    def from_substrait(object buffer not None):
        """
        Deserialize an expression from Substrait

        The serialized message must be an ExtendedExpression message that has
        only a single expression.  The name of the expression and the schema
        the expression was bound to will be ignored.  Use
        pyarrow.substrait.deserialize_expressions if this information is needed
        or if the message might contain multiple expressions.

        Parameters
        ----------
        buffer : bytes or Buffer
            The Substrait message to deserialize

        Returns
        -------
        Expression
            The deserialized expression
        """
        expressions = _pas().deserialize_expressions(buffer).expressions
        if len(expressions) == 0:
            raise ValueError("Substrait message did not contain any expressions")
        if len(expressions) > 1:
            raise ValueError(
                "Substrait message contained multiple expressions.  Use pyarrow.substrait.deserialize_expressions instead")
        return next(iter(expressions.values()))

    def to_substrait(self, Schema schema not None, c_bool allow_arrow_extensions=False):
        """
        Serialize the expression using Substrait

        The expression will be serialized as an ExtendedExpression message that has a
        single expression named "expression"

        Parameters
        ----------
        schema : Schema
            The input schema the expression will be bound to
        allow_arrow_extensions : bool, default False
            If False then only functions that are part of the core Substrait function
            definitions will be allowed.  Set this to True to allow pyarrow-specific functions
            but the result may not be accepted by other compute libraries.

        Returns
        -------
        Buffer
            A buffer containing the serialized Protobuf plan.
        """
        return _pas().serialize_expressions([self], ["expression"], schema, allow_arrow_extensions=allow_arrow_extensions)

    @staticmethod
    def _deserialize(Buffer buffer not None):
        return Expression.wrap(GetResultValue(CDeserializeExpression(
            pyarrow_unwrap_buffer(buffer))))

    def __reduce__(self):
        buffer = pyarrow_wrap_buffer(GetResultValue(
            CSerializeExpression(self.expr)))
        return Expression._deserialize, (buffer,)

    @staticmethod
    cdef Expression _expr_or_scalar(object expr):
        if isinstance(expr, Expression):
            return (<Expression> expr)
        return (<Expression> Expression._scalar(expr))

    @staticmethod
    def _call(str function_name, list arguments, FunctionOptions options=None):
        cdef:
            vector[CExpression] c_arguments
            shared_ptr[CFunctionOptions] c_options

        for argument in arguments:
            if not isinstance(argument, Expression):
                # Attempt to help convert this to an expression
                try:
                    argument = Expression._scalar(argument)
                except ArrowInvalid:
                    raise TypeError(
                        "only other expressions allowed as arguments")
            c_arguments.push_back((<Expression> argument).expr)

        if options is not None:
            c_options = options.unwrap()

        return Expression.wrap(CMakeCallExpression(
            tobytes(function_name), move(c_arguments), c_options))

    def __richcmp__(self, other, int op):
        other = Expression._expr_or_scalar(other)
        return Expression._call({
            Py_EQ: "equal",
            Py_NE: "not_equal",
            Py_GT: "greater",
            Py_GE: "greater_equal",
            Py_LT: "less",
            Py_LE: "less_equal",
        }[op], [self, other])

    def __bool__(self):
        raise ValueError(
            "An Expression cannot be evaluated to python True or False. "
            "If you are using the 'and', 'or' or 'not' operators, use '&', "
            "'|' or '~' instead."
        )

    def __invert__(self):
        return Expression._call("invert", [self])

    def __and__(Expression self, other):
        other = Expression._expr_or_scalar(other)
        return Expression._call("and_kleene", [self, other])

    def __or__(Expression self, other):
        other = Expression._expr_or_scalar(other)
        return Expression._call("or_kleene", [self, other])

    def __add__(Expression self, other):
        other = Expression._expr_or_scalar(other)
        return Expression._call("add_checked", [self, other])

    def __mul__(Expression self, other):
        other = Expression._expr_or_scalar(other)
        return Expression._call("multiply_checked", [self, other])

    def __sub__(Expression self, other):
        other = Expression._expr_or_scalar(other)
        return Expression._call("subtract_checked", [self, other])

    def __truediv__(Expression self, other):
        other = Expression._expr_or_scalar(other)
        return Expression._call("divide_checked", [self, other])

    def is_valid(self):
        """
        Check whether the expression is not-null (valid).

        This creates a new expression equivalent to calling the
        `is_valid` compute function on this expression.

        Returns
        -------
        is_valid : Expression
        """
        return Expression._call("is_valid", [self])

    def is_null(self, bint nan_is_null=False):
        """
        Check whether the expression is null.

        This creates a new expression equivalent to calling the
        `is_null` compute function on this expression.

        Parameters
        ----------
        nan_is_null : boolean, default False
            Whether floating-point NaNs are considered null.

        Returns
        -------
        is_null : Expression
        """
        options = NullOptions(nan_is_null=nan_is_null)
        return Expression._call("is_null", [self], options)

    def is_nan(self):
        """
        Check whether the expression is NaN.

        This creates a new expression equivalent to calling the
        `is_nan` compute function on this expression.

        Returns
        -------
        is_nan : Expression
        """
        return Expression._call("is_nan", [self])

    def cast(self, type=None, safe=None, options=None):
        """
        Explicitly set or change the expression's data type.

        This creates a new expression equivalent to calling the
        `cast` compute function on this expression.

        Parameters
        ----------
        type : DataType, default None
            Type to cast array to.
        safe : boolean, default True
            Whether to check for conversion errors such as overflow.
        options : CastOptions, default None
            Additional checks pass by CastOptions

        Returns
        -------
        cast : Expression
        """
        safe_vars_passed = (safe is not None) or (type is not None)

        if safe_vars_passed and (options is not None):
            raise ValueError("Must either pass values for 'type' and 'safe' or pass a "
                             "value for 'options'")

        if options is None:
            type = ensure_type(type, allow_none=False)
            if safe is False:
                options = CastOptions.unsafe(type)
            else:
                options = CastOptions.safe(type)
        return Expression._call("cast", [self], options)

    def isin(self, values):
        """
        Check whether the expression is contained in values.

        This creates a new expression equivalent to calling the
        `is_in` compute function on this expression.

        Parameters
        ----------
        values : Array or iterable
            The values to check for.

        Returns
        -------
        isin : Expression
            A new expression that, when evaluated, checks whether
            this expression's value is contained in `values`.
        """
        if not isinstance(values, Array):
            values = lib.array(values)

        options = SetLookupOptions(values)
        return Expression._call("is_in", [self], options)

    @staticmethod
    def _field(name_or_idx not None):
        cdef:
            CFieldRef c_field

        if isinstance(name_or_idx, int):
            return Expression.wrap(CMakeFieldExpressionByIndex(name_or_idx))
        else:
            c_field = CFieldRef(<c_string> tobytes(name_or_idx))
            return Expression.wrap(CMakeFieldExpression(c_field))

    @staticmethod
    def _nested_field(tuple names not None):
        cdef:
            vector[CFieldRef] nested

        if len(names) == 0:
            raise ValueError("nested field reference should be non-empty")
        nested.reserve(len(names))
        for name in names:
            if isinstance(name, int):
                nested.push_back(CFieldRef(<int>name))
            else:
                nested.push_back(CFieldRef(<c_string> tobytes(name)))
        return Expression.wrap(CMakeFieldExpression(CFieldRef(move(nested))))

    @staticmethod
    def _scalar(value):
        cdef:
            Scalar scalar

        if isinstance(value, Scalar):
            scalar = value
        else:
            scalar = lib.scalar(value)

        return Expression.wrap(CMakeScalarExpression(scalar.unwrap()))


_deserialize = Expression._deserialize
cdef CExpression _true = CMakeScalarExpression(
    <shared_ptr[CScalar]> make_shared[CBooleanScalar](True)
)


cdef CExpression _bind(Expression filter, Schema schema) except *:
    assert schema is not None

    if filter is None:
        return _true

    return GetResultValue(filter.unwrap().Bind(
        deref(pyarrow_unwrap_schema(schema).get())))


cdef class UdfContext:
    """
    Per-invocation function context/state.

    This object will always be the first argument to a user-defined
    function. It should not be used outside of a call to the function.
    """

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly"
                        .format(self.__class__.__name__))

    cdef void init(self, const CUdfContext &c_context):
        self.c_context = c_context

    @property
    def batch_length(self):
        """
        The common length of all input arguments (int).

        In the case that all arguments are scalars, this value
        is used to pass the "actual length" of the arguments,
        e.g. because the scalar values are encoding a column
        with a constant value.
        """
        return self.c_context.batch_length

    @property
    def memory_pool(self):
        """
        A memory pool for allocations (:class:`MemoryPool`).

        This is the memory pool supplied by the user when they invoked
        the function and it should be used in any calls to arrow that the
        UDF makes if that call accepts a memory_pool.
        """
        return box_memory_pool(self.c_context.pool)


cdef inline CFunctionDoc _make_function_doc(dict func_doc) except *:
    """
    Helper function to generate the FunctionDoc
    This function accepts a dictionary and expects the
    summary(str), description(str) and arg_names(List[str]) keys.
    """
    cdef:
        CFunctionDoc f_doc
        vector[c_string] c_arg_names

    f_doc.summary = tobytes(func_doc["summary"])
    f_doc.description = tobytes(func_doc["description"])
    for arg_name in func_doc["arg_names"]:
        c_arg_names.push_back(tobytes(arg_name))
    f_doc.arg_names = c_arg_names
    # UDFOptions integration:
    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
    f_doc.options_class = b""
    f_doc.options_required = False
    return f_doc


cdef object box_udf_context(const CUdfContext& c_context):
    cdef UdfContext context = UdfContext.__new__(UdfContext)
    context.init(c_context)
    return context


cdef _udf_callback(user_function, const CUdfContext& c_context, inputs):
    """
    Helper callback function used to wrap the UdfContext from Python to C++
    execution.
    """
    context = box_udf_context(c_context)
    return user_function(context, *inputs)


def _get_udf_context(memory_pool, batch_length):
    cdef CUdfContext c_context
    c_context.pool = maybe_unbox_memory_pool(memory_pool)
    c_context.batch_length = batch_length
    context = box_udf_context(c_context)
    return context


ctypedef CStatus (*CRegisterUdf)(PyObject* function, function[CallbackUdf] wrapper,
                                 const CUdfOptions& options, CFunctionRegistry* registry)

cdef class RegisterUdf(_Weakrefable):
    cdef CRegisterUdf register_func

    cdef void init(self, const CRegisterUdf register_func):
        self.register_func = register_func


cdef get_register_scalar_function():
    cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf)
    reg.register_func = RegisterScalarFunction
    return reg


cdef get_register_tabular_function():
    cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf)
    reg.register_func = RegisterTabularFunction
    return reg


cdef get_register_aggregate_function():
    cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf)
    reg.register_func = RegisterAggregateFunction
    return reg

cdef get_register_vector_function():
    cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf)
    reg.register_func = RegisterVectorFunction
    return reg


def register_scalar_function(func, function_name, function_doc, in_types, out_type,
                             func_registry=None):
    """
    Register a user-defined scalar function.

    This API is EXPERIMENTAL.

    A scalar function is a function that executes elementwise
    operations on arrays or scalars, i.e. a scalar function must
    be computed row-by-row with no state where each output row
    is computed only from its corresponding input row.
    In other words, all argument arrays have the same length,
    and the output array is of the same length as the arguments.
    Scalar functions are the only functions allowed in query engine
    expressions.

    Parameters
    ----------
    func : callable
        A callable implementing the user-defined function.
        The first argument is the context argument of type
        UdfContext.
        Then, it must take arguments equal to the number of
        in_types defined. It must return an Array or Scalar
        matching the out_type. It must return a Scalar if
        all arguments are scalar, else it must return an Array.

        To define a varargs function, pass a callable that takes
        *args. The last in_type will be the type of all varargs
        arguments.
    function_name : str
        Name of the function. There should only be one function
        registered with this name in the function registry.
    function_doc : dict
        A dictionary object with keys "summary" (str),
        and "description" (str).
    in_types : Dict[str, DataType]
        A dictionary mapping function argument names to
        their respective DataType.
        The argument names will be used to generate
        documentation for the function. The number of
        arguments specified here determines the function
        arity.
    out_type : DataType
        Output type of the function.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.

    Examples
    --------
    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>>
    >>> func_doc = {}
    >>> func_doc["summary"] = "simple udf"
    >>> func_doc["description"] = "add a constant to a scalar"
    >>>
    >>> def add_constant(ctx, array):
    ...     return pc.add(array, 1, memory_pool=ctx.memory_pool)
    >>>
    >>> func_name = "py_add_func"
    >>> in_types = {"array": pa.int64()}
    >>> out_type = pa.int64()
    >>> pc.register_scalar_function(add_constant, func_name, func_doc,
    ...                   in_types, out_type)
    >>>
    >>> func = pc.get_function(func_name)
    >>> func.name
    'py_add_func'
    >>> answer = pc.call_function(func_name, [pa.array([20])])
    >>> answer
    <pyarrow.lib.Int64Array object at ...>
    [
      21
    ]
    """
    return _register_user_defined_function(get_register_scalar_function(),
                                           func, function_name, function_doc, in_types,
                                           out_type, func_registry)


def register_vector_function(func, function_name, function_doc, in_types, out_type,
                             func_registry=None):
    """
    Register a user-defined vector function.

    This API is EXPERIMENTAL.

    A vector function is a function that executes vector
    operations on arrays. Vector function is often used
    when compute doesn't fit other more specific types of
    functions (e.g., scalar and aggregate).

    Parameters
    ----------
    func : callable
        A callable implementing the user-defined function.
        The first argument is the context argument of type
        UdfContext.
        Then, it must take arguments equal to the number of
        in_types defined. It must return an Array or Scalar
        matching the out_type. It must return a Scalar if
        all arguments are scalar, else it must return an Array.

        To define a varargs function, pass a callable that takes
        *args. The last in_type will be the type of all varargs
        arguments.
    function_name : str
        Name of the function. There should only be one function
        registered with this name in the function registry.
    function_doc : dict
        A dictionary object with keys "summary" (str),
        and "description" (str).
    in_types : Dict[str, DataType]
        A dictionary mapping function argument names to
        their respective DataType.
        The argument names will be used to generate
        documentation for the function. The number of
        arguments specified here determines the function
        arity.
    out_type : DataType
        Output type of the function.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.

    Examples
    --------
    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>>
    >>> func_doc = {}
    >>> func_doc["summary"] = "percent rank"
    >>> func_doc["description"] = "compute percent rank"
    >>>
    >>> def list_flatten_udf(ctx, x):
    ...     return pc.list_flatten(x)
    >>>
    >>> func_name = "list_flatten_udf"
    >>> in_types = {"array": pa.list_(pa.int64())}
    >>> out_type = pa.int64()
    >>> pc.register_vector_function(list_flatten_udf, func_name, func_doc,
    ...                   in_types, out_type)
    >>>
    >>> answer = pc.call_function(func_name, [pa.array([[1, 2], [3, 4]])])
    >>> answer
    <pyarrow.lib.Int64Array object at ...>
    [
      1,
      2,
      3,
      4
    ]
    """
    return _register_user_defined_function(get_register_vector_function(),
                                           func, function_name, function_doc, in_types,
                                           out_type, func_registry)


def register_aggregate_function(func, function_name, function_doc, in_types, out_type,
                                func_registry=None):
    """
    Register a user-defined non-decomposable aggregate function.

    This API is EXPERIMENTAL.

    A non-decomposable aggregation function is a function that executes
    aggregate operations on the whole data that it is aggregating.
    In other words, non-decomposable aggregate function cannot be
    split into consume/merge/finalize steps.

    This is often used with ordered or segmented aggregation where groups
    can be emit before accumulating all of the input data.

    Note that currently the size of any input column cannot exceed 2 GB
    for a single segment (all groups combined).

    Parameters
    ----------
    func : callable
        A callable implementing the user-defined function.
        The first argument is the context argument of type
        UdfContext.
        Then, it must take arguments equal to the number of
        in_types defined. It must return a Scalar matching the
        out_type.
        To define a varargs function, pass a callable that takes
        *args. The in_type needs to match in type of inputs when
        the function gets called.
    function_name : str
        Name of the function. This name must be unique, i.e.,
        there should only be one function registered with
        this name in the function registry.
    function_doc : dict
        A dictionary object with keys "summary" (str),
        and "description" (str).
    in_types : Dict[str, DataType]
        A dictionary mapping function argument names to
        their respective DataType.
        The argument names will be used to generate
        documentation for the function. The number of
        arguments specified here determines the function
        arity.
    out_type : DataType
        Output type of the function.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.

    Examples
    --------
    >>> import numpy as np
    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>>
    >>> func_doc = {}
    >>> func_doc["summary"] = "simple median udf"
    >>> func_doc["description"] = "compute median"
    >>>
    >>> def compute_median(ctx, array):
    ...     return pa.scalar(np.median(array))
    >>>
    >>> func_name = "py_compute_median"
    >>> in_types = {"array": pa.int64()}
    >>> out_type = pa.float64()
    >>> pc.register_aggregate_function(compute_median, func_name, func_doc,
    ...                   in_types, out_type)
    >>>
    >>> func = pc.get_function(func_name)
    >>> func.name
    'py_compute_median'
    >>> answer = pc.call_function(func_name, [pa.array([20, 40])])
    >>> answer
    <pyarrow.DoubleScalar: 30.0>
    >>> table = pa.table([pa.array([1, 1, 2, 2]), pa.array([10, 20, 30, 40])], names=['k', 'v'])
    >>> result = table.group_by('k').aggregate([('v', 'py_compute_median')])
    >>> result
    pyarrow.Table
    k: int64
    v_py_compute_median: double
    ----
    k: [[1,2]]
    v_py_compute_median: [[15,35]]
    """
    return _register_user_defined_function(get_register_aggregate_function(),
                                           func, function_name, function_doc, in_types,
                                           out_type, func_registry)


def register_tabular_function(func, function_name, function_doc, in_types, out_type,
                              func_registry=None):
    """
    Register a user-defined tabular function.

    This API is EXPERIMENTAL.

    A tabular function is one accepting a context argument of type
    UdfContext and returning a generator of struct arrays.
    The in_types argument must be empty and the out_type argument
    specifies a schema. Each struct array must have field types
    corresponding to the schema.

    Parameters
    ----------
    func : callable
        A callable implementing the user-defined function.
        The only argument is the context argument of type
        UdfContext. It must return a callable that
        returns on each invocation a StructArray matching
        the out_type, where an empty array indicates end.
    function_name : str
        Name of the function. There should only be one function
        registered with this name in the function registry.
    function_doc : dict
        A dictionary object with keys "summary" (str),
        and "description" (str).
    in_types : Dict[str, DataType]
        Must be an empty dictionary (reserved for future use).
    out_type : Union[Schema, DataType]
        Schema of the function's output, or a corresponding flat struct type.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.
    """
    cdef:
        shared_ptr[CSchema] c_schema
        shared_ptr[CDataType] c_type

    if isinstance(out_type, Schema):
        c_schema = pyarrow_unwrap_schema(out_type)
        with nogil:
            c_type = <shared_ptr[CDataType]>make_shared[CStructType](deref(c_schema).fields())
        out_type = pyarrow_wrap_data_type(c_type)
    return _register_user_defined_function(get_register_tabular_function(),
                                           func, function_name, function_doc, in_types,
                                           out_type, func_registry)


def _register_user_defined_function(register_func, func, function_name, function_doc, in_types,
                                    out_type, func_registry=None):
    """
    Register a user-defined function.

    This method itself doesn't care about the type of the UDF
    (i.e., scalar vs tabular vs aggregate)

    Parameters
    ----------
    register_func: object
        An object holding a CRegisterUdf in a "register_func" attribute.
    func : callable
        A callable implementing the user-defined function.
    function_name : str
        Name of the function. There should only be one function
        registered with this name in the function registry.
    function_doc : dict
        A dictionary object with keys "summary" (str),
        and "description" (str).
    in_types : Dict[str, DataType]
        A dictionary mapping function argument names to
        their respective DataType.
    out_type : DataType
        Output type of the function.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.
    """
    cdef:
        CRegisterUdf c_register_func
        c_string c_func_name
        CArity c_arity
        CFunctionDoc c_func_doc
        vector[shared_ptr[CDataType]] c_in_types
        PyObject* c_function
        shared_ptr[CDataType] c_out_type
        CUdfOptions c_options
        CFunctionRegistry* c_func_registry

    if callable(func):
        c_function = <PyObject*>func
    else:
        raise TypeError("func must be a callable")

    c_func_name = tobytes(function_name)

    func_spec = inspect.getfullargspec(func)
    num_args = -1
    if isinstance(in_types, dict):
        for in_type in in_types.values():
            c_in_types.push_back(
                pyarrow_unwrap_data_type(ensure_type(in_type)))
        function_doc["arg_names"] = in_types.keys()
        num_args = len(in_types)
    else:
        raise TypeError(
            "in_types must be a dictionary of DataType")

    c_arity = CArity(<int> num_args, func_spec.varargs)

    if "summary" not in function_doc:
        raise ValueError("Function doc must contain a summary")

    if "description" not in function_doc:
        raise ValueError("Function doc must contain a description")

    if "arg_names" not in function_doc:
        raise ValueError("Function doc must contain arg_names")

    c_func_doc = _make_function_doc(function_doc)

    c_out_type = pyarrow_unwrap_data_type(ensure_type(out_type))

    c_options.func_name = c_func_name
    c_options.arity = c_arity
    c_options.func_doc = c_func_doc
    c_options.input_types = c_in_types
    c_options.output_type = c_out_type

    if func_registry is None:
        c_func_registry = NULL
    else:
        c_func_registry = (<FunctionRegistry>func_registry).registry

    c_register_func = (<RegisterUdf>register_func).register_func

    check_status(c_register_func(c_function,
                                 <function[CallbackUdf]> &_udf_callback,
                                 c_options, c_func_registry))


def call_tabular_function(function_name, args=None, func_registry=None):
    """
    Get a record batch iterator from a tabular function.

    Parameters
    ----------
    function_name : str
        Name of the function.
    args : iterable
        The arguments to pass to the function.  Accepted types depend
        on the specific function.  Currently, only an empty args is supported.
    func_registry : FunctionRegistry
        Optional function registry to use instead of the default global one.
    """
    cdef:
        c_string c_func_name
        vector[CDatum] c_args
        CFunctionRegistry* c_func_registry
        shared_ptr[CRecordBatchReader] c_reader
        RecordBatchReader reader

    c_func_name = tobytes(function_name)
    if func_registry is None:
        c_func_registry = NULL
    else:
        c_func_registry = (<FunctionRegistry>func_registry).registry
    if args is None:
        args = []
    _pack_compute_args(args, &c_args)

    with nogil:
        c_reader = GetResultValue(CallTabularFunction(
            c_func_name, c_args, c_func_registry))
    reader = RecordBatchReader.__new__(RecordBatchReader)
    reader.reader = c_reader
    return RecordBatchReader.from_batches(pyarrow_wrap_schema(deref(c_reader).schema()), reader)