# Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages
#
# agriconnect / pandas   python
#
# Repository URL to install this package:
#
# Version: 0.24.2
#
# / tests / io / sas / test_sas7bdat.py

import io
import os

import numpy as np
import pytest

from pandas.compat import PY2
from pandas.errors import EmptyDataError
import pandas.util._test_decorators as td

import pandas as pd
import pandas.util.testing as tm


# https://github.com/cython/cython/issues/1720
@pytest.mark.filterwarnings("ignore:can't resolve package:ImportWarning")
class TestSAS7BDAT(object):
    """Checks for ``pd.read_sas`` on the ``test{k}.sas7bdat`` files.

    Each SAS file is read in several ways (path, in-memory buffer, iterator,
    path-like objects) and compared with a frame built from the matching
    ``test_sas7bdat_{j}.csv`` file.
    """

    @pytest.fixture(autouse=True)
    def setup_method(self, datapath):
        """Load the reference CSV files into ``self.data``.

        Sets ``self.dirpath`` (the data directory), ``self.test_ix`` (SAS
        file numbers, grouped by the CSV they are compared with) and
        ``self.data`` (one expected frame per CSV, in the same order).
        """
        self.dirpath = datapath("io", "sas", "data")
        self.data = []
        self.test_ix = [list(range(1, 16)), [16]]
        # Loop-invariant: Column4/Column12 are read from the CSVs as day
        # counts and turned into dates by adding them to this date.
        epoch = pd.datetime(1960, 1, 1)

        def decode_py2(x):
            # ``str`` is the byte-string type on Python 2.
            return x.decode('utf-8') if isinstance(x, str) else x

        for j in 1, 2:
            fname = os.path.join(
                self.dirpath, "test_sas7bdat_{j}.csv".format(j=j))
            df = pd.read_csv(fname)
            for name in "Column4", "Column12":
                df[name] = epoch + pd.to_timedelta(df[name], unit='d')
            for k in range(df.shape[1]):
                col = df.iloc[:, k]
                if col.dtype == np.int64:
                    # Integer CSV columns are compared as float64.
                    df.iloc[:, k] = col.astype(np.float64)
                elif col.dtype == np.dtype('O') and PY2:
                    df.iloc[:, k] = col.apply(decode_py2)
            self.data.append(df)

    def _sas_path(self, k):
        """Return the path of ``test{k}.sas7bdat`` in ``self.dirpath``."""
        return os.path.join(self.dirpath, "test{k}.sas7bdat".format(k=k))

    def _cases(self):
        """Yield ``(k, expected_frame)`` for every SAS file number ``k``."""
        for expected, indices in zip(self.data, self.test_ix):
            for k in indices:
                yield k, expected

    def test_from_file(self):
        """Reading from a plain path string matches the reference frame."""
        for k, df0 in self._cases():
            df = pd.read_sas(self._sas_path(k), encoding='utf-8')
            tm.assert_frame_equal(df, df0)

    def test_from_buffer(self):
        """Reading from an in-memory bytes buffer matches the reference."""
        for k, df0 in self._cases():
            with open(self._sas_path(k), 'rb') as f:
                byts = f.read()
            buf = io.BytesIO(byts)
            rdr = pd.read_sas(buf, format="sas7bdat",
                              iterator=True, encoding='utf-8')
            # Close the reader even when the comparison fails.
            try:
                df = rdr.read()
                tm.assert_frame_equal(df, df0, check_exact=False)
            finally:
                rdr.close()

    def test_from_iterator(self):
        """Successive ``read(n)`` calls return consecutive row slices."""
        for k, df0 in self._cases():
            rdr = pd.read_sas(self._sas_path(k), iterator=True,
                              encoding='utf-8')
            try:
                df = rdr.read(2)
                tm.assert_frame_equal(df, df0.iloc[0:2, :])
                df = rdr.read(3)
                tm.assert_frame_equal(df, df0.iloc[2:5, :])
            finally:
                rdr.close()

    @td.skip_if_no('pathlib')
    def test_path_pathlib(self):
        """Reading from a ``pathlib.Path`` matches the reference frame."""
        from pathlib import Path
        for k, df0 in self._cases():
            fname = Path(self._sas_path(k))
            df = pd.read_sas(fname, encoding='utf-8')
            tm.assert_frame_equal(df, df0)

    @td.skip_if_no('py.path')
    def test_path_localpath(self):
        """Reading from a ``py.path.local`` matches the reference frame."""
        from py.path import local as LocalPath
        for k, df0 in self._cases():
            fname = LocalPath(self._sas_path(k))
            df = pd.read_sas(fname, encoding='utf-8')
            tm.assert_frame_equal(df, df0)

    def test_iterator_loop(self):
        """Chunked iteration yields exactly ``row_count`` rows in total."""
        # github #13654
        for k, _ in self._cases():
            fname = self._sas_path(k)
            for chunksize in 3, 5, 10, 11:
                # Pass the loop variable so every chunk size is exercised
                # (a fixed chunksize=10 would repeat the same check).
                rdr = pd.read_sas(fname, chunksize=chunksize,
                                  encoding='utf-8')
                try:
                    y = 0
                    for x in rdr:
                        y += x.shape[0]
                    assert y == rdr.row_count
                finally:
                    rdr.close()

    def test_iterator_read_too_much(self):
        """Asking for more rows than exist gives the same frame with or
        without an explicit ``format`` argument.
        """
        # github #14734
        k = self.test_ix[0][0]
        fname = self._sas_path(k)
        rdr = pd.read_sas(fname, format="sas7bdat",
                          iterator=True, encoding='utf-8')
        try:
            d1 = rdr.read(rdr.row_count + 20)
        finally:
            rdr.close()

        rdr = pd.read_sas(fname, iterator=True, encoding="utf-8")
        try:
            d2 = rdr.read(rdr.row_count + 20)
        finally:
            rdr.close()
        tm.assert_frame_equal(d1, d2)


def test_encoding_options(datapath):
    """Check the ``encoding`` and ``convert_header_text`` options.

    Reading with ``encoding='utf-8'`` must equal reading without an encoding
    and decoding the string columns by hand; with
    ``convert_header_text=False`` the column names must decode to the same
    names.
    """
    fname = datapath("io", "sas", "data", "test1.sas7bdat")
    df1 = pd.read_sas(fname)
    df2 = pd.read_sas(fname, encoding='utf-8')
    for col in df1.columns:
        try:
            df1[col] = df1[col].str.decode('utf-8')
        except AttributeError:
            # Columns without a ``.str`` accessor are left untouched.
            pass
    tm.assert_frame_equal(df1, df2)

    from pandas.io.sas.sas7bdat import SAS7BDATReader
    rdr = SAS7BDATReader(fname, convert_header_text=False)
    # Close the reader even if reading fails.
    try:
        df3 = rdr.read()
    finally:
        rdr.close()
    # ``zip`` stops at the shorter input, so check the lengths explicitly.
    assert len(df1.columns) == len(df3.columns)
    for x, y in zip(df1.columns, df3.columns):
        assert x == y.decode()


def test_productsales(datapath):
    """``productsales.sas7bdat`` must equal ``productsales.csv``."""
    result = pd.read_sas(
        datapath("io", "sas", "data", "productsales.sas7bdat"),
        encoding='utf-8')
    expected = pd.read_csv(
        datapath("io", "sas", "data", "productsales.csv"),
        parse_dates=['MONTH'])
    # These CSV columns are cast to float64 before the comparison.
    float_cols = ["ACTUAL", "PREDICT", "QUARTER", "YEAR"]
    expected[float_cols] = expected[float_cols].astype(np.float64)
    tm.assert_frame_equal(result, expected)


def test_12659(datapath):
    """``test_12659.sas7bdat`` must equal its CSV cast to float64."""
    result = pd.read_sas(
        datapath("io", "sas", "data", "test_12659.sas7bdat"))
    csv_path = datapath("io", "sas", "data", "test_12659.csv")
    expected = pd.read_csv(csv_path).astype(np.float64)
    tm.assert_frame_equal(result, expected)


def test_airline(datapath):
    """``airline.sas7bdat`` must match its CSV cast to float64
    (compared with ``check_exact=False``).
    """
    result = pd.read_sas(
        datapath("io", "sas", "data", "airline.sas7bdat"))
    csv_path = datapath("io", "sas", "data", "airline.csv")
    expected = pd.read_csv(csv_path).astype(np.float64)
    tm.assert_frame_equal(result, expected, check_exact=False)


def test_date_time(datapath):
    """``datetime.sas7bdat`` must equal ``datetime.csv`` with its date
    columns parsed.
    """
    # Support of different SAS date/datetime formats (PR #15871)
    result = pd.read_sas(
        datapath("io", "sas", "data", "datetime.sas7bdat"))
    date_cols = ['Date1', 'Date2', 'DateTime', 'DateTimeHi', 'Taiw']
    expected = pd.read_csv(
        datapath("io", "sas", "data", "datetime.csv"),
        parse_dates=date_cols)
    # GH 19732: Timestamps imported from sas will incur floating point errors
    result.iloc[:, 3] = result.iloc[:, 3].dt.round('us')
    tm.assert_frame_equal(result, expected)


def test_compact_numerical_values(datapath):
    """The WGT and CYL columns of ``cars.sas7bdat`` must already be
    integral, i.e. rounding them changes nothing.
    """
    # Regression test for #21616
    cars = pd.read_sas(
        datapath("io", "sas", "data", "cars.sas7bdat"),
        encoding='latin-1')
    # The two columns CYL and WGT in cars.sas7bdat have column
    # width < 8 and only contain integral values.
    # Test that pandas doesn't corrupt the numbers by adding
    # decimals.
    for name in 'WGT', 'CYL':
        column = cars[name]
        tm.assert_series_equal(column, cars[name].round(),
                               check_exact=True)


def test_many_columns(datapath):
    """``many_columns.sas7bdat`` must equal ``many_columns.csv``."""
    # Test for looking for column information in more places (PR #22628)
    result = pd.read_sas(
        datapath("io", "sas", "data", "many_columns.sas7bdat"),
        encoding='latin-1')
    expected = pd.read_csv(
        datapath("io", "sas", "data", "many_columns.csv"),
        encoding='latin-1')
    tm.assert_frame_equal(result, expected)


def test_inconsistent_number_of_rows(datapath):
    """``load_log.sas7bdat`` must be read as exactly 2097 rows."""
    # Regression test for issue #16615. (PR #22628)
    load_log = pd.read_sas(
        datapath("io", "sas", "data", "load_log.sas7bdat"),
        encoding='latin-1')
    n_rows = len(load_log)
    assert n_rows == 2097


def test_zero_variables(datapath):
    """Reading ``zero_variables.sas7bdat`` must raise ``EmptyDataError``."""
    # Check if the SAS file has zero variables (PR #18184)
    sas_path = datapath("io", "sas", "data", "zero_variables.sas7bdat")
    with pytest.raises(EmptyDataError):
        pd.read_sas(sas_path)