# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
import sys
from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, Py_NE, Py_GE
from cython.operator cimport dereference as deref
from collections import namedtuple
from pyarrow.lib import frombytes, tobytes, ArrowInvalid
from pyarrow.lib cimport *
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
import pyarrow.lib as lib
from pyarrow.util import _DEPR_MSG
from libcpp cimport bool as c_bool
import inspect
import numpy as np
import warnings
# Cache for the lazily imported ``pyarrow.substrait`` module; populated by
# the first successful call to ``_pas()``.
__pas = None
_substrait_msg = (
    "The pyarrow installation is not built with support for Substrait."
)


def _pas():
    """
    Return the ``pyarrow.substrait`` module, importing it on first use.

    The imported module is stored in the module-level ``__pas`` variable so
    that later calls return it without importing again.

    Raises
    ------
    ImportError
        With ``_substrait_msg`` as its message when ``pyarrow.substrait``
        cannot be imported.
    """
    global __pas
    if __pas is not None:
        return __pas
    try:
        import pyarrow.substrait as substrait_module
    except ImportError:
        raise ImportError(_substrait_msg)
    __pas = substrait_module
    return __pas
def _forbid_instantiation(klass, subclasses_instead=True):
    """
    Raise a TypeError stating that the abstract class *klass* cannot be
    instantiated.

    Parameters
    ----------
    klass : type
        The abstract class whose instantiation was attempted.
    subclasses_instead : bool, default True
        If true, the error message additionally lists the names of the
        direct subclasses of *klass* as alternatives.

    Raises
    ------
    TypeError
        Always; this function never returns.
    """
    msg = '{} is an abstract class thus cannot be initialized.'.format(
        klass.__name__
    )
    if subclasses_instead:
        # ``__subclasses__`` is a method and has to be called: iterating
        # over the bound method itself fails with an unrelated TypeError
        # before the intended message can be raised.
        subclasses = [cls.__name__ for cls in klass.__subclasses__()]
        msg += ' Use one of the subclasses instead: {}'.format(
            ', '.join(subclasses)
        )
    raise TypeError(msg)
cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func):
    """
    Build a ScalarFunction Python object around a C++ scalar Function.
    """
    # __new__ allocates the instance without running __init__; the wrapper
    # is then populated through its init() method.
    cdef ScalarFunction wrapped = ScalarFunction.__new__(ScalarFunction)
    wrapped.init(sp_func)
    return wrapped
cdef wrap_vector_function(const shared_ptr[CFunction]& sp_func):
    """
    Build a VectorFunction Python object around a C++ vector Function.
    """
    # __new__ allocates the instance without running __init__; the wrapper
    # is then populated through its init() method.
    cdef VectorFunction wrapped = VectorFunction.__new__(VectorFunction)
    wrapped.init(sp_func)
    return wrapped
cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func):
    """
    Build a ScalarAggregateFunction Python object around a C++ aggregate
    Function.
    """
    # __new__ allocates the instance without running __init__; the wrapper
    # is then populated through its init() method.
    cdef ScalarAggregateFunction wrapped = (
        ScalarAggregateFunction.__new__(ScalarAggregateFunction))
    wrapped.init(sp_func)
    return wrapped
cdef wrap_hash_aggregate_function(const shared_ptr[CFunction]& sp_func):
    """
    Build a HashAggregateFunction Python object around a C++ aggregate
    Function.
    """
    # __new__ allocates the instance without running __init__; the wrapper
    # is then populated through its init() method.
    cdef HashAggregateFunction wrapped = (
        HashAggregateFunction.__new__(HashAggregateFunction))
    wrapped.init(sp_func)
    return wrapped
cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func):
    """
    Build a MetaFunction Python object around a C++ meta Function.
    """
    # __new__ allocates the instance without running __init__; the wrapper
    # is then populated through its init() method.
    cdef MetaFunction wrapped = MetaFunction.__new__(MetaFunction)
    wrapped.init(sp_func)
    return wrapped
cdef wrap_function(const shared_ptr[CFunction]& sp_func):
    """
    Wrap a C++ Function in the matching Python Function object.

    The function kind reported by the C++ object selects which of the
    specialized wrappers (scalar, vector, scalar aggregate, hash aggregate
    or meta) is used.

    Raises
    ------
    ValueError
        If the shared pointer holds NULL.
    NotImplementedError
        If the function kind is none of the kinds handled here.
    """
    if sp_func.get() == NULL:
        raise ValueError("Function was NULL")

    cdef FunctionKind func_kind = sp_func.get().kind()
    if func_kind == FunctionKind_SCALAR:
        return wrap_scalar_function(sp_func)
    if func_kind == FunctionKind_VECTOR:
        return wrap_vector_function(sp_func)
    if func_kind == FunctionKind_SCALAR_AGGREGATE:
        return wrap_scalar_aggregate_function(sp_func)
    if func_kind == FunctionKind_HASH_AGGREGATE:
        return wrap_hash_aggregate_function(sp_func)
    if func_kind == FunctionKind_META:
        return wrap_meta_function(sp_func)
    raise NotImplementedError("Unknown Function::Kind")
cdef wrap_scalar_kernel(const CScalarKernel* c_kernel):
    """
    Wrap a C++ scalar kernel pointer in a ScalarKernel object.

    Raises ValueError if the pointer is NULL.
    """
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    # __new__ skips __init__; the pointer is handed over through init().
    cdef ScalarKernel wrapped = ScalarKernel.__new__(ScalarKernel)
    wrapped.init(c_kernel)
    return wrapped
cdef wrap_vector_kernel(const CVectorKernel* c_kernel):
    """
    Wrap a C++ vector kernel pointer in a VectorKernel object.

    Raises ValueError if the pointer is NULL.
    """
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    # __new__ skips __init__; the pointer is handed over through init().
    cdef VectorKernel wrapped = VectorKernel.__new__(VectorKernel)
    wrapped.init(c_kernel)
    return wrapped
cdef wrap_scalar_aggregate_kernel(const CScalarAggregateKernel* c_kernel):
    """
    Wrap a C++ scalar aggregate kernel pointer in a ScalarAggregateKernel
    object.

    Raises ValueError if the pointer is NULL.
    """
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    # __new__ skips __init__; the pointer is handed over through init().
    cdef ScalarAggregateKernel wrapped = (
        ScalarAggregateKernel.__new__(ScalarAggregateKernel))
    wrapped.init(c_kernel)
    return wrapped
cdef wrap_hash_aggregate_kernel(const CHashAggregateKernel* c_kernel):
    """
    Wrap a C++ hash aggregate kernel pointer in a HashAggregateKernel
    object.

    Raises ValueError if the pointer is NULL.
    """
    if c_kernel == NULL:
        raise ValueError("Kernel was NULL")
    # __new__ skips __init__; the pointer is handed over through init().
    cdef HashAggregateKernel wrapped = (
        HashAggregateKernel.__new__(HashAggregateKernel))
    wrapped.init(c_kernel)
    return wrapped
cdef class Kernel(_Weakrefable):
    """
    A kernel object.

    Kernels handle the execution of a Function for a certain signature.
    Calling the constructor from Python always raises TypeError.
    """

    def __init__(self):
        cls_name = self.__class__.__name__
        raise TypeError("Do not call {}'s constructor directly"
                        .format(cls_name))
cdef class ScalarKernel(Kernel):
    """
    Python wrapper holding a pointer to a C++ scalar kernel.
    """
    # Raw pointer to the wrapped kernel, assigned by init().
    # NOTE(review): the pointer is stored as-is; presumably its owner must
    # outlive this wrapper -- confirm.
    cdef const CScalarKernel* kernel

    cdef void init(self, const CScalarKernel* kernel) except *:
        # Store the pointer to the C++ kernel this object represents.
        self.kernel = kernel

    def __repr__(self):
        signature = frombytes(self.kernel.signature.get().ToString())
        return "ScalarKernel<{}>".format(signature)
cdef class VectorKernel(Kernel):
    """
    Python wrapper holding a pointer to a C++ vector kernel.
    """
    # Raw pointer to the wrapped kernel, assigned by init().
    # NOTE(review): the pointer is stored as-is; presumably its owner must
    # outlive this wrapper -- confirm.
    cdef const CVectorKernel* kernel

    cdef void init(self, const CVectorKernel* kernel) except *:
        # Store the pointer to the C++ kernel this object represents.
        self.kernel = kernel

    def __repr__(self):
        signature = frombytes(self.kernel.signature.get().ToString())
        return "VectorKernel<{}>".format(signature)
cdef class ScalarAggregateKernel(Kernel):
    """
    Python wrapper holding a pointer to a C++ scalar aggregate kernel.
    """
    # Raw pointer to the wrapped kernel, assigned by init().
    # NOTE(review): the pointer is stored as-is; presumably its owner must
    # outlive this wrapper -- confirm.
    cdef const CScalarAggregateKernel* kernel

    cdef void init(self, const CScalarAggregateKernel* kernel) except *:
        # Store the pointer to the C++ kernel this object represents.
        self.kernel = kernel

    def __repr__(self):
        signature = frombytes(self.kernel.signature.get().ToString())
        return "ScalarAggregateKernel<{}>".format(signature)
cdef class HashAggregateKernel(Kernel):
    """
    Python wrapper holding a pointer to a C++ hash aggregate kernel.
    """
    # Raw pointer to the wrapped kernel, assigned by init().
    # NOTE(review): the pointer is stored as-is; presumably its owner must
    # outlive this wrapper -- confirm.
    cdef const CHashAggregateKernel* kernel

    cdef void init(self, const CHashAggregateKernel* kernel) except *:
        # Store the pointer to the C++ kernel this object represents.
        self.kernel = kernel

    def __repr__(self):
        signature = frombytes(self.kernel.signature.get().ToString())
        return "HashAggregateKernel<{}>".format(signature)
# Python-side record of a compute function's documentation, one field per
# value that ``Function._doc`` reads from the C++ function doc.
FunctionDoc = namedtuple(
    "FunctionDoc",
    [
        "summary",
        "description",
        "arg_names",
        "options_class",
        "options_required",
    ],
)
cdef class Function(_Weakrefable):
    """
    A compute function.

    A function implements a certain logical computation over a range of
    possible input signatures.  Each signature accepts a range of input
    types and is implemented by a given Kernel.

    Functions can be of different kinds:

    * "scalar" functions apply an item-wise computation over all items
      of their inputs.  Each item in the output only depends on the values
      of the inputs at the same position.  Examples: addition, comparisons,
      string predicates...

    * "vector" functions apply a collection-wise computation, such that
      each item in the output may depend on the values of several items
      in each input.  Examples: dictionary encoding, sorting, extracting
      unique values...

    * "scalar_aggregate" functions reduce the dimensionality of the inputs
      by applying a reduction function.  Examples: sum, min_max, mode...

    * "hash_aggregate" functions apply a reduction function to an input
      subdivided by grouping criteria.  They may not be directly called.
      Examples: hash_sum, hash_min_max...

    * "meta" functions dispatch to other functions.
    """

    cdef:
        # Shared pointer to the wrapped C++ function and the raw pointer
        # obtained from it; both are assigned by init().
        shared_ptr[CFunction] sp_func
        CFunction* base_func

    # Translates C++ Function::Kind values into the strings returned by
    # the `kind` property.
    _kind_map = {
        FunctionKind_SCALAR: "scalar",
        FunctionKind_VECTOR: "vector",
        FunctionKind_SCALAR_AGGREGATE: "scalar_aggregate",
        FunctionKind_HASH_AGGREGATE: "hash_aggregate",
        FunctionKind_META: "meta",
    }

    def __init__(self):
        cls_name = self.__class__.__name__
        raise TypeError("Do not call {}'s constructor directly"
                        .format(cls_name))

    cdef void init(self, const shared_ptr[CFunction]& sp_func) except *:
        # Hold on to the shared pointer and cache its raw pointer for the
        # accessors below.
        self.sp_func = sp_func
        self.base_func = sp_func.get()

    def __repr__(self):
        template = ("arrow.compute.Function<name={}, kind={}, "
                    "arity={}, num_kernels={}>")
        return template.format(self.name, self.kind, self.arity,
                               self.num_kernels)

    def __reduce__(self):
        # Reduction uses the global registry: only the name is stored and
        # the function is looked up again through get_function().
        return (get_function, (self.name,))

    @property
    def name(self):
        """
        The function name.
        """
        return frombytes(self.base_func.name())

    @property
    def arity(self):
        """
        The function arity.

        If Ellipsis (i.e. `...`) is returned, the function takes a variable
        number of arguments.
        """
        cdef CArity c_arity = self.base_func.arity()
        if c_arity.is_varargs:
            return ...
        return c_arity.num_args

    @property
    def kind(self):
        """
        The function kind.

        One of the string values of `_kind_map`; NotImplementedError is
        raised for a kind that is missing from that mapping.
        """
        cdef FunctionKind func_kind = self.base_func.kind()
        try:
            return self._kind_map[func_kind]
        except KeyError:
            raise NotImplementedError("Unknown Function::Kind")

    @property
    def _doc(self):
        """
        The C++-like function documentation (for internal use).

        Returned as a FunctionDoc namedtuple with the byte strings decoded
        through frombytes().
        """
        cdef CFunctionDoc c_doc = self.base_func.doc()
        summary = frombytes(c_doc.summary)
        description = frombytes(c_doc.description)
        arg_names = [frombytes(arg_name) for arg_name in c_doc.arg_names]
        options_class = frombytes(c_doc.options_class)
        return FunctionDoc(summary, description, arg_names, options_class,
                           c_doc.options_required)

    @property
    def num_kernels(self):
        """
        The number of kernels implementing this function.
        """
        return self.base_func.num_kernels()
Loading ...