# Repository URL to install this package: (not given)
# Version: 4.5.4.dev1
import datetime as dt
import typing as t
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
from sarus_data_spec.constants import DATA, PUBLIC, PU_COLUMN, WEIGHTS
import sarus_data_spec.typing as st
# Dictionary key under which split_arrow_iterator_admin_data returns the
# pa.Table holding the administrative (non-data) columns.
ADMIN_COLS = "admin_cols"
def str_datetime_to_int(
    string: str, format: str, base: st.DatetimeBase
) -> int:
    """Parse ``string`` with ``format`` and return it as an integer offset
    from the Unix epoch.

    Args:
        string: the datetime text to parse.
        format: a ``strftime``-style format understood by
            ``pd.to_datetime``.
        base: ``st.DatetimeBase.INT64_MS`` yields milliseconds; any other
            value yields nanoseconds.

    Returns:
        The epoch offset in the unit selected by ``base``.
    """
    # Timestamp.value is always nanoseconds since the epoch, whatever
    # resolution pandas chose when parsing (unlike .asm8, whose unit can
    # vary across pandas versions).
    nanoseconds = int(pd.to_datetime(string, format=format).value)
    if base == st.DatetimeBase.INT64_MS:
        # Convert ns -> ms. Floor division rounds pre-epoch instants towards
        # -inf, like numpy/arrow unit coarsening. Multiplying instead would
        # produce picoseconds and overflow int64 for almost every date.
        return nanoseconds // 1_000_000
    return nanoseconds
def str_time_to_int(string: str, format: str, base: st.TimeBase) -> int:
    """Parse a time-of-day ``string`` with ``format`` and return it as an
    integer Arrow time value.

    Only the time component of the parsed value is kept. ``base`` selects
    the Arrow time type used for the encoding:

    * ``st.TimeBase.INT64_NS`` -> ``time64[ns]``
    * ``st.TimeBase.INT32_MS`` -> ``time32[ms]``
    * anything else -> ``time64[us]``

    The value is then reinterpreted as that type's integer storage.
    """
    if base == st.TimeBase.INT64_NS:
        time_type, storage_type = pa.time64("ns"), pa.int64()
    elif base == st.TimeBase.INT32_MS:
        # Arrow only implements time32 -> int32 (its storage width). A direct
        # time32 -> int64 cast raises ArrowNotImplementedError.
        time_type, storage_type = pa.time32("ms"), pa.int32()
    else:
        time_type, storage_type = pa.time64("us"), pa.int64()
    parsed_time = dt.datetime.strptime(string, format).time()
    encoded = pc.cast(pa.scalar(parsed_time, time_type), storage_type)
    return int(encoded.as_py())
def str_date_to_int(string: str, format: str, base: st.DateBase) -> int:
    """Parse ``string`` with ``format`` and return its Arrow ``date32``
    encoding as a plain integer.

    The parsed value is stored as a ``date32`` scalar and reinterpreted as
    ``int32``. ``base`` is accepted for signature symmetry with the other
    converters but is not consulted.
    """
    parsed = pd.to_datetime(string, format=format)
    date_scalar = pa.scalar(parsed, pa.date32())
    as_int32 = pc.cast(date_scalar, pa.int32())
    return int(as_int32.as_py())
def split_arrow_iterator_admin_data(
    iterator: t.Iterator[pa.RecordBatch],
) -> t.Dict[str, pa.Array]:
    """Recover the arrow data and split it into its administrative and
    data parts.

    Every record batch from ``iterator`` is converted to a struct array (one
    field per batch column), and the results are concatenated. The ``DATA``
    field is returned under the ``DATA`` key. The ``PUBLIC``, ``PU_COLUMN``
    and ``WEIGHTS`` fields are gathered, in that order, into a ``pa.Table``
    returned under the ``ADMIN_COLS`` key.

    Fields are matched by name, so the column order inside the batches does
    not matter.

    Raises:
        ValueError: if the ``DATA`` field or any administrative field is
            missing from the batches.
    """
    arrow_data = pa.concat_arrays(
        [batch.to_struct_array() for batch in iterator]
    )
    struct_type = arrow_data.type
    field_names = [
        struct_type.field(i).name for i in range(struct_type.num_fields)
    ]
    # flatten() pushes the struct-level validity down into each child array,
    # so rows that are null at struct level stay null in every column.
    columns = dict(zip(field_names, arrow_data.flatten()))

    admin_names = [PUBLIC, PU_COLUMN, WEIGHTS]
    # Fail loudly instead of get_field_index()'s -1, which pop() would
    # silently treat as "the last column".
    missing = [
        name for name in [DATA, *admin_names] if name not in columns
    ]
    if missing:
        raise ValueError(
            f"Missing field(s) {missing} in arrow data with fields "
            f"{field_names}"
        )

    return {
        DATA: columns[DATA],
        ADMIN_COLS: pa.Table.from_arrays(
            [columns[name] for name in admin_names], admin_names
        ),
    }