Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pyarrow / _csv.pyx
Size: Mime:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++
# cython: language_level = 3

from cython.operator cimport dereference as deref

from collections import namedtuple
from collections.abc import Mapping

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
                          RecordBatchReader, ensure_type,
                          maybe_unbox_memory_pool, get_input_stream,
                          get_writer, native_transcoding_input_stream,
                          pyarrow_unwrap_batch, pyarrow_unwrap_schema,
                          pyarrow_unwrap_table, pyarrow_wrap_schema,
                          pyarrow_wrap_table, pyarrow_wrap_data_type,
                          pyarrow_unwrap_data_type, Table, RecordBatch,
                          StopToken, _CRecordBatchWriter)
from pyarrow.lib import frombytes, tobytes, SignalStopHandler


cdef unsigned char _single_char(s) except 0:
    val = ord(s)
    if val == 0 or val > 127:
        raise ValueError("Expecting an ASCII character")
    return <unsigned char> val


_InvalidRow = namedtuple(
    "_InvalidRow", ("expected_columns", "actual_columns", "number", "text"),
    module=__name__)


class InvalidRow(_InvalidRow):
    """
    Description of an invalid row in a CSV file.

    Parameters
    ----------
    expected_columns : int
        The expected number of columns in the row.
    actual_columns : int
        The actual number of columns in the row.
    number : int or None
        The physical row number if known, otherwise None.
    text : str
        The contents of the row.
    """
    __slots__ = ()


cdef CInvalidRowResult _handle_invalid_row(
        handler, const CCSVInvalidRow& c_row) except CInvalidRowResult_Error:
    # A negative row number means undetermined (because of parallel reading)
    row_number = c_row.number if c_row.number >= 0 else None
    row = InvalidRow(c_row.expected_columns, c_row.actual_columns,
                     row_number, frombytes(<c_string> c_row.text))
    result = handler(row)
    if result == 'error':
        return CInvalidRowResult_Error
    elif result == 'skip':
        return CInvalidRowResult_Skip
    else:
        raise ValueError("Invalid return value for invalid row handler: "
                         f"expected 'error' or 'skip', got {result!r}")


cdef class ReadOptions(_Weakrefable):
    """
    Options for reading CSV files.

    Parameters
    ----------
    use_threads : bool, optional (default True)
        Whether to use multiple threads to accelerate reading
    block_size : int, optional
        How much bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        Minimum valid value for block size is 1
    skip_rows : int, optional (default 0)
        The number of rows to skip before the column names (if any)
        and the CSV data.
    skip_rows_after_names : int, optional (default 0)
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
    column_names : list, optional
        The column names of the target table.  If empty, fall back on
        `autogenerate_column_names`.
    autogenerate_column_names : bool, optional (default False)
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
    encoding : str, optional (default 'utf8')
        The character encoding of the CSV data.  Columns that cannot
        decode using this encoding can still be read as Binary.

    Examples
    --------

    Defining an example data:

    >>> import io
    >>> s = "1,2,3\\nFlamingo,2,2022-03-01\\nHorse,4,2022-03-02\\nBrittle stars,5,2022-03-03\\nCentipede,100,2022-03-04"
    >>> print(s)
    1,2,3
    Flamingo,2,2022-03-01
    Horse,4,2022-03-02
    Brittle stars,5,2022-03-03
    Centipede,100,2022-03-04

    Ignore the first numbered row and substitute it with defined
    or autogenerated column names:

    >>> from pyarrow import csv
    >>> read_options = csv.ReadOptions(
    ...                column_names=["animals", "n_legs", "entry"],
    ...                skip_rows=1)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: date32[day]
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
    n_legs: [[2,4,5,100]]
    entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]

    >>> read_options = csv.ReadOptions(autogenerate_column_names=True,
    ...                                skip_rows=1)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    f0: string
    f1: int64
    f2: date32[day]
    ----
    f0: [["Flamingo","Horse","Brittle stars","Centipede"]]
    f1: [[2,4,5,100]]
    f2: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]

    Remove the first 2 rows of the data:

    >>> read_options = csv.ReadOptions(skip_rows_after_names=2)
    >>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
    pyarrow.Table
    1: string
    2: int64
    3: date32[day]
    ----
    1: [["Brittle stars","Centipede"]]
    2: [[5,100]]
    3: [[2022-03-03,2022-03-04]]
    """

    # Avoid mistakingly creating attributes
    __slots__ = ()

    # __init__() is not called when unpickling, initialize storage here
    def __cinit__(self, *argw, **kwargs):
        self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults()))

    def __init__(self, *, use_threads=None, block_size=None, skip_rows=None,
                 skip_rows_after_names=None, column_names=None,
                 autogenerate_column_names=None, encoding='utf8'):
        if use_threads is not None:
            self.use_threads = use_threads
        if block_size is not None:
            self.block_size = block_size
        if skip_rows is not None:
            self.skip_rows = skip_rows
        if skip_rows_after_names is not None:
            self.skip_rows_after_names = skip_rows_after_names
        if column_names is not None:
            self.column_names = column_names
        if autogenerate_column_names is not None:
            self.autogenerate_column_names= autogenerate_column_names
        # Python-specific option
        self.encoding = encoding

    @property
    def use_threads(self):
        """
        Whether to use multiple threads to accelerate reading.
        """
        return deref(self.options).use_threads

    @use_threads.setter
    def use_threads(self, value):
        deref(self.options).use_threads = value

    @property
    def block_size(self):
        """
        How much bytes to process at a time from the input stream.
        This will determine multi-threading granularity as well as
        the size of individual record batches or table chunks.
        """
        return deref(self.options).block_size

    @block_size.setter
    def block_size(self, value):
        deref(self.options).block_size = value

    @property
    def skip_rows(self):
        """
        The number of rows to skip before the column names (if any)
        and the CSV data.
        See `skip_rows_after_names` for interaction description
        """
        return deref(self.options).skip_rows

    @skip_rows.setter
    def skip_rows(self, value):
        deref(self.options).skip_rows = value

    @property
    def skip_rows_after_names(self):
        """
        The number of rows to skip after the column names.
        This number can be larger than the number of rows in one
        block, and empty rows are counted.
        The order of application is as follows:
        - `skip_rows` is applied (if non-zero);
        - column names are read (unless `column_names` is set);
        - `skip_rows_after_names` is applied (if non-zero).
        """
        return deref(self.options).skip_rows_after_names

    @skip_rows_after_names.setter
    def skip_rows_after_names(self, value):
        deref(self.options).skip_rows_after_names = value

    @property
    def column_names(self):
        """
        The column names of the target table.  If empty, fall back on
        `autogenerate_column_names`.
        """
        return [frombytes(s) for s in deref(self.options).column_names]

    @column_names.setter
    def column_names(self, value):
        deref(self.options).column_names.clear()
        for item in value:
            deref(self.options).column_names.push_back(tobytes(item))

    @property
    def autogenerate_column_names(self):
        """
        Whether to autogenerate column names if `column_names` is empty.
        If true, column names will be of the form "f0", "f1"...
        If false, column names will be read from the first CSV row
        after `skip_rows`.
        """
        return deref(self.options).autogenerate_column_names

    @autogenerate_column_names.setter
    def autogenerate_column_names(self, value):
        deref(self.options).autogenerate_column_names = value

    def validate(self):
        check_status(deref(self.options).Validate())

    def equals(self, ReadOptions other):
        """
        Parameters
        ----------
        other : pyarrow.csv.ReadOptions

        Returns
        -------
        bool
        """
        return (
            self.use_threads == other.use_threads and
            self.block_size == other.block_size and
            self.skip_rows == other.skip_rows and
            self.skip_rows_after_names == other.skip_rows_after_names and
            self.column_names == other.column_names and
            self.autogenerate_column_names ==
            other.autogenerate_column_names and
            self.encoding == other.encoding
        )

    @staticmethod
    cdef ReadOptions wrap(CCSVReadOptions options):
        out = ReadOptions()
        out.options.reset(new CCSVReadOptions(move(options)))
        out.encoding = 'utf8'  # No way to know this
        return out

    def __getstate__(self):
        return (self.use_threads, self.block_size, self.skip_rows,
                self.column_names, self.autogenerate_column_names,
                self.encoding, self.skip_rows_after_names)

    def __setstate__(self, state):
        (self.use_threads, self.block_size, self.skip_rows,
         self.column_names, self.autogenerate_column_names,
         self.encoding, self.skip_rows_after_names) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return False


cdef class ParseOptions(_Weakrefable):
    """
    Options for parsing CSV files.

    Parameters
    ----------
    delimiter : 1-character string, optional (default ',')
        The character delimiting individual cells in the CSV data.
    quote_char : 1-character string or False, optional (default '"')
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
    double_quote : bool, optional (default True)
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
    escape_char : 1-character string or False, optional (default False)
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
    newlines_in_values : bool, optional (default False)
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
    ignore_empty_lines : bool, optional (default True)
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
    invalid_row_handler : callable, optional (default None)
        If not None, this object is called for each CSV row that fails
        parsing (because of a mismatching number of columns).
        It should accept a single InvalidRow argument and return either
        "skip" or "error" depending on the desired outcome.

    Examples
    --------

    Defining an example file from bytes object:

    >>> import io
    >>> s = (
    ...     "animals;n_legs;entry\\n"
    ...     "Flamingo;2;2022-03-01\\n"
    ...     "# Comment here:\\n"
    ...     "Horse;4;2022-03-02\\n"
    ...     "Brittle stars;5;2022-03-03\\n"
    ...     "Centipede;100;2022-03-04"
    ... )
    >>> print(s)
    animals;n_legs;entry
    Flamingo;2;2022-03-01
    # Comment here:
    Horse;4;2022-03-02
    Brittle stars;5;2022-03-03
    Centipede;100;2022-03-04
    >>> source = io.BytesIO(s.encode())

    Read the data from a file skipping rows with comments
    and defining the delimiter:

    >>> from pyarrow import csv
    >>> def skip_comment(row):
    ...     if row.text.startswith("# "):
    ...         return 'skip'
    ...     else:
    ...         return 'error'
    ...
    >>> parse_options = csv.ParseOptions(delimiter=";", invalid_row_handler=skip_comment)
    >>> csv.read_csv(source, parse_options=parse_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: date32[day]
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
    n_legs: [[2,4,5,100]]
    entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]
    """
    __slots__ = ()

    def __cinit__(self, *argw, **kwargs):
        self._invalid_row_handler = None
        self.options.reset(new CCSVParseOptions(CCSVParseOptions.Defaults()))

    def __init__(self, *, delimiter=None, quote_char=None, double_quote=None,
                 escape_char=None, newlines_in_values=None,
                 ignore_empty_lines=None, invalid_row_handler=None):
        if delimiter is not None:
            self.delimiter = delimiter
        if quote_char is not None:
            self.quote_char = quote_char
        if double_quote is not None:
            self.double_quote = double_quote
        if escape_char is not None:
            self.escape_char = escape_char
        if newlines_in_values is not None:
            self.newlines_in_values = newlines_in_values
        if ignore_empty_lines is not None:
            self.ignore_empty_lines = ignore_empty_lines
        if invalid_row_handler is not None:
            self.invalid_row_handler = invalid_row_handler

    @property
    def delimiter(self):
        """
        The character delimiting individual cells in the CSV data.
        """
        return chr(deref(self.options).delimiter)

    @delimiter.setter
    def delimiter(self, value):
        deref(self.options).delimiter = _single_char(value)

    @property
    def quote_char(self):
        """
        The character used optionally for quoting CSV values
        (False if quoting is not allowed).
        """
        if deref(self.options).quoting:
            return chr(deref(self.options).quote_char)
        else:
            return False

    @quote_char.setter
    def quote_char(self, value):
        if value is False:
            deref(self.options).quoting = False
        else:
            deref(self.options).quote_char = _single_char(value)
            deref(self.options).quoting = True

    @property
    def double_quote(self):
        """
        Whether two quotes in a quoted CSV value denote a single quote
        in the data.
        """
        return deref(self.options).double_quote

    @double_quote.setter
    def double_quote(self, value):
        deref(self.options).double_quote = value

    @property
    def escape_char(self):
        """
        The character used optionally for escaping special characters
        (False if escaping is not allowed).
        """
        if deref(self.options).escaping:
            return chr(deref(self.options).escape_char)
        else:
            return False

    @escape_char.setter
    def escape_char(self, value):
        if value is False:
            deref(self.options).escaping = False
        else:
            deref(self.options).escape_char = _single_char(value)
            deref(self.options).escaping = True

    @property
    def newlines_in_values(self):
        """
        Whether newline characters are allowed in CSV values.
        Setting this to True reduces the performance of multi-threaded
        CSV reading.
        """
        return deref(self.options).newlines_in_values

    @newlines_in_values.setter
    def newlines_in_values(self, value):
        deref(self.options).newlines_in_values = value

    @property
    def ignore_empty_lines(self):
        """
        Whether empty lines are ignored in CSV input.
        If False, an empty line is interpreted as containing a single empty
        value (assuming a one-column CSV file).
        """
        return deref(self.options).ignore_empty_lines

    @property
    def invalid_row_handler(self):
        """
        Optional handler for invalid rows.

        If not None, this object is called for each CSV row that fails
        parsing (because of a mismatching number of columns).
        It should accept a single InvalidRow argument and return either
        "skip" or "error" depending on the desired outcome.
        """
        return self._invalid_row_handler

    @invalid_row_handler.setter
    def invalid_row_handler(self, value):
        if value is not None and not callable(value):
            raise TypeError("Expected callable or None, "
                            f"got instance of {type(value)!r}")
        self._invalid_row_handler = value
        deref(self.options).invalid_row_handler = MakeInvalidRowHandler(
            <function[PyInvalidRowCallback]> &_handle_invalid_row, value)

    @ignore_empty_lines.setter
    def ignore_empty_lines(self, value):
        deref(self.options).ignore_empty_lines = value

    def validate(self):
        check_status(deref(self.options).Validate())

    def equals(self, ParseOptions other):
        """
        Parameters
        ----------
        other : pyarrow.csv.ParseOptions

        Returns
        -------
        bool
        """
        return (
            self.delimiter == other.delimiter and
            self.quote_char == other.quote_char and
            self.double_quote == other.double_quote and
            self.escape_char == other.escape_char and
            self.newlines_in_values == other.newlines_in_values and
            self.ignore_empty_lines == other.ignore_empty_lines and
            self._invalid_row_handler == other._invalid_row_handler
        )

    @staticmethod
    cdef ParseOptions wrap(CCSVParseOptions options):
        out = ParseOptions()
        out.options.reset(new CCSVParseOptions(move(options)))
        return out

    def __getstate__(self):
        return (self.delimiter, self.quote_char, self.double_quote,
                self.escape_char, self.newlines_in_values,
                self.ignore_empty_lines, self.invalid_row_handler)

    def __setstate__(self, state):
        (self.delimiter, self.quote_char, self.double_quote,
         self.escape_char, self.newlines_in_values,
         self.ignore_empty_lines, self.invalid_row_handler) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return False


cdef class _ISO8601(_Weakrefable):
    """
    A special object indicating ISO-8601 parsing.
    """
    __slots__ = ()

    def __str__(self):
        return 'ISO8601'

    def __eq__(self, other):
        return isinstance(other, _ISO8601)


ISO8601 = _ISO8601()


cdef class ConvertOptions(_Weakrefable):
    """
    Options for converting CSV data.

    Parameters
    ----------
    check_utf8 : bool, optional (default True)
        Whether to check UTF8 validity of string columns.
    column_types : pyarrow.Schema or dict, optional
        Explicitly map column names to column types. Passing this argument
        disables type inference on the defined columns.
    null_values : list, optional
        A sequence of strings that denote nulls in the data
        (defaults are appropriate in most cases). Note that by default,
        string columns are not checked for null values. To enable
        null checking for those, specify ``strings_can_be_null=True``.
    true_values : list, optional
        A sequence of strings that denote true booleans in the data
        (defaults are appropriate in most cases).
    false_values : list, optional
        A sequence of strings that denote false booleans in the data
        (defaults are appropriate in most cases).
    decimal_point : 1-character string, optional (default '.')
        The character used as decimal point in floating-point and decimal
        data.
    strings_can_be_null : bool, optional (default False)
        Whether string / binary columns can have null values.
        If true, then strings in null_values are considered null for
        string columns.
        If false, then all strings are valid string values.
    quoted_strings_can_be_null : bool, optional (default True)
        Whether quoted values can be null.
        If true, then strings in "null_values" are also considered null
        when they appear quoted in the CSV file. Otherwise, quoted values
        are never considered null.
    include_columns : list, optional
        The names of columns to include in the Table.
        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
    include_missing_columns : bool, optional (default False)
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a column of nulls (whose type is selected using
        `column_types`, or null by default).
        This option is ignored if `include_columns` is empty.
    auto_dict_encode : bool, optional (default False)
        Whether to try to automatically dict-encode string / binary data.
        If true, then when type inference detects a string or binary column,
        it it dict-encoded up to `auto_dict_max_cardinality` distinct values
        (per chunk), after which it switches to regular encoding.
        This setting is ignored for non-inferred columns (those in
        `column_types`).
    auto_dict_max_cardinality : int, optional
        The maximum dictionary cardinality for `auto_dict_encode`.
        This value is per chunk.
    timestamp_parsers : list, optional
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601() can also be given).  By default, a fast built-in
        ISO-8601 parser is used.

    Examples
    --------

    Defining an example data:

    >>> import io
    >>> s = (
    ...     "animals,n_legs,entry,fast\\n"
    ...     "Flamingo,2,01/03/2022,Yes\\n"
    ...     "Horse,4,02/03/2022,Yes\\n"
    ...     "Brittle stars,5,03/03/2022,No\\n"
    ...     "Centipede,100,04/03/2022,No\\n"
    ...     ",6,05/03/2022,"
    ... )
    >>> print(s)
    animals,n_legs,entry,fast
    Flamingo,2,01/03/2022,Yes
    Horse,4,02/03/2022,Yes
    Brittle stars,5,03/03/2022,No
    Centipede,100,04/03/2022,No
    ,6,05/03/2022,

    Change the type of a column:

    >>> import pyarrow as pa
    >>> from pyarrow import csv
    >>> convert_options = csv.ConvertOptions(column_types={"n_legs": pa.float64()})
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: double
    entry: string
    fast: string
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]
    entry: [["01/03/2022","02/03/2022","03/03/2022","04/03/2022","05/03/2022"]]
    fast: [["Yes","Yes","No","No",""]]

    Define a date parsing format to get a timestamp type column
    (in case dates are not in ISO format and not converted by default):

    >>> convert_options = csv.ConvertOptions(
    ...                   timestamp_parsers=["%m/%d/%Y", "%m-%d-%Y"])
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: timestamp[s]
    fast: string
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]
    entry: [[2022-01-03 00:00:00,2022-02-03 00:00:00,2022-03-03 00:00:00,2022-04-03 00:00:00,2022-05-03 00:00:00]]
    fast: [["Yes","Yes","No","No",""]]

    Specify a subset of columns to be read:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["animals", "n_legs"])
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]

    List additional column to be included as a null typed column:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["animals", "n_legs", "location"],
    ...                   include_missing_columns=True)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    location: null
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]
    n_legs: [[2,4,5,100,6]]
    location: [5 nulls]

    Define columns as dictionary type (by default only the
    string/binary columns are dictionary encoded):

    >>> convert_options = csv.ConvertOptions(
    ...                   timestamp_parsers=["%m/%d/%Y", "%m-%d-%Y"],
    ...                   auto_dict_encode=True)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: dictionary<values=string, indices=int32, ordered=0>
    n_legs: int64
    entry: timestamp[s]
    fast: dictionary<values=string, indices=int32, ordered=0>
    ----
    animals: [  -- dictionary:
    ["Flamingo","Horse","Brittle stars","Centipede",""]  -- indices:
    [0,1,2,3,4]]
    n_legs: [[2,4,5,100,6]]
    entry: [[2022-01-03 00:00:00,2022-02-03 00:00:00,2022-03-03 00:00:00,2022-04-03 00:00:00,2022-05-03 00:00:00]]
    fast: [  -- dictionary:
    ["Yes","No",""]  -- indices:
    [0,0,1,1,2]]

    Set upper limit for the number of categories. If the categories
    is more than the limit, the conversion to dictionary will not
    happen:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["animals"],
    ...                   auto_dict_encode=True,
    ...                   auto_dict_max_cardinality=2)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",""]]

    Set empty strings to missing values:

    >>> convert_options = csv.ConvertOptions(include_columns=["animals", "n_legs"],
    ...                   strings_can_be_null=True)
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    animals: string
    n_legs: int64
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede",null]]
    n_legs: [[2,4,5,100,6]]

    Define values to be True and False when converting a column
    into a bool type:

    >>> convert_options = csv.ConvertOptions(
    ...                   include_columns=["fast"],
    ...                   false_values=["No"],
    ...                   true_values=["Yes"])
    >>> csv.read_csv(io.BytesIO(s.encode()), convert_options=convert_options)
    pyarrow.Table
    fast: bool
    ----
    fast: [[true,true,false,false,null]]
    """

    # Avoid mistakingly creating attributes
    __slots__ = ()

    def __cinit__(self, *argw, **kwargs):
        self.options.reset(
            new CCSVConvertOptions(CCSVConvertOptions.Defaults()))

    def __init__(self, *, check_utf8=None, column_types=None, null_values=None,
                 true_values=None, false_values=None, decimal_point=None,
                 strings_can_be_null=None, quoted_strings_can_be_null=None,
                 include_columns=None, include_missing_columns=None,
                 auto_dict_encode=None, auto_dict_max_cardinality=None,
                 timestamp_parsers=None):
        if check_utf8 is not None:
            self.check_utf8 = check_utf8
        if column_types is not None:
            self.column_types = column_types
        if null_values is not None:
            self.null_values = null_values
        if true_values is not None:
            self.true_values = true_values
        if false_values is not None:
            self.false_values = false_values
        if decimal_point is not None:
            self.decimal_point = decimal_point
        if strings_can_be_null is not None:
            self.strings_can_be_null = strings_can_be_null
        if quoted_strings_can_be_null is not None:
            self.quoted_strings_can_be_null = quoted_strings_can_be_null
        if include_columns is not None:
            self.include_columns = include_columns
        if include_missing_columns is not None:
            self.include_missing_columns = include_missing_columns
        if auto_dict_encode is not None:
            self.auto_dict_encode = auto_dict_encode
        if auto_dict_max_cardinality is not None:
            self.auto_dict_max_cardinality = auto_dict_max_cardinality
        if timestamp_parsers is not None:
            self.timestamp_parsers = timestamp_parsers

    @property
    def check_utf8(self):
        """
        Whether to check UTF8 validity of string columns.
        """
        return deref(self.options).check_utf8

    @check_utf8.setter
    def check_utf8(self, value):
        deref(self.options).check_utf8 = value

    @property
    def strings_can_be_null(self):
        """
        Whether string / binary columns can have null values.
        """
        return deref(self.options).strings_can_be_null

    @strings_can_be_null.setter
    def strings_can_be_null(self, value):
        deref(self.options).strings_can_be_null = value

    @property
    def quoted_strings_can_be_null(self):
        """
        Whether quoted values can be null.
        """
        return deref(self.options).quoted_strings_can_be_null

    @quoted_strings_can_be_null.setter
    def quoted_strings_can_be_null(self, value):
        deref(self.options).quoted_strings_can_be_null = value

    @property
    def column_types(self):
        """
        Explicitly map column names to column types.
        """
        d = {frombytes(item.first): pyarrow_wrap_data_type(item.second)
             for item in deref(self.options).column_types}
        return d

    @column_types.setter
    def column_types(self, value):
        cdef:
            shared_ptr[CDataType] typ

        if isinstance(value, Mapping):
            value = value.items()

        deref(self.options).column_types.clear()
        for item in value:
            if isinstance(item, Field):
                k = item.name
                v = item.type
            else:
                k, v = item
            typ = pyarrow_unwrap_data_type(ensure_type(v))
            assert typ != NULL
            deref(self.options).column_types[tobytes(k)] = typ

    @property
    def null_values(self):
        """
        A sequence of strings that denote nulls in the data.
        """
        return [frombytes(x) for x in deref(self.options).null_values]

    @null_values.setter
    def null_values(self, value):
        deref(self.options).null_values = [tobytes(x) for x in value]

    @property
    def true_values(self):
        """
        A sequence of strings that denote true booleans in the data.
        """
        return [frombytes(x) for x in deref(self.options).true_values]

    @true_values.setter
    def true_values(self, value):
        deref(self.options).true_values = [tobytes(x) for x in value]

    @property
    def false_values(self):
        """
        A sequence of strings that denote false booleans in the data.
        """
        return [frombytes(x) for x in deref(self.options).false_values]

    @false_values.setter
    def false_values(self, value):
        deref(self.options).false_values = [tobytes(x) for x in value]

    @property
    def decimal_point(self):
        """
        The character used as decimal point in floating-point and decimal
        data.
        """
        return chr(deref(self.options).decimal_point)

    @decimal_point.setter
    def decimal_point(self, value):
        deref(self.options).decimal_point = _single_char(value)

    @property
    def auto_dict_encode(self):
        """
        Whether to try to automatically dict-encode string / binary data.
        """
        return deref(self.options).auto_dict_encode

    @auto_dict_encode.setter
    def auto_dict_encode(self, value):
        deref(self.options).auto_dict_encode = value

    @property
    def auto_dict_max_cardinality(self):
        """
        The maximum dictionary cardinality for `auto_dict_encode`.

        This value is per chunk.
        """
        return deref(self.options).auto_dict_max_cardinality

    @auto_dict_max_cardinality.setter
    def auto_dict_max_cardinality(self, value):
        deref(self.options).auto_dict_max_cardinality = value

    @property
    def include_columns(self):
        """
        The names of columns to include in the Table.

        If empty, the Table will include all columns from the CSV file.
        If not empty, only these columns will be included, in this order.
        """
        return [frombytes(s) for s in deref(self.options).include_columns]

    @include_columns.setter
    def include_columns(self, value):
        deref(self.options).include_columns.clear()
        for item in value:
            deref(self.options).include_columns.push_back(tobytes(item))

    @property
    def include_missing_columns(self):
        """
        If false, columns in `include_columns` but not in the CSV file will
        error out.
        If true, columns in `include_columns` but not in the CSV file will
        produce a null column (whose type is selected using `column_types`,
        or null by default).
        This option is ignored if `include_columns` is empty.
        """
        return deref(self.options).include_missing_columns

    @include_missing_columns.setter
    def include_missing_columns(self, value):
        deref(self.options).include_missing_columns = value

    @property
    def timestamp_parsers(self):
        """
        A sequence of strptime()-compatible format strings, tried in order
        when attempting to infer or convert timestamp values (the special
        value ISO8601() can also be given).  By default, a fast built-in
        ISO-8601 parser is used.
        """
        cdef:
            shared_ptr[CTimestampParser] c_parser
            c_string kind

        parsers = []
        for c_parser in deref(self.options).timestamp_parsers:
            kind = deref(c_parser).kind()
            if kind == b'strptime':
                parsers.append(frombytes(deref(c_parser).format()))
            else:
                assert kind == b'iso8601'
                parsers.append(ISO8601)

        return parsers

    @timestamp_parsers.setter
    def timestamp_parsers(self, value):
        cdef:
            vector[shared_ptr[CTimestampParser]] c_parsers

        for v in value:
            if isinstance(v, str):
                c_parsers.push_back(CTimestampParser.MakeStrptime(tobytes(v)))
            elif v == ISO8601:
                c_parsers.push_back(CTimestampParser.MakeISO8601())
            else:
                raise TypeError("Expected list of str or ISO8601 objects")

        deref(self.options).timestamp_parsers = move(c_parsers)

    @staticmethod
    cdef ConvertOptions wrap(CCSVConvertOptions options):
        out = ConvertOptions()
        out.options.reset(new CCSVConvertOptions(move(options)))
        return out

    def validate(self):
        check_status(deref(self.options).Validate())

    def equals(self, ConvertOptions other):
        """
        Parameters
        ----------
        other : pyarrow.csv.ConvertOptions

        Returns
        -------
        bool
        """
        return (
            self.check_utf8 == other.check_utf8 and
            self.column_types == other.column_types and
            self.null_values == other.null_values and
            self.true_values == other.true_values and
            self.false_values == other.false_values and
            self.decimal_point == other.decimal_point and
            self.timestamp_parsers == other.timestamp_parsers and
            self.strings_can_be_null == other.strings_can_be_null and
            self.quoted_strings_can_be_null ==
            other.quoted_strings_can_be_null and
            self.auto_dict_encode == other.auto_dict_encode and
            self.auto_dict_max_cardinality ==
            other.auto_dict_max_cardinality and
            self.include_columns == other.include_columns and
            self.include_missing_columns == other.include_missing_columns
        )

    def __getstate__(self):
        return (self.check_utf8, self.column_types, self.null_values,
                self.true_values, self.false_values, self.decimal_point,
                self.timestamp_parsers, self.strings_can_be_null,
                self.quoted_strings_can_be_null, self.auto_dict_encode,
                self.auto_dict_max_cardinality, self.include_columns,
                self.include_missing_columns)

    def __setstate__(self, state):
        (self.check_utf8, self.column_types, self.null_values,
         self.true_values, self.false_values, self.decimal_point,
         self.timestamp_parsers, self.strings_can_be_null,
         self.quoted_strings_can_be_null, self.auto_dict_encode,
         self.auto_dict_max_cardinality, self.include_columns,
         self.include_missing_columns) = state

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return False


cdef _get_reader(input_file, ReadOptions read_options,
                 shared_ptr[CInputStream]* out):
    use_memory_map = False
    get_input_stream(input_file, use_memory_map, out)
    if read_options is not None:
        out[0] = native_transcoding_input_stream(out[0],
                                                 read_options.encoding,
                                                 'utf8')


cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out):
    if read_options is None:
        out[0] = CCSVReadOptions.Defaults()
    else:
        out[0] = deref(read_options.options)


cdef _get_parse_options(ParseOptions parse_options, CCSVParseOptions* out):
    if parse_options is None:
        out[0] = CCSVParseOptions.Defaults()
    else:
        out[0] = deref(parse_options.options)


cdef _get_convert_options(ConvertOptions convert_options,
                          CCSVConvertOptions* out):
    if convert_options is None:
        out[0] = CCSVConvertOptions.Defaults()
    else:
        out[0] = deref(convert_options.options)


cdef class CSVStreamingReader(RecordBatchReader):
    """An object that reads record batches incrementally from a CSV file.

    Should not be instantiated directly by user code.
    """
    cdef readonly:
        Schema schema

    def __init__(self):
        raise TypeError("Do not call {}'s constructor directly, "
                        "use pyarrow.csv.open_csv() instead."
                        .format(self.__class__.__name__))

    # Note about cancellation: we cannot create a SignalStopHandler
    # by default here, as several CSVStreamingReader instances may be
    # created (including by the same thread).  Handling cancellation
    # would require having the user pass the SignalStopHandler.
    # (in addition to solving ARROW-11853)

    cdef _open(self, shared_ptr[CInputStream] stream,
               CCSVReadOptions c_read_options,
               CCSVParseOptions c_parse_options,
               CCSVConvertOptions c_convert_options,
               MemoryPool memory_pool):
        cdef:
            shared_ptr[CSchema] c_schema
            CIOContext io_context

        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

        with nogil:
            self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
                CCSVStreamingReader.Make(
                    io_context, stream,
                    move(c_read_options), move(c_parse_options),
                    move(c_convert_options)))
            c_schema = self.reader.get().schema()

        self.schema = pyarrow_wrap_schema(c_schema)


def read_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Read a Table from a stream of CSV data.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of CSV data.  If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.csv.ReadOptions, optional
        Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
        for defaults)
    parse_options : pyarrow.csv.ParseOptions, optional
        Options for the CSV parser
        (see pyarrow.csv.ParseOptions constructor for defaults)
    convert_options : pyarrow.csv.ConvertOptions, optional
        Options for converting CSV data
        (see pyarrow.csv.ConvertOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate Table memory from

    Returns
    -------
    :class:`pyarrow.Table`
        Contents of the CSV file as a in-memory table.

    Examples
    --------

    Defining an example file from bytes object:

    >>> import io
    >>> s = (
    ...     "animals,n_legs,entry\\n"
    ...     "Flamingo,2,2022-03-01\\n"
    ...     "Horse,4,2022-03-02\\n"
    ...     "Brittle stars,5,2022-03-03\\n"
    ...     "Centipede,100,2022-03-04"
    ... )
    >>> print(s)
    animals,n_legs,entry
    Flamingo,2,2022-03-01
    Horse,4,2022-03-02
    Brittle stars,5,2022-03-03
    Centipede,100,2022-03-04
    >>> source = io.BytesIO(s.encode())

    Reading from the file

    >>> from pyarrow import csv
    >>> csv.read_csv(source)
    pyarrow.Table
    animals: string
    n_legs: int64
    entry: date32[day]
    ----
    animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
    n_legs: [[2,4,5,100]]
    entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]
    """
    cdef:
        shared_ptr[CInputStream] stream
        CCSVReadOptions c_read_options
        CCSVParseOptions c_parse_options
        CCSVConvertOptions c_convert_options
        CIOContext io_context
        SharedPtrNoGIL[CCSVReader] reader
        shared_ptr[CTable] table

    _get_reader(input_file, read_options, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)
    _get_convert_options(convert_options, &c_convert_options)

    with SignalStopHandler() as stop_handler:
        io_context = CIOContext(
            maybe_unbox_memory_pool(memory_pool),
            (<StopToken> stop_handler.stop_token).stop_token)
        reader = GetResultValue(CCSVReader.Make(
            io_context, stream,
            c_read_options, c_parse_options, c_convert_options))

        with nogil:
            table = GetResultValue(reader.get().Read())

    return pyarrow_wrap_table(table)


def open_csv(input_file, read_options=None, parse_options=None,
             convert_options=None, MemoryPool memory_pool=None):
    """
    Open a streaming reader of CSV data.

    Reading using this function is always single-threaded.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of CSV data.  If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.csv.ReadOptions, optional
        Options for the CSV reader (see pyarrow.csv.ReadOptions constructor
        for defaults)
    parse_options : pyarrow.csv.ParseOptions, optional
        Options for the CSV parser
        (see pyarrow.csv.ParseOptions constructor for defaults)
    convert_options : pyarrow.csv.ConvertOptions, optional
        Options for converting CSV data
        (see pyarrow.csv.ConvertOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate Table memory from

    Returns
    -------
    :class:`pyarrow.csv.CSVStreamingReader`
    """
    cdef:
        shared_ptr[CInputStream] stream
        CCSVReadOptions c_read_options
        CCSVParseOptions c_parse_options
        CCSVConvertOptions c_convert_options
        CSVStreamingReader reader

    _get_reader(input_file, read_options, &stream)
    _get_read_options(read_options, &c_read_options)
    _get_parse_options(parse_options, &c_parse_options)
    _get_convert_options(convert_options, &c_convert_options)

    reader = CSVStreamingReader.__new__(CSVStreamingReader)
    reader._open(stream, move(c_read_options), move(c_parse_options),
                 move(c_convert_options), memory_pool)
    return reader


def _raise_invalid_function_option(value, description, *,
                                   exception_class=ValueError):
    raise exception_class(f"\"{value}\" is not a valid {description}")


cdef CQuotingStyle unwrap_quoting_style(quoting_style) except *:
    if quoting_style == "needed":
        return CQuotingStyle_Needed
    elif quoting_style == "all_valid":
        return CQuotingStyle_AllValid
    elif quoting_style == "none":
        return CQuotingStyle_None
    _raise_invalid_function_option(quoting_style, "quoting style")


cdef wrap_quoting_style(quoting_style):
    if quoting_style == CQuotingStyle_Needed:
        return 'needed'
    elif quoting_style == CQuotingStyle_AllValid:
        return 'all_valid'
    elif quoting_style == CQuotingStyle_None:
        return 'none'


cdef class WriteOptions(_Weakrefable):
    """
    Options for writing CSV files.

    Parameters
    ----------
    include_header : bool, optional (default True)
        Whether to write an initial header line with column names
    batch_size : int, optional (default 1024)
        How many rows to process together when converting and writing
        CSV data
    delimiter : 1-character string, optional (default ",")
        The character delimiting individual cells in the CSV data.
    quoting_style : str, optional (default "needed")
        Whether to quote values, and if so, which quoting style to use.
        The following values are accepted:

        - "needed" (default): only enclose values in quotes when needed.
        - "all_valid": enclose all valid values in quotes; nulls are not quoted.
        - "none": do not enclose any values in quotes; values containing
          special characters (such as quotes, cell delimiters or line endings)
          will raise an error.
    """

    # Avoid mistakingly creating attributes
    __slots__ = ()

    def __init__(self, *, include_header=None, batch_size=None,
                 delimiter=None, quoting_style=None):
        self.options.reset(new CCSVWriteOptions(CCSVWriteOptions.Defaults()))
        if include_header is not None:
            self.include_header = include_header
        if batch_size is not None:
            self.batch_size = batch_size
        if delimiter is not None:
            self.delimiter = delimiter
        if quoting_style is not None:
            self.quoting_style = quoting_style

    @property
    def include_header(self):
        """
        Whether to write an initial header line with column names.
        """
        return deref(self.options).include_header

    @include_header.setter
    def include_header(self, value):
        deref(self.options).include_header = value

    @property
    def batch_size(self):
        """
        How many rows to process together when converting and writing
        CSV data.
        """
        return deref(self.options).batch_size

    @batch_size.setter
    def batch_size(self, value):
        deref(self.options).batch_size = value

    @property
    def delimiter(self):
        """
        The character delimiting individual cells in the CSV data.
        """
        return chr(deref(self.options).delimiter)

    @delimiter.setter
    def delimiter(self, value):
        deref(self.options).delimiter = _single_char(value)

    @property
    def quoting_style(self):
        """
        Whether to quote values, and if so, which quoting style to use.
        The following values are accepted:

        - "needed" (default): only enclose values in quotes when needed.
        - "all_valid": enclose all valid values in quotes; nulls are not quoted.
        - "none": do not enclose any values in quotes; values containing
          special characters (such as quotes, cell delimiters or line endings)
          will raise an error.
        """
        return wrap_quoting_style(deref(self.options).quoting_style)

    @quoting_style.setter
    def quoting_style(self, value):
        deref(self.options).quoting_style = unwrap_quoting_style(value)

    @staticmethod
    cdef WriteOptions wrap(CCSVWriteOptions options):
        out = WriteOptions()
        out.options.reset(new CCSVWriteOptions(move(options)))
        return out

    def validate(self):
        check_status(self.options.get().Validate())


cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out):
    if write_options is None:
        out[0] = CCSVWriteOptions.Defaults()
    else:
        out[0] = deref(write_options.options)


def write_csv(data, output_file, write_options=None,
              MemoryPool memory_pool=None):
    """
    Write record batch or table to a CSV file.

    Parameters
    ----------
    data : pyarrow.RecordBatch or pyarrow.Table
        The data to write.
    output_file : string, path, pyarrow.NativeFile, or file-like object
        The location where to write the CSV data.
    write_options : pyarrow.csv.WriteOptions
        Options to configure writing the CSV data.
    memory_pool : MemoryPool, optional
        Pool for temporary allocations.

    Examples
    --------

    >>> import pyarrow as pa
    >>> from pyarrow import csv

    >>> legs = pa.array([2, 4, 5, 100])
    >>> animals = pa.array(["Flamingo", "Horse", "Brittle stars", "Centipede"])
    >>> entry_date = pa.array(["01/03/2022", "02/03/2022",
    ...                        "03/03/2022", "04/03/2022"])
    >>> table = pa.table([animals, legs, entry_date],
    ...                  names=["animals", "n_legs", "entry"])

    >>> csv.write_csv(table, "animals.csv")

    >>> write_options = csv.WriteOptions(include_header=False)
    >>> csv.write_csv(table, "animals.csv", write_options=write_options)

    >>> write_options = csv.WriteOptions(delimiter=";")
    >>> csv.write_csv(table, "animals.csv", write_options=write_options)
    """
    cdef:
        shared_ptr[COutputStream] stream
        CCSVWriteOptions c_write_options
        CMemoryPool* c_memory_pool
        CRecordBatch* batch
        CTable* table
    _get_write_options(write_options, &c_write_options)

    get_writer(output_file, &stream)
    c_memory_pool = maybe_unbox_memory_pool(memory_pool)
    c_write_options.io_context = CIOContext(c_memory_pool)
    if isinstance(data, RecordBatch):
        batch = pyarrow_unwrap_batch(data).get()
        with nogil:
            check_status(WriteCSV(deref(batch), c_write_options, stream.get()))
    elif isinstance(data, Table):
        table = pyarrow_unwrap_table(data).get()
        with nogil:
            check_status(WriteCSV(deref(table), c_write_options, stream.get()))
    else:
        raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'")


cdef class CSVWriter(_CRecordBatchWriter):
    """
    Writer to create a CSV file.

    Parameters
    ----------
    sink : str, path, pyarrow.OutputStream or file-like object
        The location where to write the CSV data.
    schema : pyarrow.Schema
        The schema of the data to be written.
    write_options : pyarrow.csv.WriteOptions
        Options to configure writing the CSV data.
    memory_pool : MemoryPool, optional
        Pool for temporary allocations.
    """

    def __init__(self, sink, Schema schema, *,
                 WriteOptions write_options=None, MemoryPool memory_pool=None):
        cdef:
            shared_ptr[COutputStream] c_stream
            shared_ptr[CSchema] c_schema = pyarrow_unwrap_schema(schema)
            CCSVWriteOptions c_write_options
            CMemoryPool* c_memory_pool = maybe_unbox_memory_pool(memory_pool)
        _get_write_options(write_options, &c_write_options)
        c_write_options.io_context = CIOContext(c_memory_pool)
        get_writer(sink, &c_stream)
        with nogil:
            self.writer = GetResultValue(MakeCSVWriter(
                c_stream, c_schema, c_write_options))