Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import datetime as dt
import typing as t

import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

from sarus_data_spec.constants import DATA, PUBLIC, PU_COLUMN, WEIGHTS
import sarus_data_spec.typing as st

# Key under which split_arrow_iterator_admin_data returns the table of
# administrative columns.
ADMIN_COLS = "admin_cols"


def str_datetime_to_int(
    string: str, format: str, base: st.DatetimeBase
) -> int:
    """Parse a datetime string and return it as an integer epoch offset.

    Args:
        string: Textual datetime to parse.
        format: Parsing pattern forwarded to ``pandas.to_datetime``.
        base: Requested integer encoding. ``st.DatetimeBase.INT64_MS``
            yields milliseconds since the Unix epoch (sub-millisecond
            precision is dropped); any other base yields nanoseconds.

    Returns:
        The parsed instant as a plain Python ``int``.
    """
    # Convert through an explicit numpy unit instead of rescaling the raw
    # integer: milliseconds are nanoseconds / 1e6, and multiplying a
    # nanosecond count by 1000 overflows int64 for almost every date.
    # Pinning the unit also keeps the result independent of whatever
    # resolution pandas picked for the parsed Timestamp.
    unit = "ms" if base == st.DatetimeBase.INT64_MS else "ns"
    parsed = pd.to_datetime(string, format=format).asm8
    return int(parsed.astype(f"datetime64[{unit}]").astype("int64"))


def str_time_to_int(string: str, format: str, base: st.TimeBase) -> int:
    """Parse a time-of-day string and return it as an integer.

    The string is parsed with ``datetime.strptime``; only the time-of-day
    part of the result is kept. It is then encoded as an Arrow time scalar
    whose unit depends on ``base`` and the underlying integer is returned.

    Args:
        string: Textual time to parse.
        format: ``strptime`` pattern describing ``string``.
        base: Requested encoding. ``st.TimeBase.INT64_NS`` yields
            nanoseconds, ``st.TimeBase.INT32_MS`` yields milliseconds and
            any other base yields microseconds.

    Returns:
        The time of day in the selected unit, as a plain Python ``int``.

    Raises:
        ValueError: If ``string`` does not match ``format``.
    """
    if base == st.TimeBase.INT64_NS:
        time_type, int_type = pa.time64("ns"), pa.int64()
    elif base == st.TimeBase.INT32_MS:
        # time32 is backed by a 32-bit integer; cast to the integer type
        # of the same width rather than to int64, since Arrow's
        # temporal-to-integer casts are between same-width types.
        time_type, int_type = pa.time32("ms"), pa.int32()
    else:
        time_type, int_type = pa.time64("us"), pa.int64()

    parsed = dt.datetime.strptime(string, format).time()
    return int(pc.cast(pa.scalar(parsed, time_type), int_type).as_py())


def str_date_to_int(string: str, format: str, base: st.DateBase) -> int:
    """Parse a date string and return its Arrow ``date32`` integer value.

    Args:
        string: Textual date to parse.
        format: Parsing pattern forwarded to ``pandas.to_datetime``.
        base: Accepted for signature parity with the other converters;
            this function does not read it.

    Returns:
        The 32-bit integer backing the ``date32`` scalar, as a plain
        Python ``int``.
    """
    parsed = pd.to_datetime(string, format=format)
    as_date32 = pa.scalar(parsed, pa.date32())
    # date32 is stored on 32 bits, hence the cast to int32.
    raw_value = pc.cast(as_date32, pa.int32()).as_py()
    return int(raw_value)


def split_arrow_iterator_admin_data(
    iterator: t.Iterator[pa.RecordBatch],
) -> t.Dict[str, pa.Array]:
    """Split record batches into the data column and the admin columns.

    Every batch is converted to a struct array and all of them are
    concatenated. The child field named ``DATA`` is then separated from
    the remaining children, which are gathered into a table whose columns
    are labelled ``PUBLIC``, ``PU_COLUMN`` and ``WEIGHTS``, in that order.

    Args:
        iterator: Record batches to merge; it is consumed entirely.
            The remaining (non-``DATA``) columns are presumably already in
            the PUBLIC / PU_COLUMN / WEIGHTS order, since they are labelled
            positionally -- verify against the producer.

    Returns:
        A mapping with two entries: ``DATA`` maps to the array of the
        ``DATA`` field, and ``ADMIN_COLS`` maps to a ``pa.Table`` holding
        the other columns. NOTE(review): the return annotation only
        mentions ``pa.Array`` although the ``ADMIN_COLS`` value is a table.

    Raises:
        ValueError: If the batches have no unique field named ``DATA``.
    """
    rows = pa.concat_arrays(
        [batch.to_struct_array() for batch in iterator]
    )
    data_index = rows.type.get_field_index(DATA)
    if data_index < 0:
        # get_field_index signals "missing or ambiguous" with -1; without
        # this guard list.pop(-1) would silently return the last column.
        raise ValueError(
            f"No unique field named {DATA!r} in the record batches"
        )
    children = rows.flatten()
    data = children.pop(data_index)
    admin_table = pa.Table.from_arrays(
        children, [PUBLIC, PU_COLUMN, WEIGHTS]
    )
    return {DATA: data, ADMIN_COLS: admin_table}