# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Cython wrappers for IO interfaces defined in arrow::io and messaging in
# arrow::ipc

from libc.stdlib cimport malloc, free

import codecs
import pickle
import re
import sys
import threading
import time
import warnings
from io import BufferedIOBase, IOBase, TextIOBase, UnsupportedOperation
from queue import Queue, Empty as QueueEmpty

from pyarrow.lib cimport check_status, HaveLibHdfs
from pyarrow.util import _is_path_like, _stringify_path


# 64K
DEFAULT_BUFFER_SIZE = 2 ** 16


cdef extern from "Python.h":
    # To let us get a PyObject* and avoid Cython auto-ref-counting
    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
        char *v, Py_ssize_t len) except NULL

    # Workaround https://github.com/cython/cython/issues/4707
    bytearray PyByteArray_FromStringAndSize(char *string, Py_ssize_t len)


def have_libhdfs():
    """
    Return True if the HDFS (HadoopFileSystem) library is set up correctly.
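
    Examples
    --------
    A minimal check; the result depends on whether libhdfs is installed
    and configured on the local machine (the import below assumes this
    module is exposed as ``pyarrow.lib``):

    >>> from pyarrow.lib import have_libhdfs  # doctest: +SKIP
    >>> hdfs_ok = have_libhdfs()              # doctest: +SKIP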
    """
    try:
        with nogil:
            check_status(HaveLibHdfs())
        return True
    except Exception:
        return False


def io_thread_count():
    """
    Return the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, will implicitly make
    use of this pool. The number of threads is set to a fixed value at
    startup. It can be modified at runtime by calling
    :func:`set_io_thread_count()`.

    See Also
    --------
    set_io_thread_count : Modify the size of this pool.
    cpu_count : The analogous function for the CPU thread pool.
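
    Examples
    --------
    A minimal sketch; the value returned depends on the startup default
    and on any prior call to :func:`set_io_thread_count`:

    >>> import pyarrow as pa
    >>> n_io_threads = pa.io_thread_count()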
    """
    return GetIOThreadPoolCapacity()


def set_io_thread_count(int count):
    """
    Set the number of threads to use for I/O operations.

    Many operations, such as scanning a dataset, will implicitly make
    use of this pool.

    Parameters
    ----------
    count : int
        The max number of threads that may be used for I/O.
        Must be positive.

    See Also
    --------
    io_thread_count : Get the size of this pool.
    set_cpu_count : The analogous function for the CPU thread pool.
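
    Examples
    --------
    A minimal sketch; 8 is an arbitrary illustrative value:

    >>> import pyarrow as pa
    >>> pa.set_io_thread_count(8)
    >>> pa.io_thread_count()
    8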
    """
    if count < 1:
        raise ValueError("IO thread count must be strictly positive")
    check_status(SetIOThreadPoolCapacity(count))


cdef class NativeFile(_Weakrefable):
    """
    The base class for all Arrow streams.

    Streams are either readable, writable, or both.
    They optionally support seeking.

    While this class exposes methods to read or write data from Python, the
    primary intent of using an Arrow stream is to pass it to other Arrow
    facilities that will make use of it, such as Arrow IPC routines.

    Be aware that there are subtle differences from regular Python files,
    e.g. destroying a writable Arrow stream without closing it explicitly
    will not flush any pending data.
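
    Examples
    --------
    A minimal sketch using an in-memory output stream
    (:class:`BufferOutputStream` is one writable ``NativeFile``
    implementation); ``getvalue`` finalizes the stream and returns the
    written bytes as a :class:`Buffer`:

    >>> import pyarrow as pa
    >>> stream = pa.BufferOutputStream()
    >>> stream.write(b'data')
    4
    >>> buf = stream.getvalue()
    >>> buf.to_pybytes()
    b'data'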
    """

    # Default chunk size for chunked reads.
    # Use a large enough value for networked filesystems.
    _default_chunk_size = 256 * 1024

    def __cinit__(self):
        self.own_file = False
        self.is_readable = False
        self.is_writable = False
        self.is_seekable = False
        self._is_appending = False

    def __dealloc__(self):
        if self.own_file:
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    def __repr__(self):
        name = f"pyarrow.{self.__class__.__name__}"
        return (f"<{name} "
                f"closed={self.closed} "
                f"own_file={self.own_file} "
                f"is_seekable={self.is_seekable} "
                f"is_writable={self.is_writable} "
                f"is_readable={self.is_readable}>")

    @property
    def mode(self):
        """
        The file mode. Currently instances of NativeFile may support:

        * rb: binary read
        * wb: binary write
        * rb+: binary read and write
        * ab: binary append
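
        For example, an in-memory reader such as :class:`BufferReader` is
        readable and seekable but not writable:

        >>> import pyarrow as pa
        >>> pa.BufferReader(b"data").mode
        'rb'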
        """
        # Emulate built-in file modes
        if self.is_readable and self.is_writable:
            return 'rb+'
        elif self.is_readable:
            return 'rb'
        elif self.is_writable and self._is_appending:
            return 'ab'
        elif self.is_writable:
            return 'wb'
        else:
            raise ValueError('File object is malformed, has no mode')

    def readable(self):
        self._assert_open()
        return self.is_readable

    def writable(self):
        self._assert_open()
        return self.is_writable

    def seekable(self):
        self._assert_open()
        return self.is_seekable

    def isatty(self):
        self._assert_open()
        return False

    def fileno(self):
        """
        NOT IMPLEMENTED
        """
        raise UnsupportedOperation()

    @property
    def closed(self):
        if self.is_readable:
            return self.input_stream.get().closed()
        elif self.is_writable:
            return self.output_stream.get().closed()
        else:
            return True

    def close(self):
        if not self.closed:
            with nogil:
                if self.is_readable:
                    check_status(self.input_stream.get().Close())
                else:
                    check_status(self.output_stream.get().Close())

    cdef set_random_access_file(self, shared_ptr[CRandomAccessFile] handle):
        self.input_stream = <shared_ptr[CInputStream]> handle
        self.random_access = handle
        self.is_seekable = True

    cdef set_input_stream(self, shared_ptr[CInputStream] handle):
        self.input_stream = handle
        self.random_access.reset()
        self.is_seekable = False

    cdef set_output_stream(self, shared_ptr[COutputStream] handle):
        self.output_stream = handle

    cdef shared_ptr[CRandomAccessFile] get_random_access_file(self) except *:
        self._assert_readable()
        self._assert_seekable()
        return self.random_access

    cdef shared_ptr[CInputStream] get_input_stream(self) except *:
        self._assert_readable()
        return self.input_stream

    cdef shared_ptr[COutputStream] get_output_stream(self) except *:
        self._assert_writable()
        return self.output_stream

    def _assert_open(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _assert_readable(self):
        self._assert_open()
        if not self.is_readable:
            # XXX UnsupportedOperation
            raise IOError("only valid on readable files")

    def _assert_writable(self):
        self._assert_open()
        if not self.is_writable:
            raise IOError("only valid on writable files")

    def _assert_seekable(self):
        self._assert_open()
        if not self.is_seekable:
            raise IOError("only valid on seekable files")

    def size(self):
        """
        Return the file size in bytes
        """
        cdef int64_t size

        handle = self.get_random_access_file()
        with nogil:
            size = GetResultValue(handle.get().GetSize())

        return size

    def metadata(self):
        """
        Return file metadata, if any, as a dict
        """
        cdef:
            shared_ptr[const CKeyValueMetadata] c_metadata

        handle = self.get_input_stream()
        with nogil:
            c_metadata = GetResultValue(handle.get().ReadMetadata())

        metadata = {}
        if c_metadata.get() != nullptr:
            for i in range(c_metadata.get().size()):
                metadata[frombytes(c_metadata.get().key(i))] = \
                    c_metadata.get().value(i)
        return metadata

    def tell(self):
        """
        Return current stream position
        """
        cdef int64_t position

        if self.is_readable:
            rd_handle = self.get_random_access_file()
            with nogil:
                position = GetResultValue(rd_handle.get().Tell())
        else:
            wr_handle = self.get_output_stream()
            with nogil:
                position = GetResultValue(wr_handle.get().Tell())

        return position

    def seek(self, int64_t position, int whence=0):
        """
        Change current file stream position

        Parameters
        ----------
        position : int
            Byte offset, interpreted relative to the value of the whence argument
        whence : int, default 0
            Point of reference for seek offset

        Notes
        -----
        Values of whence:

        * 0 -- start of stream (the default); offset should be zero or positive
        * 1 -- current stream position; offset may be negative
        * 2 -- end of stream; offset is usually negative

        Returns
        -------
        int
            The new absolute stream position.
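
        Examples
        --------
        A short sketch using an in-memory reader (:class:`BufferReader`
        is one seekable ``NativeFile``):

        >>> import pyarrow as pa
        >>> f = pa.BufferReader(b"somedata")
        >>> f.seek(4)
        4
        >>> f.read(4)
        b'data'
        >>> f.seek(-8, 2)
        0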
        """
        cdef int64_t offset
        handle = self.get_random_access_file()

        with nogil:
            if whence == 0:
                offset = position
            elif whence == 1:
                offset = GetResultValue(handle.get().Tell())
                offset = offset + position
            elif whence == 2:
                offset = GetResultValue(handle.get().GetSize())
                offset = offset + position
            else:
                with gil:
                    raise ValueError("Invalid value of whence: {0}"
                                     .format(whence))
            check_status(handle.get().Seek(offset))

        return self.tell()

    def flush(self):