# -*- coding: utf-8 -*-
"""
Module for formatting output data into CSV files.
"""
from __future__ import print_function
import csv as csvlib
import os
import warnings
from zipfile import ZipFile
import numpy as np
from pandas._libs import writers as libwriters
from pandas.compat import StringIO, range, zip
from pandas.core.dtypes.generic import (
ABCDatetimeIndex, ABCIndexClass, ABCMultiIndex, ABCPeriodIndex)
from pandas.core.dtypes.missing import notna
from pandas import compat
from pandas.io.common import (
UnicodeWriter, _get_handle, _infer_compression, get_filepath_or_buffer)
class CSVFormatter(object):
def __init__(self, obj, path_or_buf=None, sep=",", na_rep='',
float_format=None, cols=None, header=True, index=True,
index_label=None, mode='w', nanRep=None, encoding=None,
compression='infer', quoting=None, line_terminator='\n',
chunksize=None, tupleize_cols=False, quotechar='"',
date_format=None, doublequote=True, escapechar=None,
decimal='.'):
self.obj = obj
if path_or_buf is None:
path_or_buf = StringIO()
self.path_or_buf, _, _, _ = get_filepath_or_buffer(
path_or_buf, encoding=encoding, compression=compression, mode=mode
)
self.sep = sep
self.na_rep = na_rep
self.float_format = float_format
self.decimal = decimal
self.header = header
self.index = index
self.index_label = index_label
self.mode = mode
if encoding is None:
encoding = 'ascii' if compat.PY2 else 'utf-8'
self.encoding = encoding
self.compression = _infer_compression(self.path_or_buf, compression)
if quoting is None:
quoting = csvlib.QUOTE_MINIMAL
self.quoting = quoting
if quoting == csvlib.QUOTE_NONE:
# prevents crash in _csv
quotechar = None
self.quotechar = quotechar
self.doublequote = doublequote
self.escapechar = escapechar
self.line_terminator = line_terminator or os.linesep
self.date_format = date_format
self.tupleize_cols = tupleize_cols
self.has_mi_columns = (isinstance(obj.columns, ABCMultiIndex) and
not self.tupleize_cols)
# validate mi options
if self.has_mi_columns:
if cols is not None:
raise TypeError("cannot specify cols with a MultiIndex on the "
"columns")
if cols is not None:
if isinstance(cols, ABCIndexClass):
cols = cols.to_native_types(na_rep=na_rep,
float_format=float_format,
date_format=date_format,
quoting=self.quoting)
else:
cols = list(cols)
self.obj = self.obj.loc[:, cols]
# update columns to include possible multiplicity of dupes
# and make sure sure cols is just a list of labels
cols = self.obj.columns
if isinstance(cols, ABCIndexClass):
cols = cols.to_native_types(na_rep=na_rep,
float_format=float_format,
date_format=date_format,
quoting=self.quoting)
else:
cols = list(cols)
# save it
self.cols = cols
# preallocate data 2d list
self.blocks = self.obj._data.blocks
ncols = sum(b.shape[0] for b in self.blocks)
self.data = [None] * ncols
if chunksize is None:
chunksize = (100000 // (len(self.cols) or 1)) or 1
self.chunksize = int(chunksize)
self.data_index = obj.index
if (isinstance(self.data_index, (ABCDatetimeIndex, ABCPeriodIndex)) and
date_format is not None):
from pandas import Index
self.data_index = Index([x.strftime(date_format) if notna(x) else
'' for x in self.data_index])
self.nlevels = getattr(self.data_index, 'nlevels', 1)
if not index:
self.nlevels = 0
def save(self):
"""
Create the writer & save
"""
# GH21227 internal compression is not used when file-like passed.
if self.compression and hasattr(self.path_or_buf, 'write'):
msg = ("compression has no effect when passing file-like "
"object as input.")
warnings.warn(msg, RuntimeWarning, stacklevel=2)
# when zip compression is called.
is_zip = isinstance(self.path_or_buf, ZipFile) or (
not hasattr(self.path_or_buf, 'write')
and self.compression == 'zip')
if is_zip:
# zipfile doesn't support writing string to archive. uses string
# buffer to receive csv writing and dump into zip compression
# file handle. GH21241, GH21118
f = StringIO()
close = False
elif hasattr(self.path_or_buf, 'write'):
f = self.path_or_buf
close = False
else:
f, handles = _get_handle(self.path_or_buf, self.mode,
encoding=self.encoding,
compression=self.compression)
close = True
try:
writer_kwargs = dict(lineterminator=self.line_terminator,
delimiter=self.sep, quoting=self.quoting,
doublequote=self.doublequote,
escapechar=self.escapechar,
quotechar=self.quotechar)
if self.encoding == 'ascii':
self.writer = csvlib.writer(f, **writer_kwargs)
else:
writer_kwargs['encoding'] = self.encoding
self.writer = UnicodeWriter(f, **writer_kwargs)
self._save()
finally:
if is_zip:
# GH17778 handles zip compression separately.
buf = f.getvalue()
if hasattr(self.path_or_buf, 'write'):
self.path_or_buf.write(buf)
else:
f, handles = _get_handle(self.path_or_buf, self.mode,
encoding=self.encoding,
compression=self.compression)
f.write(buf)
close = True
if close:
f.close()
for _fh in handles:
_fh.close()
def _save_header(self):
writer = self.writer
obj = self.obj
index_label = self.index_label
cols = self.cols
has_mi_columns = self.has_mi_columns
header = self.header
encoded_labels = []
has_aliases = isinstance(header, (tuple, list, np.ndarray,
ABCIndexClass))
if not (has_aliases or self.header):
return
if has_aliases:
if len(header) != len(cols):
raise ValueError(('Writing {ncols} cols but got {nalias} '
'aliases'.format(ncols=len(cols),
nalias=len(header))))
else:
write_cols = header
else:
write_cols = cols
if self.index:
# should write something for index label
if index_label is not False:
if index_label is None:
if isinstance(obj.index, ABCMultiIndex):
index_label = []
for i, name in enumerate(obj.index.names):
if name is None:
name = ''
index_label.append(name)
else:
index_label = obj.index.name
if index_label is None:
index_label = ['']
else:
index_label = [index_label]
elif not isinstance(index_label,
(list, tuple, np.ndarray, ABCIndexClass)):
# given a string for a DF with Index
index_label = [index_label]
encoded_labels = list(index_label)
else:
encoded_labels = []
if not has_mi_columns or has_aliases:
encoded_labels += list(write_cols)
writer.writerow(encoded_labels)
else:
# write out the mi
columns = obj.columns
# write out the names for each level, then ALL of the values for
# each level
for i in range(columns.nlevels):
# we need at least 1 index column to write our col names
col_line = []
if self.index:
# name is the first column
col_line.append(columns.names[i])
if isinstance(index_label, list) and len(index_label) > 1:
col_line.extend([''] * (len(index_label) - 1))
col_line.extend(columns._get_level_values(i))
writer.writerow(col_line)
# Write out the index line if it's not empty.
# Otherwise, we will print out an extraneous
# blank line between the mi and the data rows.
if encoded_labels and set(encoded_labels) != {''}:
encoded_labels.extend([''] * len(columns))
writer.writerow(encoded_labels)
def _save(self):
self._save_header()
nrows = len(self.data_index)
# write in chunksize bites
chunksize = self.chunksize
chunks = int(nrows / chunksize) + 1
for i in range(chunks):
start_i = i * chunksize
end_i = min((i + 1) * chunksize, nrows)
if start_i >= end_i:
break
self._save_chunk(start_i, end_i)
def _save_chunk(self, start_i, end_i):
data_index = self.data_index
# create the data for a chunk
slicer = slice(start_i, end_i)
for i in range(len(self.blocks)):
b = self.blocks[i]
d = b.to_native_types(slicer=slicer, na_rep=self.na_rep,
float_format=self.float_format,
decimal=self.decimal,
date_format=self.date_format,
quoting=self.quoting)
for col_loc, col in zip(b.mgr_locs, d):
# self.data is a preallocated list
self.data[col_loc] = col
ix = data_index.to_native_types(slicer=slicer, na_rep=self.na_rep,
float_format=self.float_format,
decimal=self.decimal,
date_format=self.date_format,
quoting=self.quoting)
libwriters.write_csv_rows(self.data, ix, self.nlevels,
self.cols, self.writer)