Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pandas / tests / io / parser / test_textreader.py
Size: Mime:
# -*- coding: utf-8 -*-

"""
Tests the TextReader class in parsers.pyx, which
is integral to the C engine in parsers.py
"""

import pytest

from pandas.compat import StringIO, BytesIO, map
from pandas import compat

import os
import sys

from numpy import nan
import numpy as np

from pandas import DataFrame
from pandas.io.parsers import (read_csv, TextFileReader)
from pandas.util.testing import assert_frame_equal

import pandas.util.testing as tm

from pandas._libs.parsers import TextReader
import pandas._libs.parsers as parser


class TestTextReader(object):

    @pytest.fixture(autouse=True)
    def setup_method(self, datapath):
        self.dirpath = datapath('io', 'parser', 'data')
        self.csv1 = os.path.join(self.dirpath, 'test1.csv')
        self.csv2 = os.path.join(self.dirpath, 'test2.csv')
        self.xls1 = os.path.join(self.dirpath, 'test.xls')

    def test_file_handle(self):
        with open(self.csv1, 'rb') as f:
            reader = TextReader(f)
            reader.read()

    def test_string_filename(self):
        reader = TextReader(self.csv1, header=None)
        reader.read()

    def test_file_handle_mmap(self):
        with open(self.csv1, 'rb') as f:
            reader = TextReader(f, memory_map=True, header=None)
            reader.read()

    def test_StringIO(self):
        with open(self.csv1, 'rb') as f:
            text = f.read()
        src = BytesIO(text)
        reader = TextReader(src, header=None)
        reader.read()

    def test_string_factorize(self):
        # should this be optional?
        data = 'a\nb\na\nb\na'
        reader = TextReader(StringIO(data), header=None)
        result = reader.read()
        assert len(set(map(id, result[0]))) == 2

    def test_skipinitialspace(self):
        data = ('a,   b\n'
                'a,   b\n'
                'a,   b\n'
                'a,   b')

        reader = TextReader(StringIO(data), skipinitialspace=True,
                            header=None)
        result = reader.read()

        tm.assert_numpy_array_equal(result[0], np.array(['a', 'a', 'a', 'a'],
                                                        dtype=np.object_))
        tm.assert_numpy_array_equal(result[1], np.array(['b', 'b', 'b', 'b'],
                                                        dtype=np.object_))

    def test_parse_booleans(self):
        data = 'True\nFalse\nTrue\nTrue'

        reader = TextReader(StringIO(data), header=None)
        result = reader.read()

        assert result[0].dtype == np.bool_

    def test_delimit_whitespace(self):
        data = 'a  b\na\t\t "b"\n"a"\t \t b'

        reader = TextReader(StringIO(data), delim_whitespace=True,
                            header=None)
        result = reader.read()

        tm.assert_numpy_array_equal(result[0], np.array(['a', 'a', 'a'],
                                                        dtype=np.object_))
        tm.assert_numpy_array_equal(result[1], np.array(['b', 'b', 'b'],
                                                        dtype=np.object_))

    def test_embedded_newline(self):
        data = 'a\n"hello\nthere"\nthis'

        reader = TextReader(StringIO(data), header=None)
        result = reader.read()

        expected = np.array(['a', 'hello\nthere', 'this'], dtype=np.object_)
        tm.assert_numpy_array_equal(result[0], expected)

    def test_euro_decimal(self):
        data = '12345,67\n345,678'

        reader = TextReader(StringIO(data), delimiter=':',
                            decimal=',', header=None)
        result = reader.read()

        expected = np.array([12345.67, 345.678])
        tm.assert_almost_equal(result[0], expected)

    def test_integer_thousands(self):
        data = '123,456\n12,500'

        reader = TextReader(StringIO(data), delimiter=':',
                            thousands=',', header=None)
        result = reader.read()

        expected = np.array([123456, 12500], dtype=np.int64)
        tm.assert_almost_equal(result[0], expected)

    def test_integer_thousands_alt(self):
        data = '123.456\n12.500'

        reader = TextFileReader(StringIO(data), delimiter=':',
                                thousands='.', header=None)
        result = reader.read()

        expected = DataFrame([123456, 12500])
        tm.assert_frame_equal(result, expected)

    @tm.capture_stderr
    def test_skip_bad_lines(self):
        # too many lines, see #2430 for why
        data = ('a:b:c\n'
                'd:e:f\n'
                'g:h:i\n'
                'j:k:l:m\n'
                'l:m:n\n'
                'o:p:q:r')

        reader = TextReader(StringIO(data), delimiter=':',
                            header=None)
        pytest.raises(parser.ParserError, reader.read)

        reader = TextReader(StringIO(data), delimiter=':',
                            header=None,
                            error_bad_lines=False,
                            warn_bad_lines=False)
        result = reader.read()
        expected = {0: np.array(['a', 'd', 'g', 'l'], dtype=object),
                    1: np.array(['b', 'e', 'h', 'm'], dtype=object),
                    2: np.array(['c', 'f', 'i', 'n'], dtype=object)}
        assert_array_dicts_equal(result, expected)

        reader = TextReader(StringIO(data), delimiter=':',
                            header=None,
                            error_bad_lines=False,
                            warn_bad_lines=True)
        reader.read()
        val = sys.stderr.getvalue()

        assert 'Skipping line 4' in val
        assert 'Skipping line 6' in val

    def test_header_not_enough_lines(self):
        data = ('skip this\n'
                'skip this\n'
                'a,b,c\n'
                '1,2,3\n'
                '4,5,6')

        reader = TextReader(StringIO(data), delimiter=',', header=2)
        header = reader.header
        expected = [['a', 'b', 'c']]
        assert header == expected

        recs = reader.read()
        expected = {0: np.array([1, 4], dtype=np.int64),
                    1: np.array([2, 5], dtype=np.int64),
                    2: np.array([3, 6], dtype=np.int64)}
        assert_array_dicts_equal(recs, expected)

    def test_escapechar(self):
        data = ('\\"hello world\"\n'
                '\\"hello world\"\n'
                '\\"hello world\"')

        reader = TextReader(StringIO(data), delimiter=',', header=None,
                            escapechar='\\')
        result = reader.read()
        expected = {0: np.array(['"hello world"'] * 3, dtype=object)}
        assert_array_dicts_equal(result, expected)

    def test_eof_has_eol(self):
        # handling of new line at EOF
        pass

    def test_na_substitution(self):
        pass

    def test_numpy_string_dtype(self):
        data = """\
a,1
aa,2
aaa,3
aaaa,4
aaaaa,5"""

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', header=None,
                              **kwds)

        reader = _make_reader(dtype='S5,i4')
        result = reader.read()

        assert result[0].dtype == 'S5'

        ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaaa'], dtype='S5')
        assert (result[0] == ex_values).all()
        assert result[1].dtype == 'i4'

        reader = _make_reader(dtype='S4')
        result = reader.read()
        assert result[0].dtype == 'S4'
        ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaa'], dtype='S4')
        assert (result[0] == ex_values).all()
        assert result[1].dtype == 'S4'

    def test_pass_dtype(self):
        data = """\
one,two
1,a
2,b
3,c
4,d"""

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', **kwds)

        reader = _make_reader(dtype={'one': 'u1', 1: 'S1'})
        result = reader.read()
        assert result[0].dtype == 'u1'
        assert result[1].dtype == 'S1'

        reader = _make_reader(dtype={'one': np.uint8, 1: object})
        result = reader.read()
        assert result[0].dtype == 'u1'
        assert result[1].dtype == 'O'

        reader = _make_reader(dtype={'one': np.dtype('u1'),
                                     1: np.dtype('O')})
        result = reader.read()
        assert result[0].dtype == 'u1'
        assert result[1].dtype == 'O'

    def test_usecols(self):
        data = """\
a,b,c
1,2,3
4,5,6
7,8,9
10,11,12"""

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', **kwds)

        reader = _make_reader(usecols=(1, 2))
        result = reader.read()

        exp = _make_reader().read()
        assert len(result) == 2
        assert (result[1] == exp[1]).all()
        assert (result[2] == exp[2]).all()

    def test_cr_delimited(self):
        def _test(text, **kwargs):
            nice_text = text.replace('\r', '\r\n')
            result = TextReader(StringIO(text), **kwargs).read()
            expected = TextReader(StringIO(nice_text), **kwargs).read()
            assert_array_dicts_equal(result, expected)

        data = 'a,b,c\r1,2,3\r4,5,6\r7,8,9\r10,11,12'
        _test(data, delimiter=',')

        data = 'a  b  c\r1  2  3\r4  5  6\r7  8  9\r10  11  12'
        _test(data, delim_whitespace=True)

        data = 'a,b,c\r1,2,3\r4,5,6\r,88,9\r10,11,12'
        _test(data, delimiter=',')

        sample = ('A,B,C,D,E,F,G,H,I,J,K,L,M,N,O\r'
                  'AAAAA,BBBBB,0,0,0,0,0,0,0,0,0,0,0,0,0\r'
                  ',BBBBB,0,0,0,0,0,0,0,0,0,0,0,0,0')
        _test(sample, delimiter=',')

        data = 'A  B  C\r  2  3\r4  5  6'
        _test(data, delim_whitespace=True)

        data = 'A B C\r2 3\r4 5 6'
        _test(data, delim_whitespace=True)

    def test_empty_field_eof(self):
        data = 'a,b,c\n1,2,3\n4,,'

        result = TextReader(StringIO(data), delimiter=',').read()

        expected = {0: np.array([1, 4], dtype=np.int64),
                    1: np.array(['2', ''], dtype=object),
                    2: np.array(['3', ''], dtype=object)}
        assert_array_dicts_equal(result, expected)

        # GH5664
        a = DataFrame([['b'], [nan]], columns=['a'], index=['a', 'c'])
        b = DataFrame([[1, 1, 1, 0], [1, 1, 1, 0]],
                      columns=list('abcd'),
                      index=[1, 1])
        c = DataFrame([[1, 2, 3, 4], [6, nan, nan, nan],
                       [8, 9, 10, 11], [13, 14, nan, nan]],
                      columns=list('abcd'),
                      index=[0, 5, 7, 12])

        for _ in range(100):
            df = read_csv(StringIO('a,b\nc\n'), skiprows=0,
                          names=['a'], engine='c')
            assert_frame_equal(df, a)

            df = read_csv(StringIO('1,1,1,1,0\n' * 2 + '\n' * 2),
                          names=list("abcd"), engine='c')
            assert_frame_equal(df, b)

            df = read_csv(StringIO('0,1,2,3,4\n5,6\n7,8,9,10,11\n12,13,14'),
                          names=list('abcd'), engine='c')
            assert_frame_equal(df, c)

    def test_empty_csv_input(self):
        # GH14867
        df = read_csv(StringIO(), chunksize=20, header=None,
                      names=['a', 'b', 'c'])
        assert isinstance(df, TextFileReader)


def assert_array_dicts_equal(left, right):
    for k, v in compat.iteritems(left):
        assert tm.assert_numpy_array_equal(np.asarray(v),
                                           np.asarray(right[k]))