import functools
import operator
import string
import hypothesis.strategies as st
import numpy as np
import numpy.testing as npt
from caffe2.python import core, dataset, workspace
from caffe2.python.dataset import Const
from caffe2.python.schema import (
FeedRecord,
FetchRecord,
Field,
List,
Map,
NewRecord,
Scalar,
Struct,
from_blob_list,
)
from caffe2.python.test_util import TestCase
from hypothesis import given
def _assert_arrays_equal(actual, ref, err_msg):
    if ref.dtype.kind in ("S", "O", "U"):
        npt.assert_array_equal(actual, ref, err_msg=err_msg)
    else:
        npt.assert_allclose(actual, ref, atol=1e-4, rtol=1e-4, err_msg=err_msg)
def _assert_records_equal(actual, ref):
assert isinstance(actual, Field)
assert isinstance(ref, Field)
b1 = actual.field_blobs()
b2 = ref.field_blobs()
assert len(b1) == len(b2), "Records have different lengths: %d vs. %d" % (
len(b1),
len(b2),
)
for name, d1, d2 in zip(ref.field_names(), b1, b2):
_assert_arrays_equal(d1, d2, err_msg="Mismatch in field %s." % name)
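# Note: these helpers expect records whose field_blobs() already hold numpy
# arrays (e.g. records built via from_blob_list, or fetched with FetchRecord),
# not BlobReferences. A minimal usage sketch, assuming two such records r1/r2:
#
#   _assert_records_equal(r1, r2)  # float fields compared with 1e-4 tolerance,
#                                  # string/object fields compared exactly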
@st.composite
def _sparse_features_map(draw, num_records, **kwargs):
sparse_maps_lengths = draw(
st.lists(
st.integers(min_value=1, max_value=10),
min_size=num_records,
max_size=num_records,
)
)
sparse_maps_total_length = sum(sparse_maps_lengths)
sparse_keys = draw(
st.lists(
st.integers(min_value=1, max_value=100),
min_size=sparse_maps_total_length,
max_size=sparse_maps_total_length,
unique=True,
)
)
sparse_values_lengths = draw(
st.lists(
st.integers(min_value=1, max_value=10),
min_size=sparse_maps_total_length,
max_size=sparse_maps_total_length,
)
)
total_sparse_values_lengths = sum(sparse_values_lengths)
sparse_values = draw(
# max_value is max int64
st.lists(
st.integers(min_value=1, max_value=9223372036854775807),
min_size=total_sparse_values_lengths,
max_size=total_sparse_values_lengths,
)
)
return [
sparse_maps_lengths,
sparse_keys,
sparse_values_lengths,
sparse_values,
]
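# The four parallel lists returned above mirror how a
# Map(Scalar(np.int32), List(Scalar(np.int64))) field is flattened by the
# schema (see the "int_lists:*" names checked in test_dataset_ops below):
# per-record map lengths, map keys, per-key value lengths, and the
# concatenated values. A hypothetical draw with lengths [2, 1] would mean
# record 0 owns the first two keys and record 1 the third.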
@st.composite
def _dense_features_map(draw, num_records, **kwargs):
float_lengths = draw(
st.lists(
st.integers(min_value=1, max_value=10),
min_size=num_records,
max_size=num_records,
)
)
total_length = sum(float_lengths)
float_keys = draw(
st.lists(
st.integers(min_value=1, max_value=100),
min_size=total_length,
max_size=total_length,
unique=True,
)
)
float_values = draw(
st.lists(st.floats(), min_size=total_length, max_size=total_length)
)
return [float_lengths, float_keys, float_values]
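# Likewise, the three lists returned above follow the flattening of a
# Map(Scalar(np.int32), Scalar(np.float32)) field: per-record lengths, keys,
# and values (the "floats:*" names in the schema below).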
@st.composite
def _dataset(draw, min_elements=3, max_elements=10, **kwargs):
schema = Struct(
# Dense Features Map
("floats", Map(Scalar(np.int32), Scalar(np.float32))),
# Sparse Features Map
(
"int_lists",
Map(
Scalar(np.int32),
List(Scalar(np.int64)),
),
),
# Complex Type
("text", Scalar(str)),
)
num_records = draw(st.integers(min_value=min_elements, max_value=max_elements))
raw_dense_features_map_contents = draw(_dense_features_map(num_records))
raw_sparse_features_map_contents = draw(_sparse_features_map(num_records))
raw_text_contents = [
draw(
st.lists(
st.text(alphabet=string.ascii_lowercase),
min_size=num_records,
max_size=num_records,
)
)
]
# Concatenate all raw contents to a single one
contents_raw = (
raw_dense_features_map_contents
+ raw_sparse_features_map_contents
+ raw_text_contents
)
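    # The 3 + 4 + 1 raw lists concatenated above line up, in order, with the
    # flattened fields of the schema: floats:lengths, floats:values:keys,
    # floats:values:values, int_lists:lengths, int_lists:values:keys,
    # int_lists:values:values:lengths, int_lists:values:values:values, text.
    # from_blob_list below depends on this ordering to build the record.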
contents = from_blob_list(schema, contents_raw)
return (schema, contents, num_records)
class TestDatasetOps(TestCase):
@given(_dataset())
def test_pack_unpack(self, input):
"""
        Tests that packing and then unpacking the whole dataset is an identity
        operation.
"""
(schema, contents, num_records) = input
dataset_fields = schema.field_names()
for pack_to_single_shared_ptr in (True, False):
net = core.Net("pack_unpack_net")
batch = NewRecord(net, contents)
FeedRecord(batch, contents)
packed = net.PackRecords(
batch.field_blobs(),
1,
fields=dataset_fields,
pack_to_single_shared_ptr=pack_to_single_shared_ptr,
)
unpacked = packed.UnPackRecords(
[], len(dataset_fields), fields=dataset_fields
)
workspace.RunNetOnce(net)
for initial_tensor, unpacked_tensor in zip(batch.field_blobs(), unpacked):
npt.assert_array_equal(
workspace.FetchBlob(initial_tensor),
workspace.FetchBlob(unpacked_tensor),
)
def test_dataset_ops(self):
"""
        1. Defining the schema of our dataset.
        This schema could represent, for example, a search query log.
"""
schema = Struct(
# fixed size vector, which will be stored as a matrix when batched
("dense", Scalar((np.float32, 3))),
# could represent a feature map from feature ID to float value
("floats", Map(Scalar(np.int32), Scalar(np.float32))),
# could represent a multi-valued categorical feature map
(
"int_lists",
Map(
Scalar(np.int32),
List(Scalar(np.int64)),
),
),
# could represent a multi-valued, weighted categorical feature map
(
"id_score_pairs",
Map(
Scalar(np.int32),
Map(
Scalar(np.int64),
Scalar(np.float32),
keys_name="ids",
values_name="scores",
),
),
),
# additional scalar information
(
"metadata",
Struct(
("user_id", Scalar(np.int64)),
("user_embed", Scalar((np.float32, 2))),
("query", Scalar(str)),
),
),
)
"""
        This is what the flattened fields for this schema look like, along
        with their types. Each one of these fields will be stored, read, and
        written as a tensor.
"""
expected_fields = [
("dense", (np.float32, 3)),
("floats:lengths", np.int32),
("floats:values:keys", np.int32),
("floats:values:values", np.float32),
("int_lists:lengths", np.int32),
("int_lists:values:keys", np.int32),
("int_lists:values:values:lengths", np.int32),
("int_lists:values:values:values", np.int64),
("id_score_pairs:lengths", np.int32),
("id_score_pairs:values:keys", np.int32),
("id_score_pairs:values:values:lengths", np.int32),
("id_score_pairs:values:values:values:ids", np.int64),
("id_score_pairs:values:values:values:scores", np.float32),
("metadata:user_id", np.int64),
("metadata:user_embed", (np.float32, 2)),
("metadata:query", str),
]
zipped = zip(expected_fields, schema.field_names(), schema.field_types())
for (ref_name, ref_type), name, dtype in zipped:
            self.assertEqual(ref_name, name)
            self.assertEqual(np.dtype(ref_type), dtype)
"""
2. The contents of our dataset.
        The contents defined below could represent, for example, a log of
        search queries along with dense features, sparse features, and
        metadata. The dataset below has 3 top-level entries.
"""
contents_raw = [
# dense
[[1.1, 1.2, 1.3], [2.1, 2.2, 2.3], [3.1, 3.2, 3.3]],
# floats
[1, 2, 3], # len
[11, 21, 22, 31, 32, 33], # key
[1.1, 2.1, 2.2, 3.1, 3.2, 3.3], # value
# int lists
[2, 0, 1], # len
[11, 12, 31], # key
[2, 4, 3], # value:len
[111, 112, 121, 122, 123, 124, 311, 312, 313], # value:value
# id score pairs
[1, 2, 2], # len
[11, 21, 22, 31, 32], # key
[1, 1, 2, 2, 3], # value:len
[111, 211, 221, 222, 311, 312, 321, 322, 323], # value:ids
[11.1, 21.1, 22.1, 22.2, 31.1, 31.2, 32.1, 32.2, 32.3], # val:score
# metadata
[123, 234, 456], # user_id
[[0.2, 0.8], [0.5, 0.5], [0.7, 0.3]], # user_embed
["dog posts", "friends who like to", "posts about ca"], # query
]
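        # Reading the parallel lists above for "floats": the lengths [1, 2, 3]
        # segment the keys/values per entry, i.e. entry 0 -> {11: 1.1},
        # entry 1 -> {21: 2.1, 22: 2.2}, entry 2 -> {31: 3.1, 32: 3.2, 33: 3.3}.
        # The nested "int_lists" and "id_score_pairs" fields add one more
        # lengths level to segment each key's values in the same way.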
# convert the above content to ndarrays, checking against the schema
contents = from_blob_list(schema, contents_raw)
"""
3. Creating and appending to the dataset.
We first create an empty dataset with the given schema.
Then, a Writer is used to append these entries to the dataset.
"""
ds = dataset.Dataset(schema)
net = core.Net("init")
with core.NameScope("init"):
ds.init_empty(net)
content_blobs = NewRecord(net, contents)
FeedRecord(content_blobs, contents)
writer = ds.writer(init_net=net)
writer.write_record(net, content_blobs)
workspace.RunNetOnce(net)
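        # At this point the workspace should hold one dataset blob per
        # flattened field, each containing the three entries appended by the
        # writer above.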
"""
4. Iterating through the dataset contents.
        If we were to iterate through the top-level entries of our dataset,
        this is what we would expect to see:
"""
entries_raw = [
(
[[1.1, 1.2, 1.3]], # dense
[1],
[11],
[1.1], # floats
[2],
[11, 12],
[2, 4],
[111, 112, 121, 122, 123, 124], # intlst
[1],
[11],
[1],
[111],
[11.1], # id score pairs
[123],
[[0.2, 0.8]],
["dog posts"], # metadata
),
(
[[2.1, 2.2, 2.3]], # dense
[2],
[21, 22],
[2.1, 2.2], # floats