# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++
# cython: language_level = 3

from cython.operator cimport dereference as deref

from collections import namedtuple
from collections.abc import Mapping

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
                          RecordBatchReader, ensure_type,
                          maybe_unbox_memory_pool, get_input_stream,
                          get_writer, native_transcoding_input_stream,
                          pyarrow_unwrap_batch, pyarrow_unwrap_schema,
                          pyarrow_unwrap_table, pyarrow_wrap_schema,
                          pyarrow_wrap_table, pyarrow_wrap_data_type,
                          pyarrow_unwrap_data_type, Table, RecordBatch,
                          StopToken, _CRecordBatchWriter)
from pyarrow.lib import frombytes, tobytes, SignalStopHandler


cdef unsigned char _single_char(s) except 0:
    val = ord(s)
    if val == 0 or val > 127:
        raise ValueError("Expecting an ASCII character")
    return <unsigned char> val
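
# Example: _single_char(',') returns 44, while _single_char('\x00') and
# _single_char('é') raise ValueError; NUL is additionally reserved as the
# "except 0" error sentinel in the signature above, so it can never be a
# valid return value.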


_InvalidRow = namedtuple(
    "_InvalidRow", ("expected_columns", "actual_columns", "number", "text"),
    module=__name__)


class InvalidRow(_InvalidRow):
    """
    Description of an invalid row in a CSV file.

    Parameters
    ----------
    expected_columns : int
        The expected number of columns in the row.
    actual_columns : int
        The actual number of columns in the row.
    number : int or None
        The physical row number if known, otherwise None.
    text : str
        The contents of the row.
    """
    __slots__ = ()


cdef CInvalidRowResult _handle_invalid_row(
        handler, const CCSVInvalidRow& c_row) except CInvalidRowResult_Error:
    # A negative row number means undetermined (because of parallel reading)
    row_number = c_row.number if c_row.number >= 0 else None
    row = InvalidRow(c_row.expected_columns, c_row.actual_columns,
                     row_number, frombytes(<c_string> c_row.text))
    result = handler(row)
    if result == 'error':
        return CInvalidRowResult_Error
    elif result == 'skip':
        return CInvalidRowResult_Skip
    else:
        raise ValueError("Invalid return value for invalid row handler: "
                         f"expected 'error' or 'skip', got {result!r}")


cdef class ReadOptions(_Weakrefable):
    """
    Options for reading CSV files.

    Parameters
    ----------
    use_threads : bool, optional (default True)
        Whether to use multiple threads to accelerate reading.
    block_size : int, optional
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        Minimum valid value for block size is 1.
    skip_rows : int, optional (default 0)
        The number of rows to skip before the column names (if any)
        and the CSV data.
    skip_rows_after_names : int, optional (default 0)
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
    column_names : list, optional
        The column names of the target table.  If empty, fall back on
        `autogenerate_column_names`.
    autogenerate_column_names : bool, optional (default False)
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
    encoding : str, optional (default 'utf8')
        The character encoding of the CSV data.  Columns that cannot
        be decoded using this encoding can still be read as Binary.

    Examples
    --------

    Defining example data:

    >>> import io
    >>> s = "1,2,3\\nFlamingo,2,2022-03-01\\nHorse,4,2022-03-02\\nBrittle stars,5,2022-03-03\\nCentipede,100,2022-03-04"
    >>> print(s)
    1,2,3
    Flamingo,2,2022-03-01
    Horse,4,2022-03-02
    Brittle stars,5,2022-03-03
    Centipede,100,2022-03-04

    Ignore the first numbered row and substitute it with defined
    or autogenerated column names:

    >>> from pyarrow import csv
    >>> read_options = csv.ReadOptions(
    ...                column_names=["animals", "n_legs", "entry"],
    ...                skip_rows=1)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: date32[day]
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
    n_legs: [[2,4,5,100]]
    entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]

    >>> read_options = csv.ReadOptions(autogenerate_column_names=True,
    ...                                skip_rows=1)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    f0: string
    f1: int64
    f2: date32[day]
    ----
    f0: [["Flamingo","Horse","Brittle stars","Centipede"]]
    f1: [[2,4,5,100]]
    f2: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]

    Remove the first 2 rows of the data:

    >>> read_options = csv.ReadOptions(skip_rows_after_names=2)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    1: string
    2: int64
    3: date32[day]
    ----
    1: [["Brittle stars","Centipede"]]
    2: [[5,100]]
    3: [[2022-03-03,2022-03-04]]
    """

    # Avoid mistakenly creating attributes
    __slots__ = ()

    # __init__() is not called when unpickling; initialize storage here
    def __cinit__(self, *args, **kwargs):
        self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults()))

    def __init__(self, *, use_threads=None, block_size=None, skip_rows=None,
                 skip_rows_after_names=None, column_names=None,
                 autogenerate_column_names=None, encoding='utf8'):
        if use_threads is not None:
            self.use_threads = use_threads
        if block_size is not None:
            self.block_size = block_size
        if skip_rows is not None:
            self.skip_rows = skip_rows
        if skip_rows_after_names is not None:
            self.skip_rows_after_names = skip_rows_after_names
        if column_names is not None:
            self.column_names = column_names
        if autogenerate_column_names is not None:
            self.autogenerate_column_names = autogenerate_column_names
        # Python-specific option
        self.encoding = encoding

    @property
    def use_threads(self):
        """
        Whether to use multiple threads to accelerate reading.
        """
        return deref(self.options).use_threads

    @use_threads.setter
    def use_threads(self, value):
        deref(self.options).use_threads = value

    @property
    def block_size(self):
        """
        How many bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        """
        return deref(self.options).block_size

    @block_size.setter
    def block_size(self, value):
        deref(self.options).block_size = value

    @property
    def skip_rows(self):
        """
        The number of rows to skip before the column names (if any)
        and the CSV data.
        See `skip_rows_after_names` for how the two options interact.
        """
        return deref(self.options).skip_rows

    @skip_rows.setter
    def skip_rows(self, value):
        deref(self.options).skip_rows = value

    @property
    def skip_rows_after_names(self):
        """
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
        """
        return deref(self.options).skip_rows_after_names

    @skip_rows_after_names.setter
    def skip_rows_after_names(self, value):
        deref(self.options).skip_rows_after_names = value

    @property
    def column_names(self):
        """
        The column names of the target table.  If empty, fall back on
        `autogenerate_column_names`.
        """
        return [frombytes(s) for s in deref(self.options).column_names]

    @column_names.setter
    def column_names(self, value):
        deref(self.options).column_names.clear()
        for item in value:
            deref(self.options).column_names.push_back(tobytes(item))

    @property
    def autogenerate_column_names(self):
        """
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
        """
        return deref(self.options).autogenerate_column_names

    @autogenerate_column_names.setter
    def autogenerate_column_names(self, value):
        deref(self.options).autogenerate_column_names = value

    def validate(self):
        check_status(deref(self.options).Validate())

    def equals(self, ReadOptions other):
        """
        Parameters
        ----------
        other : pyarrow.csv.ReadOptions

        Returns
        -------
        bool
        """
        return (
            self.use_threads == other.use_threads and
            self.block_size == other.block_size and
            self.skip_rows == other.skip_rows and
            self.skip_rows_after_names == other.skip_rows_after_names and
            self.column_names == other.column_names and
            self.autogenerate_column_names ==
            other.autogenerate_column_names and
            self.encoding == other.encoding
        )

    @staticmethod
    cdef ReadOptions wrap(CCSVReadOptions options):
        out = ReadOptions()
        out.options.reset(new CCSVReadOptions(move(options)))
        out.encoding = 'utf8'  # No way to know this
        return out

    def __getstate__(self):
        return (self.use_threads, self.block_size, self.skip_rows,
                self.column_names, self.autogenerate_column_names,
                self.encoding, self.skip_rows_after_names)

    def __setstate__(self, state):
        (self.use_threads, self.block_size, self.skip_rows,
         self.column_names, self.autogenerate_column_names,
         self.encoding, self.skip_rows_after_names) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return False
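
# A quick illustrative sketch (not part of this module) of the behavior
# provided by the methods above: ReadOptions pickles via
# __getstate__/__setstate__ and compares by value via equals()/__eq__:
#
#     import pickle
#     opts = ReadOptions(skip_rows=1, block_size=1 << 20)
#     assert pickle.loads(pickle.dumps(opts)) == opts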


cdef class ParseOptions(_Weakrefable):
    """
    Options for parsing CSV files.

    Parameters
    ----------
    delimiter : 1-character string, optional (default ',')
        The character delimiting individual cells in the CSV data.
    quote_char : 1-character string or False, optional (default '"')
        The character used optionally for quoting CSV values