# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Cython wrappers for IO interfaces defined in arrow::io and messaging in
# arrow::ipc
from libc.stdlib cimport malloc, free
import codecs
import pickle
import re
import sys
import threading
import time
import warnings
from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation
from queue import Queue, Empty as QueueEmpty
from pyarrow.lib cimport check_status, HaveLibHdfs
from pyarrow.util import _is_path_like, _stringify_path
# 64K, i.e. 2 ** 16 == 65536
DEFAULT_BUFFER_SIZE = 2 ** 16
cdef extern from "Python.h":
    # To let us get a PyObject* and avoid Cython auto-ref-counting.
    # The quoted string is the C name: PyBytes_FromStringAndSizeNative is the
    # Cython-side alias under which the C function PyBytes_FromStringAndSize
    # is declared here with a raw PyObject* return type.
    # `except NULL` declares that a NULL return signals a raised exception.
    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
        char *v, Py_ssize_t len) except NULL
    # Workaround https://github.com/cython/cython/issues/4707
    bytearray PyByteArray_FromStringAndSize(char *string, Py_ssize_t len)
def have_libhdfs():
    """
    Check whether the HDFS (HadoopFileSystem) library is set up correctly.

    Returns
    -------
    bool
        True when the libhdfs check succeeds, False when it raises any
        exception.
    """
    try:
        # The availability check runs with the GIL released
        with nogil:
            check_status(HaveLibHdfs())
    except Exception:
        return False
    else:
        return True
def io_thread_count():
    """
    Return the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, implicitly use this pool.
    The number of threads is set to a fixed value at startup and can be
    changed at runtime by calling :func:`set_io_thread_count()`.

    Returns
    -------
    int
        The current capacity of the I/O thread pool.

    See Also
    --------
    set_io_thread_count : Modify the size of this pool.
    cpu_count : The analogous function for the CPU thread pool.
    """
    capacity = GetIOThreadPoolCapacity()
    return capacity
def set_io_thread_count(int count):
    """
    Set the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, implicitly use this pool.

    Parameters
    ----------
    count : int
        The max number of threads that may be used for I/O.
        Must be positive.

    Raises
    ------
    ValueError
        If ``count`` is smaller than 1.

    See Also
    --------
    io_thread_count : Get the size of this pool.
    set_cpu_count : The analogous function for the CPU thread pool.
    """
    # Reject zero and negative sizes before touching the pool
    if count <= 0:
        raise ValueError("IO thread count must be strictly positive")
    check_status(SetIOThreadPoolCapacity(count))
cdef class NativeFile(_Weakrefable):
    """
    The base class for all Arrow streams.

    Streams are either readable, writable, or both.
    They optionally support seeking.

    While this class exposes methods to read or write data from Python, the
    primary intent of using an Arrow stream is to pass it to other Arrow
    facilities that will make use of it, such as Arrow IPC routines.

    Be aware that there are subtle differences with regular Python files,
    e.g. destroying a writable Arrow stream without closing it explicitly
    will not flush any pending data.
    """

    # Default chunk size for chunked reads.
    # Use a large enough value for networked filesystems.
    _default_chunk_size = 256 * 1024
def __cinit__(self):
    # A new object owns no file and has every capability flag switched off
    self.own_file = self._is_appending = False
    self.is_readable = self.is_writable = self.is_seekable = False
def __dealloc__(self):
    # Only close the stream on deallocation when this object owns it
    if not self.own_file:
        return
    self.close()
def __enter__(self):
    # Context-manager entry: the file object itself is the managed value
    return self
def __exit__(self, exc_type, exc_value, tb):
    # Context-manager exit: always close the file. The exception arguments
    # are not inspected and nothing is returned, so a pending exception is
    # not suppressed.
    self.close()
def __repr__(self):
    # Class name followed by the closed state and the capability flags
    name = f"pyarrow.{self.__class__.__name__}"
    pieces = (
        f"<{name} ",
        f"closed={self.closed} ",
        f"own_file={self.own_file} ",
        f"is_seekable={self.is_seekable} ",
        f"is_writable={self.is_writable} ",
        f"is_readable={self.is_readable}>",
    )
    return "".join(pieces)
@property
def mode(self):
    """
    The file mode, emulating the mode strings of built-in files.

    Instances of NativeFile may currently report:

    * rb: binary read
    * wb: binary write
    * rb+: binary read and write
    * ab: binary append

    Raises
    ------
    ValueError
        If the file is neither readable nor writable.
    """
    if self.is_readable:
        # Readable files are 'rb', or 'rb+' when they can also be written
        return 'rb+' if self.is_writable else 'rb'
    if self.is_writable:
        # Write-only files distinguish appending from plain writing
        return 'ab' if self._is_appending else 'wb'
    raise ValueError('File object is malformed, has no mode')
def readable(self):
    """
    Return whether the stream is flagged as readable.

    Raises
    ------
    ValueError
        If the file is closed.
    """
    self._assert_open()
    return self.is_readable
def writable(self):
    """
    Return whether the stream is flagged as writable.

    Raises
    ------
    ValueError
        If the file is closed.
    """
    self._assert_open()
    return self.is_writable
def seekable(self):
    """
    Return whether the stream is flagged as seekable.

    Raises
    ------
    ValueError
        If the file is closed.
    """
    self._assert_open()
    return self.is_seekable
def isatty(self):
    """
    Return False: an open NativeFile never reports being a terminal.

    Raises
    ------
    ValueError
        If the file is closed.
    """
    self._assert_open()
    return False
def fileno(self):
    """
    NOT IMPLEMENTED

    Raises
    ------
    io.UnsupportedOperation
        Always.
    """
    raise UnsupportedOperation()
@property
def closed(self):
    """
    Whether the underlying stream is closed.

    The input stream is consulted for readable files, otherwise the output
    stream for writable files.
    """
    if self.is_readable:
        return self.input_stream.get().closed()
    if self.is_writable:
        return self.output_stream.get().closed()
    # A file that is neither readable nor writable is reported as closed
    return True
def close(self):
    """
    Close the underlying stream; a no-op if the file is already closed.

    Readable files close their input stream, all other files close their
    output stream.
    """
    if self.closed:
        return
    with nogil:
        if not self.is_readable:
            check_status(self.output_stream.get().Close())
        else:
            check_status(self.input_stream.get().Close())
cdef set_random_access_file(self, shared_ptr[CRandomAccessFile] handle):
    # Install `handle` both as the input stream (through a pointer cast) and
    # as the random access file, and flag the file as seekable.
    self.input_stream = <shared_ptr[CInputStream]> handle
    self.random_access = handle
    self.is_seekable = True
cdef set_input_stream(self, shared_ptr[CInputStream] handle):
    # Install `handle` as the input stream, drop any random access handle
    # and flag the file as not seekable.
    self.input_stream = handle
    self.random_access.reset()
    self.is_seekable = False
cdef set_output_stream(self, shared_ptr[COutputStream] handle):
    # Install `handle` as the output stream; no flags are changed here
    self.output_stream = handle
cdef shared_ptr[CRandomAccessFile] get_random_access_file(self) except *:
    # Return the random access handle after checking that the file is open,
    # readable and seekable (in that order); a failed check raises.
    self._assert_readable()
    self._assert_seekable()
    return self.random_access
cdef shared_ptr[CInputStream] get_input_stream(self) except *:
    # Return the input stream handle after checking that the file is open
    # and readable; a failed check raises.
    self._assert_readable()
    return self.input_stream
cdef shared_ptr[COutputStream] get_output_stream(self) except *:
    # Return the output stream handle after checking that the file is open
    # and writable; a failed check raises.
    self._assert_writable()
    return self.output_stream
def _assert_open(self):
    # Guard shared by the I/O methods: a closed file rejects the operation
    if not self.closed:
        return
    raise ValueError("I/O operation on closed file")
def _assert_readable(self):
    """
    Check that the file is open and supports reading.

    Raises
    ------
    ValueError
        If the file is closed.
    io.UnsupportedOperation
        If the file is not readable. This exception subclasses both
        OSError (IOError) and ValueError, so handlers written for IOError
        keep working.
    """
    self._assert_open()
    if not self.is_readable:
        # Same exception type that fileno() raises and that Python's own
        # file objects use for unsupported operations
        raise UnsupportedOperation("only valid on readable files")
def _assert_writable(self):
    """
    Check that the file is open and supports writing.

    Raises
    ------
    ValueError
        If the file is closed.
    io.UnsupportedOperation
        If the file is not writable. This exception subclasses both
        OSError (IOError) and ValueError, so handlers written for IOError
        keep working.
    """
    self._assert_open()
    if not self.is_writable:
        # Same exception type that fileno() raises and that Python's own
        # file objects use for unsupported operations
        raise UnsupportedOperation("only valid on writable files")
def _assert_seekable(self):
    """
    Check that the file is open and supports seeking.

    Raises
    ------
    ValueError
        If the file is closed.
    io.UnsupportedOperation
        If the file is not seekable. This exception subclasses both
        OSError (IOError) and ValueError, so handlers written for IOError
        keep working.
    """
    self._assert_open()
    if not self.is_seekable:
        # Same exception type that fileno() raises and that Python's own
        # file objects use for unsupported operations
        raise UnsupportedOperation("only valid on seekable files")
def size(self):
    """
    Return file size

    Returns
    -------
    int
        The size reported by the random access file.

    Raises
    ------
    ValueError
        If the file is closed.
    IOError
        If the file is not readable or not seekable.
    """
    cdef int64_t nbytes
    ra_file = self.get_random_access_file()
    # Query the size with the GIL released
    with nogil:
        nbytes = GetResultValue(ra_file.get().GetSize())
    return nbytes
def metadata(self):
    """
    Return file metadata

    Returns
    -------
    dict
        One entry per key-value pair read from the input stream. Keys are
        passed through ``frombytes``; values are stored as returned by the
        metadata object. Empty when the stream yields no metadata.
    """
    cdef:
        shared_ptr[const CKeyValueMetadata] c_metadata

    stream = self.get_input_stream()
    with nogil:
        c_metadata = GetResultValue(stream.get().ReadMetadata())

    result = {}
    # A null pointer means there is nothing to copy
    if c_metadata.get() == nullptr:
        return result
    for idx in range(c_metadata.get().size()):
        result[frombytes(c_metadata.get().key(idx))] = (
            c_metadata.get().value(idx))
    return result
def tell(self):
    """
    Return current stream position

    Readable files are queried through their random access file, all other
    files through their output stream.
    """
    cdef int64_t pos

    if not self.is_readable:
        out_stream = self.get_output_stream()
        with nogil:
            pos = GetResultValue(out_stream.get().Tell())
    else:
        ra_file = self.get_random_access_file()
        with nogil:
            pos = GetResultValue(ra_file.get().Tell())

    return pos
def seek(self, int64_t position, int whence=0):
    """
    Change current file stream position

    Parameters
    ----------
    position : int
        Byte offset, interpreted relative to value of whence argument
    whence : int, default 0
        Point of reference for seek offset

    Notes
    -----
    Values of whence:
    * 0 -- start of stream (the default); offset should be zero or positive
    * 1 -- current stream position; offset may be negative
    * 2 -- end of stream; offset is usually negative

    Returns
    -------
    int
        The new absolute stream position.

    Raises
    ------
    ValueError
        If the file is closed or ``whence`` is not 0, 1 or 2.
    IOError
        If the file is not readable or not seekable.
    """
    cdef int64_t target
    ra_file = self.get_random_access_file()

    # Reject unknown whence values while the GIL is still held, after the
    # handle checks above so the order of raised errors is unchanged
    if whence < 0 or whence > 2:
        raise ValueError("Invalid value of whence: {0}"
                         .format(whence))

    with nogil:
        if whence == 0:
            target = position
        elif whence == 1:
            # Relative to the current position
            target = GetResultValue(ra_file.get().Tell())
            target = target + position
        else:
            # whence == 2: relative to the end of the file
            target = GetResultValue(ra_file.get().GetSize())
            target = target + position
        check_status(ra_file.get().Seek(target))

    return self.tell()
def flush(self):
Loading ...