# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: profile=False
# distutils: language = c++
# cython: language_level = 3
from cython.operator cimport dereference as deref
from collections import namedtuple
from collections.abc import Mapping
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema,
RecordBatchReader, ensure_type,
maybe_unbox_memory_pool, get_input_stream,
get_writer, native_transcoding_input_stream,
pyarrow_unwrap_batch, pyarrow_unwrap_schema,
pyarrow_unwrap_table, pyarrow_wrap_schema,
pyarrow_wrap_table, pyarrow_wrap_data_type,
pyarrow_unwrap_data_type, Table, RecordBatch,
StopToken, _CRecordBatchWriter)
from pyarrow.lib import frombytes, tobytes, SignalStopHandler
cdef unsigned char _single_char(s) except 0:
val = ord(s)
if val == 0 or val > 127:
raise ValueError("Expecting an ASCII character")
return <unsigned char> val
_InvalidRow = namedtuple(
"_InvalidRow", ("expected_columns", "actual_columns", "number", "text"),
class InvalidRow(_InvalidRow):
Description of an invalid row in a CSV file.
expected_columns : int
The expected number of columns in the row.
actual_columns : int
The actual number of columns in the row.
number : int or None
The physical row number if known, otherwise None.
text : str
The contents of the row.
__slots__ = ()
cdef CInvalidRowResult _handle_invalid_row(
handler, const CCSVInvalidRow& c_row) except CInvalidRowResult_Error:
# A negative row number means undetermined (because of parallel reading)
row_number = c_row.number if c_row.number >= 0 else None
row = InvalidRow(c_row.expected_columns, c_row.actual_columns,
row_number, frombytes(<c_string> c_row.text))
result = handler(row)
if result == 'error':
return CInvalidRowResult_Error
elif result == 'skip':
return CInvalidRowResult_Skip
raise ValueError("Invalid return value for invalid row handler: "
f"expected 'error' or 'skip', got {result!r}")
cdef class ReadOptions(_Weakrefable):
Options for reading CSV files.
use_threads : bool, optional (default True)
Whether to use multiple threads to accelerate reading
block_size : int, optional
How much bytes to process at a time from the input stream.
This will determine multi-threading granularity as well as
the size of individual record batches or table chunks.
Minimum valid value for block size is 1
skip_rows : int, optional (default 0)
The number of rows to skip before the column names (if any)
and the CSV data.
skip_rows_after_names : int, optional (default 0)
The number of rows to skip after the column names.
This number can be larger than the number of rows in one
block, and empty rows are counted.
The order of application is as follows:
- `skip_rows` is applied (if non-zero);
- column names are read (unless `column_names` is set);
- `skip_rows_after_names` is applied (if non-zero).
column_names : list, optional
The column names of the target table. If empty, fall back on
autogenerate_column_names : bool, optional (default False)
Whether to autogenerate column names if `column_names` is empty.
If true, column names will be of the form "f0", "f1"...
If false, column names will be read from the first CSV row
after `skip_rows`.
encoding : str, optional (default 'utf8')
The character encoding of the CSV data. Columns that cannot
decode using this encoding can still be read as Binary.
Defining an example data:
>>> import io
>>> s = "1,2,3\\nFlamingo,2,2022-03-01\\nHorse,4,2022-03-02\\nBrittle stars,5,2022-03-03\\nCentipede,100,2022-03-04"
>>> print(s)
Brittle stars,5,2022-03-03
Ignore the first numbered row and substitute it with defined
or autogenerated column names:
>>> from pyarrow import csv
>>> read_options = csv.ReadOptions(
... column_names=["animals", "n_legs", "entry"],
... skip_rows=1)
>>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
animals: string
n_legs: int64
entry: date32[day]
animals: [["Flamingo","Horse","Brittle stars","Centipede"]]
n_legs: [[2,4,5,100]]
entry: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]
>>> read_options = csv.ReadOptions(autogenerate_column_names=True,
... skip_rows=1)
>>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
f0: string
f1: int64
f2: date32[day]
f0: [["Flamingo","Horse","Brittle stars","Centipede"]]
f1: [[2,4,5,100]]
f2: [[2022-03-01,2022-03-02,2022-03-03,2022-03-04]]
Remove the first 2 rows of the data:
>>> read_options = csv.ReadOptions(skip_rows_after_names=2)
>>> csv.read_csv(io.BytesIO(s.encode()), read_options=read_options)
1: string
2: int64
3: date32[day]
1: [["Brittle stars","Centipede"]]
2: [[5,100]]
3: [[2022-03-03,2022-03-04]]
# Avoid mistakingly creating attributes
__slots__ = ()
# __init__() is not called when unpickling, initialize storage here
def __cinit__(self, *argw, **kwargs):
self.options.reset(new CCSVReadOptions(CCSVReadOptions.Defaults()))
def __init__(self, *, use_threads=None, block_size=None, skip_rows=None,
skip_rows_after_names=None, column_names=None,
autogenerate_column_names=None, encoding='utf8'):
if use_threads is not None:
self.use_threads = use_threads
if block_size is not None:
self.block_size = block_size
if skip_rows is not None:
self.skip_rows = skip_rows
if skip_rows_after_names is not None:
self.skip_rows_after_names = skip_rows_after_names
if column_names is not None:
self.column_names = column_names
if autogenerate_column_names is not None:
self.autogenerate_column_names= autogenerate_column_names
# Python-specific option
self.encoding = encoding
def use_threads(self):
Whether to use multiple threads to accelerate reading.
return deref(self.options).use_threads
def use_threads(self, value):
deref(self.options).use_threads = value
def block_size(self):
How much bytes to process at a time from the input stream.
This will determine multi-threading granularity as well as
the size of individual record batches or table chunks.
return deref(self.options).block_size
def block_size(self, value):
deref(self.options).block_size = value
def skip_rows(self):
The number of rows to skip before the column names (if any)
and the CSV data.
See `skip_rows_after_names` for interaction description
return deref(self.options).skip_rows
def skip_rows(self, value):
deref(self.options).skip_rows = value
def skip_rows_after_names(self):
The number of rows to skip after the column names.
This number can be larger than the number of rows in one
block, and empty rows are counted.
The order of application is as follows:
- `skip_rows` is applied (if non-zero);
- column names are read (unless `column_names` is set);
- `skip_rows_after_names` is applied (if non-zero).
return deref(self.options).skip_rows_after_names
def skip_rows_after_names(self, value):
deref(self.options).skip_rows_after_names = value
def column_names(self):
The column names of the target table. If empty, fall back on
return [frombytes(s) for s in deref(self.options).column_names]
def column_names(self, value):
for item in value:
def autogenerate_column_names(self):
Whether to autogenerate column names if `column_names` is empty.
If true, column names will be of the form "f0", "f1"...
If false, column names will be read from the first CSV row
after `skip_rows`.
return deref(self.options).autogenerate_column_names
def autogenerate_column_names(self, value):
deref(self.options).autogenerate_column_names = value
def validate(self):
def equals(self, ReadOptions other):
other : pyarrow.csv.ReadOptions
return (
self.use_threads == other.use_threads and
self.block_size == other.block_size and
self.skip_rows == other.skip_rows and
self.skip_rows_after_names == other.skip_rows_after_names and
self.column_names == other.column_names and
self.autogenerate_column_names ==
other.autogenerate_column_names and
self.encoding == other.encoding
cdef ReadOptions wrap(CCSVReadOptions options):
out = ReadOptions()
out.options.reset(new CCSVReadOptions(move(options)))
out.encoding = 'utf8' # No way to know this
return out
def __getstate__(self):
return (self.use_threads, self.block_size, self.skip_rows,
self.column_names, self.autogenerate_column_names,
self.encoding, self.skip_rows_after_names)
def __setstate__(self, state):
(self.use_threads, self.block_size, self.skip_rows,
self.column_names, self.autogenerate_column_names,
self.encoding, self.skip_rows_after_names) = state
def __eq__(self, other):
return self.equals(other)
except TypeError:
return False
cdef class ParseOptions(_Weakrefable):
Options for parsing CSV files.
delimiter : 1-character string, optional (default ',')
The character delimiting individual cells in the CSV data.
quote_char : 1-character string or False, optional (default '"')
The character used optionally for quoting CSV values
Loading ...