Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
unicsv / unicsv / __init__.py
Size: Mime:
# -*- coding: utf-8 -*-
"""
    unicsv is a python module which wraps the standard csv module to provide
    unicode support. All encodings default to utf-8 however the module supports
    reading from and writing to any encoding that python's codecs module
    supports.
"""
import csv
import unicodedata

import codecs
from past.types import unicode
from six import PY2
from six.moves import cStringIO as StringIO

# Use Normal Form C for unicode normalization, as recommended by
# "The W3C Character Model for the World Wide Web, Part II: Normalization" and
# other W3C Specifications: http://unicode.org/reports/tr15/
NORMAL_FORM_C = "NFC"


def _encode_value_py2(value):
    return unicode(value).encode("utf-8") if value is not None else b""


def _encode_value_py3(value):
    return unicode(value) if value is not None else ""


def _decode_value_py2(value):
    return value.decode("utf-8")


def _decode_value_py3(value):
    return value


# Bind the interpreter-specific helper pair to the version-neutral names
# used by the rest of the module.
if not PY2:
    _encode_value = _encode_value_py3
    _decode_value = _decode_value_py3

else:
    _encode_value = _encode_value_py2
    _decode_value = _decode_value_py2


class UTF8Recoder(object):
    """
    Iterator that reads an encoded stream and reencodes the input to UTF-8
    """

    def __init__(self, f, encoding):
        """
        Initialize the UTF8Recoder

        :param f: Input stream to turn into a reader
        :type f: file or stream

        :param encoding: The encoding of input f
        :type encoding: str or unicode
        """
        self.reader = codecs.getreader(encoding)(f)

    def __iter__(self):
        return self

    def __next__(self):
        # Python 3 iterator protocol
        return self._next()

    def next(self):
        # Python 2 iterator protocol
        return self._next()

    def _next(self):
        """
        Return next value from stream encoded as utf-8 byte string

        :raises StopIteration: when the underlying reader is exhausted
        """
        # Use the next() builtin rather than calling ``.next()`` directly:
        # the codecs stream reader only exposes ``__next__`` on Python 3, so
        # the direct method call fails there.
        return next(self.reader).encode("utf-8")


class UnicodeReader(object):
    """
    A CSV reader which will iterate over lines in the CSV file "f",
    which is encoded in the given encoding and yield lists of unicode values
    normalized to Normalization Form C
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
        """
        Initialize the UnicodeReader

        :param f: Input stream to reader; it must yield bytes in the given
                  encoding
        :type f: file or stream

        :param dialect: Define a set of parameters specific to a particular
                        CSV dialect
        :type dialect: csv.Dialect

        :param encoding: The encoding of input f
        :type encoding: str or unicode

        :param kwargs: Additional formatting parameters for csv.reader
        """
        if PY2:
            # Python 2's csv module works on byte strings: hand it UTF-8
            # encoded lines and decode every cell afterwards.
            lines = UTF8Recoder(f, encoding)
        else:
            # Python 3's csv module rejects bytes ("iterator should return
            # strings, not bytes"), so hand it the decoded lines directly.
            lines = codecs.getreader(encoding)(f)
        self.reader = csv.reader(lines, dialect=dialect, **kwargs)
        self.line_num = 0

    def __next__(self):
        # Python 3 iterator protocol
        return self._next()

    def next(self):
        # Python 2 iterator protocol
        return self._next()

    def _next(self):
        """
        Return the next row of cells from reader

        :returns: List of unicode objects that represent the contents of
                  cells
        :raises StopIteration: when the underlying csv reader is exhausted
        """
        row = next(self.reader)
        # Mirror the wrapped reader's counter so callers can report positions
        self.line_num = self.reader.line_num
        if PY2:
            # Cells are UTF-8 byte strings on Python 2 only; on Python 3 the
            # csv module already produced text.
            row = [unicode(cell, "utf-8") for cell in row]
        return [unicodedata.normalize(NORMAL_FORM_C, cell) for cell in row]

    def __iter__(self):
        return self


class UnicodeWriter(object):
    """
    A CSV writer that formats rows with the standard csv module and writes
    them to the file "f" in the requested encoding.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwargs):
        """
        Set up the UnicodeWriter

        :param f: Output stream that receives the encoded rows
        :type f: file or stream

        :param dialect: Set of parameters describing a particular CSV
                        dialect
        :type dialect: csv.Dialect

        :param encoding: Encoding used for everything written to f. Using
                         the default, utf-8, is *strongly* encouraged.
        :type encoding: str or unicode

        :param kwargs: Extra formatting parameters handed to csv.writer
        """
        # csv.writer output is staged in an in-memory queue so it can be
        # re-encoded before it reaches the real stream.
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwargs)
        self.stream = f
        self.encoder = codecs.getincrementalencoder(encoding)()

    def writerow(self, row):
        """
        Format one row per the current dialect, encode it in the writer's
        encoding and write it to the output stream

        :param row: The cells of the row to write
        :type row: list of unicode objects or objects that can be casted to
                   unicode
        """
        cells = [_encode_value(cell) for cell in row]
        self.writer.writerow(cells)

        # Pull the formatted line back out of the queue as text and
        # normalize it
        staged = _decode_value(self.queue.getvalue())
        normalized = unicodedata.normalize(NORMAL_FORM_C, staged)

        # Re-encode into the target encoding and hand it to the stream
        self.stream.write(self.encoder.encode(normalized))

        # Reset the queue so the next row starts from an empty buffer
        self.queue.truncate(0)
        self.queue.seek(0)

    def writerows(self, rows):
        """
        Write every row of an iterable via writerow, in order

        :param rows: the csv rows to write
        :type rows: list of lists of unicode objects
        """
        for cells in rows:
            self.writerow(cells)


##########################################################################
# The following was copied from csv as of version 2.7.9. I've replaced   #
# instances of csv.reader with unicsv.UnicodeReader and instances of     #
# csv.writer with unicsv.UnicodeWriter                                   #
##########################################################################


class UnicodeDictReader(object):
    """
    A CSV reader which maps every row read through a UnicodeReader to a
    dict keyed by field name
    """

    def __init__(
        self,
        f,
        fieldnames=None,
        restkey=None,
        restval=None,
        dialect=csv.excel,
        encoding="utf-8",
        *args,
        **kwds
    ):
        """
        Initialize the UnicodeDictReader

        :param f: Input stream to reader
        :type f: file or stream

        :param fieldnames: Keys for the row dicts; when None the first row
                           read from f supplies them
        :type fieldnames: list or None

        :param restkey: Key under which surplus cells of a long row are
                        stored as a list
        :param restval: Value used for the missing cells of a short row

        :param dialect: Define a set of parameters specific to a particular
                        CSV dialect
        :type dialect: csv.Dialect

        :param encoding: The encoding of input f
        :type encoding: str or unicode

        :param args: Additional positional arguments for UnicodeReader
        :param kwds: Additional keyword arguments for UnicodeReader
        """
        self._fieldnames = fieldnames  # list of keys for the dict
        self.restkey = restkey  # key to catch long rows
        self.restval = restval  # default value for short rows
        self.reader = UnicodeReader(f, dialect, encoding, *args, **kwds)
        self.dialect = dialect
        self.line_num = 0

    def __iter__(self):
        return self

    @property
    def fieldnames(self):
        """
        Keys for the row dicts; read lazily from the first row when they
        were not supplied. Stays None if the input is empty.
        """
        if self._fieldnames is None:
            try:
                self._fieldnames = next(self.reader)
            except StopIteration:
                # Empty input: leave the field names unset
                pass
        self.line_num = self.reader.line_num
        return self._fieldnames

    # Python 2.7's csv.DictReader is a classic class, so its equivalent
    # setter is ignored there (issue 20004). This class derives from object,
    # so the setter below takes effect.
    @fieldnames.setter
    def fieldnames(self, value):
        self._fieldnames = value

    def __next__(self):
        # Python 3 iterator protocol; without it the object returned by
        # __iter__ is not an iterator on Python 3.
        return self._next()

    def next(self):
        # Python 2 iterator protocol
        return self._next()

    def _next(self):
        """
        Return the next non-blank row as a dict

        :returns: dict mapping field names to cells; surplus cells are
                  stored as a list under restkey, missing cells are filled
                  with restval
        :raises StopIteration: when the underlying reader is exhausted
        """
        if self.line_num == 0:
            # Used only for its side effect.
            self.fieldnames
        row = next(self.reader)
        self.line_num = self.reader.line_num

        # unlike the basic reader, we prefer not to return blanks,
        # because we will typically wind up with a dict full of None
        # values
        while row == []:
            row = next(self.reader)
            # keep line_num pointing at the row actually returned
            self.line_num = self.reader.line_num

        fieldnames = self.fieldnames
        d = dict(zip(fieldnames, row))
        lf = len(fieldnames)
        lr = len(row)
        if lf < lr:
            d[self.restkey] = row[lf:]
        elif lf > lr:
            for key in fieldnames[lr:]:
                d[key] = self.restval
        return d


class UnicodeDictWriter(object):
    """
    A CSV writer which maps dicts onto rows written through a UnicodeWriter
    """

    def __init__(
        self,
        f,
        fieldnames,
        restval="",
        extrasaction="raise",
        dialect=csv.excel,
        encoding="utf-8",
        *args,
        **kwds
    ):
        """
        Initialize the UnicodeDictWriter

        :param f: Output stream to write to
        :type f: file or stream

        :param fieldnames: Keys selecting, in order, the values of each
                           dict that are written
        :type fieldnames: list

        :param restval: Value written for keys missing from a dict
        :param extrasaction: "raise" to reject dicts holding keys that are
                             not in fieldnames, "ignore" to drop those keys;
                             matched case-insensitively
        :type extrasaction: str or unicode

        :param dialect: Define a set of parameters specific to a particular
                        CSV dialect
        :type dialect: csv.Dialect

        :param encoding: The encoding of output stream f
        :type encoding: str or unicode

        :param args: Additional positional arguments for UnicodeWriter
        :param kwds: Additional keyword arguments for UnicodeWriter

        :raises ValueError: if extrasaction is neither "raise" nor "ignore"
        """
        self.fieldnames = fieldnames  # list of keys for the dict
        self.restval = restval  # for writing short dicts
        if extrasaction.lower() not in ("raise", "ignore"):
            raise ValueError(
                u"extrasaction ({0}) must be 'raise' or 'ignore'".format(extrasaction)
            )
        self.extrasaction = extrasaction
        self.writer = UnicodeWriter(f, dialect, encoding, *args, **kwds)

    def writeheader(self):
        """
        Write a row holding the field names themselves
        """
        header = dict(zip(self.fieldnames, self.fieldnames))
        self.writerow(header)

    def _dict_to_list(self, rowdict):
        """
        Turn a row dict into a list of values ordered like fieldnames

        :param rowdict: The row to convert
        :type rowdict: dict

        :returns: One value per field name, restval where the key is absent
        :raises ValueError: if extrasaction is "raise" and rowdict holds
                            keys that are not in fieldnames
        """
        # The constructor accepts extrasaction in any letter case, so the
        # comparison must be case-insensitive too; otherwise "RAISE" would
        # pass validation and then silently behave like "ignore".
        if self.extrasaction.lower() == "raise":
            wrong_fields = [k for k in rowdict if k not in self.fieldnames]
            if wrong_fields:
                raise ValueError(
                    u"dict contains fields not in fieldnames: {0}".format(
                        u", ".join([repr(x) for x in wrong_fields])
                    )
                )
        return [rowdict.get(key, self.restval) for key in self.fieldnames]

    def writerow(self, rowdict):
        """
        Write one dict as a row

        :param rowdict: The row to write
        :type rowdict: dict
        """
        return self.writer.writerow(self._dict_to_list(rowdict))

    def writerows(self, rowdicts):
        """
        Write several dicts as rows

        :param rowdicts: The rows to write
        :type rowdicts: iterable of dicts
        """
        # Convert every dict before writing anything, so a rejected dict
        # leaves the output untouched.
        rows = [self._dict_to_list(rowdict) for rowdict in rowdicts]
        return self.writer.writerows(rows)


# Names exported by a star-import of this module.
__all__ = [
    "UnicodeReader",
    "UnicodeWriter",
    "UTF8Recoder",
    "UnicodeDictReader",
    "UnicodeDictWriter",
]