# Package listing header (not part of the module): version 1.0.10
# -*- coding: utf-8 -*-
"""
unicsv is a python module which wraps the standard csv module to provide
unicode support. All encodings default to utf-8 however the module supports
reading from and writing to any encoding that python's codecs module
supports.
"""
import csv
import unicodedata
import codecs
from past.types import unicode
from six import PY2
from six.moves import cStringIO as StringIO
# Use Normal Form C for unicode normalization as per recommendation by
# "The W3C Character Model for the World Wide Web, Part II: Normalization" and
# other W3C Specifications: http://unicode.org/reports/tr15/
# This is the form name handed to unicodedata.normalize() by the reader and
# writer classes in this module.
NORMAL_FORM_C = "NFC"
def _encode_value_py2(value):
return unicode(value).encode("utf-8") if value is not None else b""
def _encode_value_py3(value):
return unicode(value) if value is not None else ""
def _decode_value_py2(value):
return value.decode("utf-8")
def _decode_value_py3(value):
return value
# Pick the interpreter-specific helpers once, at import time, so callers can
# use _encode_value/_decode_value without checking the Python version again.
_encode_value = _encode_value_py2 if PY2 else _encode_value_py3
_decode_value = _decode_value_py2 if PY2 else _decode_value_py3
class UTF8Recoder(object):
    """
    Iterator that reads an encoded stream and reencodes the input to UTF-8
    """

    def __init__(self, f, encoding):
        """
        Initialize the UTF8Recoder

        :param f: Input stream to turn into a reader
        :type f: file or stream
        :param encoding: The encoding of input f
        :type encoding: str or unicode
        """
        # codecs.getreader() looks up the StreamReader class for the
        # encoding; instantiating it wraps f so that reads yield decoded text
        self.reader = codecs.getreader(encoding)(f)

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol
        return self._next()

    def next(self):
        # Python 2 iterator protocol
        return self._next()

    def _next(self):
        """
        Return next value from stream encoded as utf-8 byte string

        :returns: The next line of the stream, re-encoded as UTF-8
        :raises StopIteration: When the underlying reader is exhausted
        """
        # Advance with the next() builtin instead of calling .next() on the
        # reader: the stream reader only offers a .next() method on Python 2,
        # while the builtin works on both Python 2 and Python 3.
        return next(self.reader).encode("utf-8")
class UnicodeReader(object):
    """
    A CSV reader which will iterate over lines in the CSV file "f",
    which is encoded in the given encoding and yield lists of unicode values
    normalized to Normalization Form C
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
        """
        Initialize the UnicodeReader

        :param f: Input stream to reader
        :type f: file or stream
        :param dialect: Define a set of parameters specific to a particular
                        CSV dialect
        :type dialect: csv.Dialect
        :param encoding: The encoding of input f
        :type encoding: str or unicode
        :param kwargs: Additional formatting parameters for csv.reader
        """
        if PY2:
            # Python 2's csv module works on byte strings, so hand it UTF-8
            # encoded lines; the cells are decoded again in _next()
            lines = UTF8Recoder(f, encoding)
        else:
            # Python 3's csv module refuses bytes and requires text, so
            # decode the stream up front instead of re-encoding it
            lines = codecs.getreader(encoding)(f)
        self.reader = csv.reader(lines, dialect=dialect, **kwargs)
        # Mirrors csv.reader.line_num; refreshed after every row read
        self.line_num = 0

    def __next__(self):
        # Python 3 iterator protocol
        return self._next()

    def next(self):
        # Python 2 iterator protocol
        return self._next()

    def _next(self):
        """
        Return the next row of cells from reader

        :returns: List of unicode objects that represent the contents of
                  cells
        :raises StopIteration: When the underlying csv reader is exhausted
        """
        row = next(self.reader)
        self.line_num = self.reader.line_num
        if PY2:
            # Cells arrive as UTF-8 byte strings on Python 2 only; on
            # Python 3 they are already text and must not be decoded again
            row = [unicode(cell, "utf-8") for cell in row]
        return [unicodedata.normalize(NORMAL_FORM_C, cell) for cell in row]

    def __iter__(self):
        return self
class UnicodeWriter(object):
    """
    CSV writer that formats rows with the csv module and emits them to the
    stream "f" in the requested encoding.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
        """
        Set up the UnicodeWriter

        :param f: Output stream that receives the encoded rows
        :type f: file or stream
        :param dialect: Define a set of parameters specific to a particular
                        CSV dialect
        :type dialect: csv.Dialect
        :param encoding: The encoding used for output stream f. Sticking
                         with the default, utf-8, is *strongly* encouraged.
        :type encoding: str or unicode
        :param kwargs: Additional formatting parameters for csv.writer
        """
        self.stream = f
        # The csv writer never touches f directly; it fills an in-memory
        # queue that writerow() drains after each row
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwargs)
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        """
        Format a single row and send it to the output stream

        The row is laid out according to the current dialect, normalized
        to Normalization Form C and encoded in the writer's encoding.

        :param row: The row to write out to the stream
        :type row: list of unicode objects or objects that can be casted to
                   unicode
        """
        cells = [_encode_value(cell) for cell in row]
        self.writer.writerow(cells)
        # Pull the formatted line back out of the queue as unicode text ...
        formatted = _decode_value(self.queue.getvalue())
        normalized = unicodedata.normalize(NORMAL_FORM_C, formatted)
        # ... then encode it for the target stream and write it out
        self.stream.write(self.encoder.encode(normalized))
        # Reset the queue so the next row starts from an empty buffer
        self.queue.truncate(0)
        self.queue.seek(0)

    def writerows(self, rows):
        """
        Send several rows to the output stream, one writerow() call each

        :param rows: a list of csv rows to write
        :type rows: list of lists of unicode objects
        """
        for single_row in rows:
            self.writerow(single_row)
##########################################################################
# The following was copied from csv as of version 2.7.9. I've replaced #
# instances of csv.reader with unicsv.UnicodeReader and instances of #
# csv.writer with unicsv.UnicodeWriter #
##########################################################################
class UnicodeDictReader(object):
    """
    A CSV reader which maps each row read from "f" onto a dict keyed by
    the field names. When no field names are supplied, the first row read
    is used as the field names. Blank rows are skipped.
    """

    def __init__(
        self,
        f,
        fieldnames=None,
        restkey=None,
        restval=None,
        dialect=csv.excel,
        encoding="utf-8",
        *args,
        **kwds
    ):
        """
        Initialize the UnicodeDictReader

        :param f: Input stream to read from
        :param fieldnames: Keys for the row dicts; None means "take them
                           from the first row"
        :param restkey: Key under which surplus cells of a long row are
                        stored (as a list)
        :param restval: Value used for the missing cells of a short row
        :param dialect: CSV dialect forwarded to UnicodeReader
        :param encoding: The encoding of input f
        :param args: Extra positional arguments forwarded to UnicodeReader
        :param kwds: Extra keyword arguments forwarded to UnicodeReader
        """
        self._fieldnames = fieldnames  # list of keys for the dict
        self.restkey = restkey  # key to catch long rows
        self.restval = restval  # default value for short rows
        self.reader = UnicodeReader(f, dialect, encoding, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        """
        The list of dict keys; read lazily from the first row when unset
        """
        if self._fieldnames is None:
            try:
                self._fieldnames = next(self.reader)
            except StopIteration:
                # Empty input: leave the field names unset
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    # This class derives from object, so the property setter below is
    # effective: assigning to fieldnames replaces the stored keys.
    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def __next__(self):
        # Python 3 iterator protocol; without it the reader cannot be used
        # in a for loop on Python 3
        return self._next()

    def next(self):
        # Python 2 iterator protocol
        return self._next()

    def _next(self):
        """
        Return the next non-blank row as a dict

        :returns: dict mapping field names to cell values
        :raises StopIteration: When the underlying reader is exhausted
        """
        if self.line_num == 0:
            # Used only for its side effect.
            self.fieldnames
        row = next(self.reader)
        self.line_num = self.reader.line_num

        # unlike the basic reader, we prefer not to return blanks,
        # because we will typically wind up with a dict full of None
        # values
        while row == []:
            row = next(self.reader)
        d = dict(zip(self.fieldnames, row))
        lf = len(self.fieldnames)
        lr = len(row)
        if lf < lr:
            # Long row: collect the surplus cells under restkey
            d[self.restkey] = row[lf:]
        elif lf > lr:
            # Short row: pad the missing fields with restval
            for key in self.fieldnames[lr:]:
                d[key] = self.restval
        return d
class UnicodeDictWriter(object):
    """
    A CSV writer which takes rows as dicts and writes their values to "f"
    in the order given by the field names.
    """

    def __init__(
        self,
        f,
        fieldnames,
        restval="",
        extrasaction="raise",
        dialect=csv.excel,
        encoding="utf-8",
        *args,
        **kwds
    ):
        """
        Initialize the UnicodeDictWriter

        :param f: Output stream to write to
        :param fieldnames: Keys that define the column order
        :param restval: Value written for keys missing from a row dict
        :param extrasaction: "raise" to reject row dicts holding keys that
                             are not in fieldnames, "ignore" to drop those
                             keys; matched case-insensitively
        :param dialect: CSV dialect forwarded to UnicodeWriter
        :param encoding: The encoding of output stream f
        :param args: Extra positional arguments forwarded to UnicodeWriter
        :param kwds: Extra keyword arguments forwarded to UnicodeWriter
        :raises ValueError: If extrasaction is neither "raise" nor "ignore"
        """
        self.fieldnames = fieldnames  # list of keys for the dict
        self.restval = restval  # for writing short dicts
        action = extrasaction.lower()
        if action not in ("raise", "ignore"):
            raise ValueError(
                u"extrasaction ({0}) must be 'raise' or 'ignore'".format(extrasaction)
            )
        # Keep the lowercased form: _dict_to_list compares against the
        # literal "raise", so storing e.g. "RAISE" unchanged would pass the
        # check above yet silently behave like "ignore"
        self.extrasaction = action
        self.writer = UnicodeWriter(f, dialect, encoding, *args, **kwds)

    def writeheader(self):
        """
        Write a row holding the field names themselves
        """
        header = dict(zip(self.fieldnames, self.fieldnames))
        self.writerow(header)

    def _dict_to_list(self, rowdict):
        """
        Turn a row dict into a list of values ordered by fieldnames

        :param rowdict: dict mapping field names to cell values
        :returns: list of values, with restval standing in for missing keys
        :raises ValueError: If extrasaction is "raise" and rowdict holds
                            keys that are not in fieldnames
        """
        if self.extrasaction == "raise":
            wrong_fields = [k for k in rowdict if k not in self.fieldnames]
            if wrong_fields:
                raise ValueError(
                    u"dict contains fields not in fieldnames: {0}".format(
                        u", ".join([repr(x) for x in wrong_fields])
                    )
                )
        return [rowdict.get(key, self.restval) for key in self.fieldnames]

    def writerow(self, rowdict):
        """
        Write a single row dict to the output stream
        """
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        """
        Write several row dicts to the output stream
        """
        # Convert every dict before writing anything, so a bad row raises
        # before any of the rows reach the stream
        rows = [self._dict_to_list(rowdict) for rowdict in rowdicts]
        return self.writer.writerows(rows)
# Public API of the module: the names exported by a star-import.
__all__ = [
    "UnicodeReader",
    "UnicodeWriter",
    "UTF8Recoder",
    "UnicodeDictReader",
    "UnicodeDictWriter",
]