# NOTE(review): the five lines below this module's docstring position were
# page-scrape residue from a package-index page ("Repository URL to install
# this package" / "Version: 0.1.1") and are not valid Python; they are kept
# here as a comment so the module remains importable.
"""testing parquet to/from pyspark"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import datetime
import numpy as np
import os
import pandas as pd
import pytest
import fastparquet
from fastparquet import write
from fastparquet.compression import compressions
from fastparquet.test.util import sql, s3, TEST_DATA
from fastparquet.util import tempdir, default_mkdirs
def test_map_array(sql):
    """Check fastparquet reads Spark-written MAP/ARRAY columns as Spark does.

    The reference file ``map_array.parq`` was generated once with the pyspark
    snippet below (kept verbatim for reproducibility)::

        from pyspark.sql.types import *
        df_schema = StructType([
        StructField('map_op_op', MapType(StringType(), StringType(), True), True),
        StructField('map_op_req', MapType(StringType(), StringType(), False), True),
        StructField('map_req_op', MapType(StringType(), StringType(), True), False),
        StructField('map_req_req', MapType(StringType(), StringType(), False), False),
        StructField('arr_op_op', ArrayType(StringType(), True), True),
        StructField('arr_op_req', ArrayType(StringType(), False), True),
        StructField('arr_req_op', ArrayType(StringType(), True), False),
        StructField('arr_req_req', ArrayType(StringType(), False), False)])
        keys = ['fred', 'wilma', 'barney', 'betty']
        vals = ['franky', 'benji', 'mighty']
        out = []
        for i in range(1000):
            part = []
            for field in [f.name for f in df_schema.fields]:
                sort, nullable, nullvalue = field.split('_')
                if nullable == 'op' and np.random.random() < 0.3:
                    part.append(None)
                    continue
                N = np.random.randint(5)
                ks = np.random.choice(keys, size=N).tolist()
                vs = np.random.choice(vals + [None] if nullvalue == 'op' else vals,
                                      size=N).tolist()
                if sort == 'map':
                    part.append({k: v for (k, v) in zip(ks, vs)})
                else:
                    part.append(vs)
            out.append(part)
        df = sql.createDataFrame(out, df_schema)
    """
    fn = os.path.join(TEST_DATA, 'map_array.parq')
    # What Spark itself reads back from the file is the ground truth.
    expected = sql.read.parquet(fn).toPandas()
    pf = fastparquet.ParquetFile(fn)
    data = pf.to_pandas()
    # pd.testing replaces the deprecated pd.util.testing alias
    # (removed in pandas 2.0); same function, supported location.
    pd.testing.assert_frame_equal(data, expected)
def test_nested_list(sql):
    """A nested struct-of-list is exposed as a single dotted leaf column.

    The reference file ``nested.parq`` was produced once with pyspark::

        j = {'nest': {'thing': ['hi', 'world']}}
        open('temp.json', 'w').write('\n'.join([json.dumps(j)] * 10))
        df = sql.read.json('temp.json')
    """
    path = os.path.join(TEST_DATA, 'nested.parq')
    parquet_file = fastparquet.ParquetFile(path)
    # Only the flattened leaf is listed; the intermediate 'nest' struct is not.
    assert parquet_file.columns == ['nest.thing']
    frame = parquet_file.to_pandas(columns=['nest.thing'])
    assert all(value == ['hi', 'world'] for value in frame['nest.thing'])
@pytest.mark.parametrize('scheme', ['simple', 'hive', 'drill'])
@pytest.mark.parametrize('row_groups', [[0], [0, 500]])
@pytest.mark.parametrize('comp', [None] + list(compressions))
def test_pyspark_roundtrip(tempdir, scheme, row_groups, comp, sql):
    """Data written by fastparquet must read back identically through Spark.

    Parametrized over every file scheme, single vs. split row groups, and
    every compression codec fastparquet supports.
    """
    if comp == 'BROTLI':
        pytest.xfail("spark doesn't support BROTLI compression")
    data = pd.DataFrame({'i32': np.random.randint(-2**17, 2**17, size=1001,
                                                  dtype=np.int32),
                         'i64': np.random.randint(-2**33, 2**33, size=1001,
                                                  dtype=np.int64),
                         'f': np.random.randn(1001),
                         'bhello': np.random.choice([b'hello', b'you',
                                    b'people'], size=1001).astype("O"),
                         't': [datetime.datetime.now()]*1001})
    # nudge timestamps off the exact 'now' value so nanosecond handling counts
    data['t'] += pd.to_timedelta('1ns')
    data['hello'] = data.bhello.str.decode('utf8')
    data.loc[100, 'f'] = np.nan  # exercise the null path for floats
    data['bcat'] = data.bhello.astype('category')
    data['cat'] = data.hello.astype('category')
    fname = os.path.join(tempdir, 'test.parquet')
    write(fname, data, file_scheme=scheme, row_group_offsets=row_groups,
          compression=comp, times='int96', write_index=True)
    df = sql.read.parquet(fname)
    ddf = df.sort('index').toPandas()
    # pyspark renders int96 timestamps in local time; the loop-invariant
    # UTC offset is computed once, not per column.
    offset = round((datetime.datetime.utcnow() -
                    datetime.datetime.now()).seconds / 3600)
    for col in data:
        if data[col].dtype.kind == "M":
            # BUG FIX: the original built this comparison but never asserted
            # it, so timestamp round-tripping was silently unchecked.
            assert (ddf[col] + datetime.timedelta(hours=offset)
                    == data[col]).all()
        else:
            # compare only non-null positions; NaN != NaN would fail otherwise
            assert (ddf[col] == data[col])[~ddf[col].isnull()].all()
def test_empty_row_groups(tempdir, sql):
    """ParquetFile copes with the zero-row part files Spark workers emit.

    Spark writes one part file per worker; with a single-row frame most of
    those parts contain no rows.  fastparquet must both read such a dataset
    and keep working after the empty parts are deleted, since empty
    row-groups are never opened.
    """
    import glob  # local import kept: only this test needs it
    fn = os.path.join(tempdir, 'output.parquet')
    d0 = pd.DataFrame({'name': ['alice'], 'age': [20]})
    df = sql.createDataFrame(d0)
    df.write.parquet(fn)
    files = glob.glob(os.path.join(fn, '*.parquet'))
    # check the premise first: more than one worker was writing
    assert len(files) > 1
    sizes = [os.stat(p).st_size for p in files]
    msize = max(sizes)
    pf = fastparquet.ParquetFile(files)  # don't necessarily have metadata
    d = pf.to_pandas(index=False)
    # pd.testing replaces the deprecated pd.util.testing alias
    # (removed in pandas 2.0).
    pd.testing.assert_frame_equal(d, d0)
    # destroy the empty part files (everything smaller than the largest part);
    # a plain loop, not a list comprehension used for its side effects
    for path, size in zip(files, sizes):
        if size < msize:
            os.unlink(path)
    # loads anyway, since empty row-groups are not touched
    d = pf.to_pandas()
    pd.testing.assert_frame_equal(d, d0)