# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: profile=False
# distutils: language = c++
# cython: language_level = 3
from libcpp.memory cimport shared_ptr
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from libcpp.unordered_set cimport unordered_set as c_unordered_set
from libc.stdint cimport int64_t, int32_t
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (DataType, Field, MemoryPool, RecordBatch,
Schema, check_status, pyarrow_wrap_array,
pyarrow_wrap_data_type, ensure_type, _Weakrefable,
pyarrow_wrap_field)
from pyarrow.includes.libgandiva cimport (
CCondition, CGandivaExpression,
CNode, CProjector, CFilter,
CSelectionVector,
_ensure_selection_mode,
CConfiguration,
CConfigurationBuilder,
TreeExprBuilder_MakeExpression,
TreeExprBuilder_MakeFunction,
TreeExprBuilder_MakeBoolLiteral,
TreeExprBuilder_MakeUInt8Literal,
TreeExprBuilder_MakeUInt16Literal,
TreeExprBuilder_MakeUInt32Literal,
TreeExprBuilder_MakeUInt64Literal,
TreeExprBuilder_MakeInt8Literal,
TreeExprBuilder_MakeInt16Literal,
TreeExprBuilder_MakeInt32Literal,
TreeExprBuilder_MakeInt64Literal,
TreeExprBuilder_MakeFloatLiteral,
TreeExprBuilder_MakeDoubleLiteral,
TreeExprBuilder_MakeStringLiteral,
TreeExprBuilder_MakeBinaryLiteral,
TreeExprBuilder_MakeField,
TreeExprBuilder_MakeIf,
TreeExprBuilder_MakeAnd,
TreeExprBuilder_MakeOr,
TreeExprBuilder_MakeCondition,
TreeExprBuilder_MakeInExpressionInt32,
TreeExprBuilder_MakeInExpressionInt64,
TreeExprBuilder_MakeInExpressionTime32,
TreeExprBuilder_MakeInExpressionTime64,
TreeExprBuilder_MakeInExpressionDate32,
TreeExprBuilder_MakeInExpressionDate64,
TreeExprBuilder_MakeInExpressionTimeStamp,
TreeExprBuilder_MakeInExpressionString,
SelectionVector_MakeInt16,
SelectionVector_MakeInt32,
SelectionVector_MakeInt64,
Projector_Make,
Filter_Make,
CFunctionSignature,
GetRegisteredFunctionSignatures)
cdef class Node(_Weakrefable):
    """
    Python-visible handle around a ``shared_ptr[CNode]``.

    Instances cannot be built by calling the class from Python
    (``__init__`` always raises); they are produced internally
    through the ``create`` static factory.
    """
    cdef:
        # Shared ownership of the wrapped C++ node.
        shared_ptr[CNode] node

    def __init__(self):
        # Same wording as the sibling wrapper classes in this module.
        raise TypeError("Do not call {}'s constructor directly, use the "
                        "TreeExprBuilder API instead"
                        .format(self.__class__.__name__))

    @staticmethod
    cdef create(shared_ptr[CNode] node):
        """Wrap ``node`` in a new Node, bypassing ``__init__``."""
        cdef Node self = Node.__new__(Node)
        self.node = node
        return self

    def __str__(self):
        """Return the decoded result of the wrapped node's ``ToString()``."""
        return self.node.get().ToString().decode()

    def __repr__(self):
        """Return the default object repr followed by ``str(self)``."""
        type_format = object.__repr__(self)
        return '{0}\n{1}'.format(type_format, str(self))

    def return_type(self):
        """
        Return the wrapped node's ``return_type()`` passed through
        ``pyarrow_wrap_data_type``.
        """
        return pyarrow_wrap_data_type(self.node.get().return_type())
cdef class Expression(_Weakrefable):
    """
    Python-visible handle around a ``shared_ptr[CGandivaExpression]``.

    The wrapped pointer is assigned through the C-level ``init`` method.
    """
    cdef:
        # Shared ownership of the wrapped C++ expression.
        shared_ptr[CGandivaExpression] expression

    cdef void init(self, shared_ptr[CGandivaExpression] expression):
        """Store ``expression`` as the wrapped pointer."""
        self.expression = expression

    def __str__(self):
        """Return the decoded result of the expression's ``ToString()``."""
        cdef c_string rendered = self.expression.get().ToString()
        return rendered.decode()

    def __repr__(self):
        """Return the default object repr followed by ``str(self)``."""
        header = object.__repr__(self)
        body = str(self)
        return '{0}\n{1}'.format(header, body)

    def root(self):
        """Return the expression's ``root()`` wrapped as a Node."""
        cdef shared_ptr[CNode] root_node = self.expression.get().root()
        return Node.create(root_node)

    def result(self):
        """
        Return the expression's ``result()`` passed through
        ``pyarrow_wrap_field``.
        """
        return pyarrow_wrap_field(
            self.expression.get().result())
cdef class Condition(_Weakrefable):
    """
    Python-visible handle around a ``shared_ptr[CCondition]``.

    Calling the class from Python always raises; instances are produced
    internally through the ``create`` static factory.
    """
    cdef:
        # Shared ownership of the wrapped C++ condition.
        shared_ptr[CCondition] condition

    def __init__(self):
        class_name = self.__class__.__name__
        raise TypeError("Do not call {}'s constructor directly, use the "
                        "TreeExprBuilder API instead"
                        .format(class_name))

    @staticmethod
    cdef create(shared_ptr[CCondition] condition):
        """Wrap ``condition`` in a new Condition, bypassing ``__init__``."""
        cdef Condition wrapper = Condition.__new__(Condition)
        wrapper.condition = condition
        return wrapper

    def __str__(self):
        """Return the decoded result of the condition's ``ToString()``."""
        cdef c_string rendered = self.condition.get().ToString()
        return rendered.decode()

    def __repr__(self):
        """Return the default object repr followed by ``str(self)``."""
        header = object.__repr__(self)
        body = str(self)
        return '{0}\n{1}'.format(header, body)

    def root(self):
        """Return the condition's ``root()`` wrapped as a Node."""
        cdef shared_ptr[CNode] root_node = self.condition.get().root()
        return Node.create(root_node)

    def result(self):
        """
        Return the condition's ``result()`` passed through
        ``pyarrow_wrap_field``.
        """
        return pyarrow_wrap_field(
            self.condition.get().result())
cdef class SelectionVector(_Weakrefable):
    """
    Python-visible handle around a ``shared_ptr[CSelectionVector]``.

    Calling the class from Python always raises; instances are produced
    internally through the ``create`` static factory.
    """
    cdef:
        # Shared ownership of the wrapped C++ selection vector.
        shared_ptr[CSelectionVector] selection_vector

    def __init__(self):
        class_name = self.__class__.__name__
        raise TypeError("Do not call {}'s constructor directly."
                        .format(class_name))

    @staticmethod
    cdef create(shared_ptr[CSelectionVector] selection_vector):
        """Wrap ``selection_vector`` in a new instance, bypassing ``__init__``."""
        cdef SelectionVector wrapper = SelectionVector.__new__(SelectionVector)
        wrapper.selection_vector = selection_vector
        return wrapper

    def to_array(self):
        """
        Return the selection vector's ``ToArray()`` result passed through
        ``pyarrow_wrap_array``.
        """
        return pyarrow_wrap_array(
            self.selection_vector.get().ToArray())
cdef class Projector(_Weakrefable):
    """
    Python-visible handle around a ``shared_ptr[CProjector]`` together
    with the MemoryPool handed to it at creation time.

    Calling the class from Python always raises; instances are produced
    internally through the ``create`` static factory.
    """
    cdef:
        # Shared ownership of the wrapped C++ projector.
        shared_ptr[CProjector] projector
        # Pool passed to every Evaluate() call made by this projector.
        MemoryPool pool

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly, use "
                        "make_projector instead"
                        .format(self.__class__.__name__))

    @staticmethod
    cdef create(shared_ptr[CProjector] projector, MemoryPool pool):
        """Wrap ``projector`` and ``pool`` in a new Projector."""
        cdef Projector self = Projector.__new__(Projector)
        self.projector = projector
        self.pool = pool
        return self

    @property
    def llvm_ir(self):
        """The decoded result of the projector's ``DumpIR()``."""
        return self.projector.get().DumpIR().decode()

    def evaluate(self, RecordBatch batch not None,
                 SelectionVector selection=None):
        """
        Evaluate the specified record batch and return the arrays at the
        filtered positions.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            Must not be None.
        selection : pyarrow.gandiva.SelectionVector, optional
            When given, it is forwarded to the projector's Evaluate()
            call; when None, the overload without a selection vector
            is used.

        Returns
        -------
        list[pyarrow.Array]
        """
        cdef:
            vector[shared_ptr[CArray]] results
            shared_ptr[CArray] result

        # ``batch`` is declared ``not None`` because ``batch.sp_batch`` is
        # a C-level attribute access that is not None-checked here.
        if selection is None:
            check_status(self.projector.get().Evaluate(
                batch.sp_batch.get()[0], self.pool.pool, &results))
        else:
            check_status(
                self.projector.get().Evaluate(
                    batch.sp_batch.get()[0], selection.selection_vector.get(),
                    self.pool.pool, &results))

        arrays = []
        for result in results:
            arrays.append(pyarrow_wrap_array(result))
        return arrays
cdef class Filter(_Weakrefable):
    """
    Python-visible handle around a ``shared_ptr[CFilter]``.

    Calling the class from Python always raises; instances are produced
    internally through the ``create`` static factory.
    """
    cdef:
        # Shared ownership of the wrapped C++ filter.
        shared_ptr[CFilter] filter

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly, use "
                        "make_filter instead"
                        .format(self.__class__.__name__))

    @staticmethod
    cdef create(shared_ptr[CFilter] filter):
        """Wrap ``filter`` in a new Filter, bypassing ``__init__``."""
        cdef Filter self = Filter.__new__(Filter)
        self.filter = filter
        return self

    @property
    def llvm_ir(self):
        """The decoded result of the filter's ``DumpIR()``."""
        return self.filter.get().DumpIR().decode()

    def evaluate(self, RecordBatch batch not None,
                 MemoryPool pool not None, dtype='int32'):
        """
        Evaluate the specified record batch and return a selection vector.

        Parameters
        ----------
        batch : pyarrow.RecordBatch
            Must not be None.
        pool : MemoryPool
            Pool used to allocate the selection vector. Must not be None.
        dtype : DataType or str, default int32
            Index type of the selection vector; one of int16, int32
            or int64.

        Returns
        -------
        pyarrow.gandiva.SelectionVector

        Raises
        ------
        ValueError
            If ``dtype`` is not int16, int32 or int64.
        """
        cdef:
            DataType index_type = ensure_type(dtype)
            shared_ptr[CSelectionVector] selection

        # ``batch`` and ``pool`` are declared ``not None`` because
        # ``batch.sp_batch`` and ``pool.pool`` are C-level attribute
        # accesses that are not None-checked here.
        if index_type.id == _Type_INT16:
            check_status(SelectionVector_MakeInt16(
                batch.num_rows, pool.pool, &selection))
        elif index_type.id == _Type_INT32:
            check_status(SelectionVector_MakeInt32(
                batch.num_rows, pool.pool, &selection))
        elif index_type.id == _Type_INT64:
            check_status(SelectionVector_MakeInt64(
                batch.num_rows, pool.pool, &selection))
        else:
            raise ValueError("'dtype' of the selection vector should be "
                             "one of 'int16', 'int32' and 'int64'.")

        check_status(self.filter.get().Evaluate(
            batch.sp_batch.get()[0], selection))
        return SelectionVector.create(selection)
cdef class TreeExprBuilder(_Weakrefable):
def make_literal(self, value, dtype):
    """
    Build a literal node holding ``value``.

    The builder function is selected from the type id of ``dtype``
    (after ``ensure_type``). For string literals ``value`` is encoded
    as UTF-8 before being handed to the builder; for every other
    supported type it is passed through unchanged.

    Parameters
    ----------
    value : a literal value
    dtype : DataType

    Returns
    -------
    pyarrow.gandiva.Node

    Raises
    ------
    TypeError
        If ``dtype`` is not one of the handled types.
    """
    cdef:
        DataType literal_type = ensure_type(dtype)
        shared_ptr[CNode] literal_node

    # Boolean.
    if literal_type.id == _Type_BOOL:
        literal_node = TreeExprBuilder_MakeBoolLiteral(value)
    # Unsigned integers.
    elif literal_type.id == _Type_UINT8:
        literal_node = TreeExprBuilder_MakeUInt8Literal(value)
    elif literal_type.id == _Type_UINT16:
        literal_node = TreeExprBuilder_MakeUInt16Literal(value)
    elif literal_type.id == _Type_UINT32:
        literal_node = TreeExprBuilder_MakeUInt32Literal(value)
    elif literal_type.id == _Type_UINT64:
        literal_node = TreeExprBuilder_MakeUInt64Literal(value)
    # Signed integers.
    elif literal_type.id == _Type_INT8:
        literal_node = TreeExprBuilder_MakeInt8Literal(value)
    elif literal_type.id == _Type_INT16:
        literal_node = TreeExprBuilder_MakeInt16Literal(value)
    elif literal_type.id == _Type_INT32:
        literal_node = TreeExprBuilder_MakeInt32Literal(value)
    elif literal_type.id == _Type_INT64:
        literal_node = TreeExprBuilder_MakeInt64Literal(value)
    # Floating point.
    elif literal_type.id == _Type_FLOAT:
        literal_node = TreeExprBuilder_MakeFloatLiteral(value)
    elif literal_type.id == _Type_DOUBLE:
        literal_node = TreeExprBuilder_MakeDoubleLiteral(value)
    # Text and raw bytes.
    elif literal_type.id == _Type_STRING:
        literal_node = TreeExprBuilder_MakeStringLiteral(
            value.encode('UTF-8'))
    elif literal_type.id == _Type_BINARY:
        literal_node = TreeExprBuilder_MakeBinaryLiteral(value)
    else:
        raise TypeError("Didn't recognize dtype " + str(dtype))

    return Node.create(literal_node)
def make_expression(self, Node root_node not None,
Field return_field not None):
"""
Create an expression with the specified root_node,
and the result written to result_field.
Parameters
----------
root_node : pyarrow.gandiva.Node
return_field : pyarrow.Field
Returns
-------
pyarrow.gandiva.Expression
"""
cdef shared_ptr[CGandivaExpression] r = TreeExprBuilder_MakeExpression(
root_node.node, return_field.sp_field)
Loading ...