Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev246 

/ src / arrow / python / io.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io.h"

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <new>
#include <string>
#include <utility>

#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

#include "arrow/python/common.h"
#include "arrow/python/pyarrow.h"

namespace arrow {

using arrow::io::TransformInputStream;

namespace py {

// ----------------------------------------------------------------------
// Python file

// A common interface to a Python file-like object. Must acquire GIL before
// calling any methods
// A common interface to a Python file-like object. The GIL must be held
// before calling any method.
//
// Close() and Abort() drop the reference to the wrapped object. Afterwards,
// every method that needs the object reports "operation on closed Python file"
// instead of dereferencing a null pointer. closed() and HasReadBuffer() answer
// without touching the object.
class PythonFile {
 public:
  // Takes a new reference to `file`; it is released when `file_` is reset or
  // destroyed.
  explicit PythonFile(PyObject* file) : file_(file) { Py_INCREF(file); }

  // Returns Invalid once the wrapped object has been released.
  Status CheckClosed() const {
    if (!file_) {
      return Status::Invalid("operation on closed Python file");
    }
    return Status::OK();
  }

  // Calls file.close() and drops our reference even if close() raised; a
  // raised exception is converted to a Status with code IOError. No-op when
  // the reference was already dropped.
  Status Close() {
    if (file_) {
      PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "close", "()");
      Py_XDECREF(result);
      file_.reset();
      PY_RETURN_IF_ERROR(StatusCode::IOError);
    }
    return Status::OK();
  }

  // Drops our reference without calling into the Python object.
  Status Abort() {
    file_.reset();
    return Status::OK();
  }

  // Returns the truthiness of `file.closed`, or true if the reference was
  // dropped. Errors cannot be propagated from here, so they are reported via
  // PyErr_WriteUnraisable and the file is treated as closed.
  bool closed() const {
    if (!file_) {
      return true;
    }
    PyObject* result = PyObject_GetAttrString(file_.obj(), "closed");
    if (result == NULL) {
      // Can't propagate the error, so write it out and return an arbitrary value
      PyErr_WriteUnraisable(NULL);
      return true;
    }
    int ret = PyObject_IsTrue(result);
    Py_XDECREF(result);
    if (ret < 0) {
      PyErr_WriteUnraisable(NULL);
      return true;
    }
    return ret != 0;
  }

  // Calls file.seek(position, whence).
  Status Seek(int64_t position, int whence) {
    RETURN_NOT_OK(CheckClosed());

    // NOTE: `long long` is at least 64 bits in the C standard, the cast below is
    // therefore safe.

    // whence: 0 for relative to start of file, 2 for end of file
    PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "seek", "(Li)",
                                               static_cast<long long>(position), whence);
    Py_XDECREF(result);
    PY_RETURN_IF_ERROR(StatusCode::IOError);
    return Status::OK();
  }

  // Calls file.read(nbytes); on success `*out` receives a new reference to the
  // returned object.
  Status Read(int64_t nbytes, PyObject** out) {
    RETURN_NOT_OK(CheckClosed());

    PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "read", "(L)",
                                               static_cast<long long>(nbytes));
    PY_RETURN_IF_ERROR(StatusCode::IOError);
    *out = result;
    return Status::OK();
  }

  // Calls file.read_buffer(nbytes); on success `*out` receives a new reference
  // to the returned object.
  Status ReadBuffer(int64_t nbytes, PyObject** out) {
    // Like Read(), refuse to call into a released (null) object.
    RETURN_NOT_OK(CheckClosed());

    PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "read_buffer", "(L)",
                                               static_cast<long long>(nbytes));
    PY_RETURN_IF_ERROR(StatusCode::IOError);
    *out = result;
    return Status::OK();
  }

  // Writes a copy of `nbytes` bytes at `data` via file.write().
  Status Write(const void* data, int64_t nbytes) {
    RETURN_NOT_OK(CheckClosed());

    // Since the data isn't owned, we have to make a copy
    PyObject* py_data =
        PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), nbytes);
    PY_RETURN_IF_ERROR(StatusCode::IOError);

    PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "write", "(O)", py_data);
    Py_XDECREF(py_data);
    Py_XDECREF(result);
    PY_RETURN_IF_ERROR(StatusCode::IOError);
    return Status::OK();
  }

  // Writes `buffer` via file.write(), wrapping it with wrap_buffer() rather
  // than copying into a bytes object.
  Status Write(const std::shared_ptr<Buffer>& buffer) {
    RETURN_NOT_OK(CheckClosed());

    PyObject* py_data = wrap_buffer(buffer);
    PY_RETURN_IF_ERROR(StatusCode::IOError);

    PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "write", "(O)", py_data);
    Py_XDECREF(py_data);
    Py_XDECREF(result);
    PY_RETURN_IF_ERROR(StatusCode::IOError);
    return Status::OK();
  }

  // Returns file.tell() as an int64_t.
  Result<int64_t> Tell() {
    RETURN_NOT_OK(CheckClosed());

    PyObject* result = cpp_PyObject_CallMethod(file_.obj(), "tell", "()");
    PY_RETURN_IF_ERROR(StatusCode::IOError);

    int64_t position = PyLong_AsLongLong(result);
    Py_DECREF(result);

    // PyLong_AsLongLong can raise OverflowError
    PY_RETURN_IF_ERROR(StatusCode::IOError);
    return position;
  }

  // Mutex used by callers that must keep a seek + read pair together.
  std::mutex& lock() { return lock_; }

  // Whether the wrapped object has a `read_buffer` attribute. The answer is
  // cached after the first successful probe. A released (closed) file reports
  // false without probing or caching, so callers fall back to Read(), which
  // then returns the closed-file error.
  bool HasReadBuffer() {
    if (!file_) {
      return false;
    }
    if (!checked_read_buffer_) {  // we don't want to check this each time
      has_read_buffer_ = PyObject_HasAttrString(file_.obj(), "read_buffer") == 1;
      checked_read_buffer_ = true;
    }
    return has_read_buffer_;
  }

 private:
  std::mutex lock_;
  OwnedRefNoGIL file_;
  bool has_read_buffer_ = false;
  bool checked_read_buffer_ = false;
};

// ----------------------------------------------------------------------
// Seekable input stream

// Wraps `file` in a PythonFile, which takes its own reference to the object.
PyReadableFile::PyReadableFile(PyObject* file) : file_(new PythonFile(file)) {}

// Destruction deliberately leaves the Python file object open: other Python
// code may still hold references to it, so closing is left to the object's
// own Python-side destructor.
PyReadableFile::~PyReadableFile() = default;

// Drops the reference to the Python file without calling its close().
Status PyReadableFile::Abort() {
  auto drop_reference = [this] { return file_->Abort(); };
  return SafeCallIntoPython(drop_reference);
}

// Calls close() on the Python file and drops the reference to it.
Status PyReadableFile::Close() {
  auto close_file = [this] { return file_->Close(); };
  return SafeCallIntoPython(close_file);
}

bool PyReadableFile::closed() const {
  bool res;
  Status st = SafeCallIntoPython([this, &res]() {
    res = file_->closed();
    return Status::OK();
  });
  return res;
}

// Moves to an absolute offset (whence 0 = start of file).
Status PyReadableFile::Seek(int64_t position) {
  constexpr int kFromStart = 0;
  return SafeCallIntoPython([this, position] { return file_->Seek(position, kFromStart); });
}

// Returns the Python file's current position as reported by tell().
Result<int64_t> PyReadableFile::Tell() const {
  auto query_position = [this]() -> Result<int64_t> { return file_->Tell(); };
  return SafeCallIntoPython(query_position);
}

// Reads up to `nbytes` bytes from the Python file into the caller-provided
// buffer `out` and returns the number of bytes copied.
//
// The result of read() may be any object exporting a contiguous buffer. Two
// cases are rejected:
//  - A result longer than `nbytes` is an error rather than being copied,
//    because `out` is only known to hold `nbytes` bytes.
//  - A result that does not support the buffer protocol raises TypeError. The
//    Python error set by the failed PyObject_GetBuffer is cleared so it does
//    not stay pending after this call.
Result<int64_t> PyReadableFile::Read(int64_t nbytes, void* out) {
  return SafeCallIntoPython([this, nbytes, out]() -> Result<int64_t> {
    OwnedRef bytes;
    RETURN_NOT_OK(file_->Read(nbytes, bytes.ref()));
    PyObject* bytes_obj = bytes.obj();
    DCHECK(bytes_obj != NULL);

    Py_buffer py_buf;
    if (PyObject_GetBuffer(bytes_obj, &py_buf, PyBUF_ANY_CONTIGUOUS) != 0) {
      PyErr_Clear();
      return Status::TypeError(
          "Python file read() should have returned a bytes object or an object "
          "supporting the buffer protocol, got '",
          Py_TYPE(bytes_obj)->tp_name, "' (did you open the file in binary mode?)");
    }

    const int64_t len = static_cast<int64_t>(py_buf.len);
    if (len > nbytes) {
      PyBuffer_Release(&py_buf);
      return Status::IOError("Python file read() returned ", len,
                             " bytes, more than the ", nbytes, " requested");
    }
    if (len > 0) {
      // Skip the copy for empty reads: `out` may legitimately be null then.
      std::memcpy(out, py_buf.buf, static_cast<size_t>(len));
    }
    PyBuffer_Release(&py_buf);
    return len;
  });
}

// Reads up to `nbytes` bytes and returns them as an Arrow Buffer.
//
// Prefers the file's read_buffer() method when it exists, otherwise read().
// The returned object is wrapped via PyBuffer::FromPyObject.
Result<std::shared_ptr<Buffer>> PyReadableFile::Read(int64_t nbytes) {
  return SafeCallIntoPython([this, nbytes]() -> Result<std::shared_ptr<Buffer>> {
    // Fail cleanly on a closed file before probing for read_buffer; the probe
    // would otherwise be performed on a released (null) object.
    RETURN_NOT_OK(file_->CheckClosed());

    OwnedRef buffer_obj;
    if (file_->HasReadBuffer()) {
      RETURN_NOT_OK(file_->ReadBuffer(nbytes, buffer_obj.ref()));
    } else {
      RETURN_NOT_OK(file_->Read(nbytes, buffer_obj.ref()));
    }
    DCHECK(buffer_obj.obj() != NULL);

    return PyBuffer::FromPyObject(buffer_obj.obj());
  });
}

// Positional read into `out`: seek to `position`, then read `nbytes`.
// The file's mutex is held across both steps, so concurrent ReadAt calls
// cannot interleave their seek and read.
Result<int64_t> PyReadableFile::ReadAt(int64_t position, int64_t nbytes, void* out) {
  std::lock_guard<std::mutex> seek_read_guard{file_->lock()};
  auto seek_then_read = [this, position, nbytes, out]() -> Result<int64_t> {
    RETURN_NOT_OK(Seek(position));
    return Read(nbytes, out);
  };
  return SafeCallIntoPython(seek_then_read);
}

// Positional read returning a Buffer: seek to `position`, then read `nbytes`.
// The file's mutex is held across both steps, so concurrent ReadAt calls
// cannot interleave their seek and read.
Result<std::shared_ptr<Buffer>> PyReadableFile::ReadAt(int64_t position, int64_t nbytes) {
  std::lock_guard<std::mutex> seek_read_guard{file_->lock()};
  auto seek_then_read = [this, position, nbytes]() -> Result<std::shared_ptr<Buffer>> {
    RETURN_NOT_OK(Seek(position));
    return Read(nbytes);
  };
  return SafeCallIntoPython(seek_then_read);
}

// Determines the file size by seeking to the end (whence 2) and calling
// tell() there. Afterwards it seeks back (whence 0) to the position that was
// current on entry.
Result<int64_t> PyReadableFile::GetSize() {
  return SafeCallIntoPython([this]() -> Result<int64_t> {
    ARROW_ASSIGN_OR_RAISE(const int64_t saved_position, file_->Tell());
    RETURN_NOT_OK(file_->Seek(0, 2));
    ARROW_ASSIGN_OR_RAISE(const int64_t end_position, file_->Tell());
    RETURN_NOT_OK(file_->Seek(saved_position, 0));
    return end_position;
  });
}

// ----------------------------------------------------------------------
// Output stream

// Wraps `file` in a PythonFile, which takes its own reference to the object.
// The logical write position starts at zero.
PyOutputStream::PyOutputStream(PyObject* file) : position_(0) {
  PythonFile* wrapper = new PythonFile(file);
  file_.reset(wrapper);
}

// Destruction deliberately leaves the Python file object open: other Python
// code may still hold references to it, so closing is left to the object's
// own Python-side destructor.
PyOutputStream::~PyOutputStream() = default;

// Drops the reference to the Python file without calling its close().
Status PyOutputStream::Abort() {
  auto drop_reference = [this]() { return file_->Abort(); };
  return SafeCallIntoPython(drop_reference);
}

// Calls close() on the Python file and drops the reference to it.
Status PyOutputStream::Close() {
  auto close_file = [this]() { return file_->Close(); };
  return SafeCallIntoPython(close_file);
}

bool PyOutputStream::closed() const {
  bool res;
  Status st = SafeCallIntoPython([this, &res]() {
    res = file_->closed();
    return Status::OK();
  });
  return res;
}

// Returns the byte count tracked locally by Write(); the Python file's own
// tell() is not consulted.
Result<int64_t> PyOutputStream::Tell() const {
  const int64_t bytes_written = position_;
  return bytes_written;
}

// Writes `nbytes` bytes from `data` to the Python file.
//
// The locally tracked position (returned by Tell()) advances only after the
// Python write succeeds. Previously it advanced before the call, so a failed
// write still moved Tell() forward.
Status PyOutputStream::Write(const void* data, int64_t nbytes) {
  return SafeCallIntoPython([this, data, nbytes]() -> Status {
    RETURN_NOT_OK(file_->Write(data, nbytes));
    position_ += nbytes;
    return Status::OK();
  });
}

// Writes the contents of `buffer` to the Python file.
//
// The locally tracked position (returned by Tell()) advances by
// buffer->size() only after the Python write succeeds. The lambda runs
// synchronously, so `buffer` is captured by reference instead of copying the
// shared_ptr.
Status PyOutputStream::Write(const std::shared_ptr<Buffer>& buffer) {
  return SafeCallIntoPython([this, &buffer]() -> Status {
    RETURN_NOT_OK(file_->Write(buffer));
    position_ += buffer->size();
    return Status::OK();
  });
}

// ----------------------------------------------------------------------
// Foreign buffer

// Creates a PyForeignBuffer viewing `size` bytes at `data`, constructed with
// `base`, and stores it in `*out`.
//
// Plain `new` throws std::bad_alloc instead of returning null, which made the
// OutOfMemory branch unreachable. The nothrow form lets an allocation failure
// be reported through the returned Status, as this function intends.
Status PyForeignBuffer::Make(const uint8_t* data, int64_t size, PyObject* base,
                             std::shared_ptr<Buffer>* out) {
  PyForeignBuffer* buf = new (std::nothrow) PyForeignBuffer(data, size, base);
  if (buf == nullptr) {
    return Status::OutOfMemory("could not allocate foreign buffer object");
  }
  *out = std::shared_ptr<Buffer>(buf);
  return Status::OK();
}

// ----------------------------------------------------------------------
// TransformInputStream::TransformFunc wrapper

// Binds a TransformCallback to a Python handler object so the pair can be used
// as a TransformInputStream::TransformFunc. Each call goes through
// SafeCallIntoPython. Any Python exception left set by the callback is turned
// into the returned error Status.
struct TransformFunctionWrapper {
  TransformFunctionWrapper(TransformCallback cb, PyObject* arg)
      : cb_(std::move(cb)), arg_(std::make_shared<OwnedRefNoGIL>(arg)) {
    // The OwnedRefNoGIL above owns one reference to `arg`; add that reference
    // here so the caller's own reference is left untouched.
    Py_INCREF(arg);
  }

  Result<std::shared_ptr<Buffer>> operator()(const std::shared_ptr<Buffer>& src) {
    auto run_callback = [this, &src]() -> Result<std::shared_ptr<Buffer>> {
      std::shared_ptr<Buffer> transformed;
      cb_(arg_->obj(), src, &transformed);
      RETURN_NOT_OK(CheckPyError());
      return transformed;
    };
    return SafeCallIntoPython(run_callback);
  }

 protected:
  // The handler reference sits behind a shared_ptr because std::function
  // requires the stored callable to be copy-constructible, and copies of this
  // wrapper must share a single owned reference.
  TransformCallback cb_;
  std::shared_ptr<OwnedRefNoGIL> arg_;
};

// Layers a TransformInputStream over `wrapped`; every chunk read from it is
// passed through the vtable's transform callback bound to `handler`.
std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
    std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable,
    PyObject* handler) {
  TransformInputStream::TransformFunc bound_transform =
      TransformFunctionWrapper(std::move(vtable.transform), handler);
  auto stream = std::make_shared<TransformInputStream>(std::move(wrapped),
                                                       std::move(bound_transform));
  return stream;
}

// Returns a reusable factory that wraps any InputStream in a
// TransformInputStream applying the vtable's transform bound to `handler`.
//
// Each created stream gets its own copy of `transform`. Those copies share the
// handler reference, because TransformFunctionWrapper holds it through a
// shared_ptr. The incoming stream pointer and the factory itself are moved
// rather than copied.
std::shared_ptr<StreamWrapFunc> MakeStreamTransformFunc(TransformInputStreamVTable vtable,
                                                        PyObject* handler) {
  TransformInputStream::TransformFunc transform(
      TransformFunctionWrapper{std::move(vtable.transform), handler});
  StreamWrapFunc func = [transform](std::shared_ptr<::arrow::io::InputStream> wrapped) {
    return std::make_shared<TransformInputStream>(std::move(wrapped), transform);
  };
  return std::make_shared<StreamWrapFunc>(std::move(func));
}

}  // namespace py
}  // namespace arrow