# @package feature_sparse_to_dense
# Module caffe2.python.layers.feature_sparse_to_dense


from collections import defaultdict

import numpy as np
from caffe2.python import schema
from caffe2.python.layers.layers import AccessedFeatures, ModelLayer


class FeatureSparseToDense(ModelLayer):
    def __init__(
        self,
        model,
        input_record,
        input_specs,
        name="feature_sparse_to_dense",
        default_dense_value=None,
        **kwargs
    ):
        """
        `input_specs` follows the format of FeatureSpec from schema. To be more
        precise, it's a namedtuple that should have:
            'feature_type', 'feature_names', 'feature_ids'
        `default_dense_value` can only be 0.0 or float("NaN"); if it is None,
        it defaults to 0.0.
        """
        super(FeatureSparseToDense, self).__init__(model, name, input_record, **kwargs)
        if default_dense_value is None:
            default_dense_value = 0.0
        default_dense_value = float(default_dense_value)
        assert (
            np.isnan(default_dense_value) or default_dense_value == 0.0
        ), "default_dense_value can only be 0.0 or NaN"

        self.input_specs = input_specs
        self.default_float_value = (
            model.global_constants["NAN"]
            if np.isnan(default_dense_value)
            else model.global_constants["ZERO"]
        )
        self.zero_range = model.global_constants["ZERO_RANGE"]
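        # These global constants are assumed to be pre-registered blobs:
        # "NAN" / "ZERO" hold the scalar float default, and "ZERO_RANGE" is
        # assumed to hold an empty [0, 0] range used below for features that
        # are missing from an example.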

        outputs = []
        for field, feature_specs in self.input_specs:
            assert len(feature_specs.feature_names) == len(feature_specs.feature_ids)
            if feature_specs.feature_type == "FLOAT":
                outputs.append(
                    (
                        field,
                        schema.Scalar(
                            (np.float32, (len(feature_specs.feature_ids),)),
                            self.get_next_blob_reference(field + "_output"),
                        ),
                    )
                )
            elif feature_specs.feature_type == "ID_LIST":
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.int64,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            elif feature_specs.feature_type == "ID_SCORE_LIST":
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "ids",
                                schema.Scalar(
                                    np.int64,
                                    self.get_next_blob_reference(field + "_ids"),
                                ),
                            ),
                            (
                                "scores",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_scores"),
                                ),
                            ),
                        ),
                    )
                )
            elif feature_specs.feature_type == "EMBEDDING":
                # We don't know the dimensions of the embeddings in the input
                # data. Even though they should match the dimensions from the
                # feature config, we keep the ranges blob to check the input
                # data later.
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            elif feature_specs.feature_type == "GENERIC_FEATURE":
                # We don't know the dimensions of the embeddings in the input
                # data. Even though they should match the dimensions from the
                # feature config, we keep the ranges blob to check the input
                # data later.
                # Currently this schema with ranges and values is only for
                # generic type enum 1. If new types are implemented, we need to
                # modify the ParseGeneric operator and this part accordingly.
                outputs.append(
                    (
                        field,
                        schema.Struct(
                            (
                                "ranges",
                                schema.Scalar(
                                    (np.int32, (len(feature_specs.feature_ids), 2)),
                                    self.get_next_blob_reference(field + "_ranges"),
                                ),
                            ),
                            (
                                "values",
                                schema.Scalar(
                                    np.float32,
                                    self.get_next_blob_reference(field + "_values"),
                                ),
                            ),
                        ),
                    )
                )
            else:
                raise TypeError(
                    "Unsupported input type: {0}".format(feature_specs.feature_type)
                )

        # TODO(amalevich): This schema produces ranges, so anything consuming
        # it needs to support ranges as well. That might be confusing unless we
        # add better support for ranges or keep this as a first layer.
        self.output_schema = schema.Struct(*outputs)
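        # For the hypothetical specs sketched above, output_schema would map
        # "float_features" to a float32 Scalar of shape (2,), i.e. one dense
        # column per feature id.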

        # TODO(amalevich): Consider moving this data to the schema instead.
        # Structs don't support attaching metadata to them and cloning would
        # break things badly, but this is the most elegant way to pass this
        # info around. Should we change it, or would that be too much work and
        # not worth it?
        for field, feature_specs in input_specs:
            schema.attach_metadata_to_scalars(
                self.output_schema[field], schema.Metadata(feature_specs=feature_specs)
            )

    # Add operators for all feature types that need to be densified
    def add_ops(self, net):
        record = self.input_record
        for field, feature_specs in self.input_specs:
            if feature_specs.feature_type == "FLOAT":
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        record[field].values(),
                        self.default_float_value,
                        record[field].lengths(),
                    ],
                    [self.output_schema[field]()],
                    mask=feature_specs.feature_ids,
                )
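                # Sketch of the densification above, with hypothetical values:
                # given mask=[101, 102] and a batch of two examples where
                #   keys=[101, 102, 102], values=[1.0, 2.0, 3.0], lengths=[2, 1]
                # SparseToDenseMask emits a dense [2, 2] float32 matrix
                #   [[1.0, 2.0],
                #    [default, 3.0]]
                # with `default` taken from self.default_float_value.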
            elif feature_specs.feature_type == "ID_LIST":
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(), net.NextScopedBlob("id_list_ranges")
                )
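                # LengthsToRanges turns per-feature lengths into [offset, length]
                # pairs over the flattened values, e.g. lengths [2, 0, 3] ->
                # ranges [[0, 2], [2, 0], [2, 3]].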
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        id_list_ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
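                # Sketch with hypothetical ids: given mask=[5, 6] and an example
                # where keys=[6] and ranges=[[0, 3]], the densified row becomes
                # [[0, 0], [0, 3]]; features absent from the example fall back
                # to self.zero_range.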
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is generated
                # based on the input_specs.
                net.Alias(
                    record[field].values.items(), self.output_schema[field].values()
                )
            elif feature_specs.feature_type == "ID_SCORE_LIST":
                # TODO: merge this with the case above?
                id_list_ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob("id_score_list_ranges"),
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        id_list_ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is generated
                # based on the input_specs.
                net.Alias(record[field].values.keys(), self.output_schema[field].ids())
                net.Alias(
                    record[field].values.values(), self.output_schema[field].scores()
                )
            elif feature_specs.feature_type == "EMBEDDING":
                ranges = net.LengthsToRanges(
                    record[field].values.lengths(),
                    net.NextScopedBlob("embeddings_ranges"),
                )
                net.SparseToDenseMask(
                    [
                        record[field].keys(),
                        ranges,
                        self.zero_range,
                        record[field].lengths(),
                    ],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is generated
                # based on the input_specs.
                net.Alias(
                    record[field].values.items(), self.output_schema[field].values()
                )
            elif feature_specs.feature_type == "GENERIC_FEATURE":
                (
                    feature_lengths_blob,
                    feature_ids_blob,
                    value_lengths_blob,
                    value_values_blob,
                ) = net.ParseGeneric(
                    [record[field]()],
                    ["feature_lengths", "feature_ids", "value_lengths", "value_values"],
                    feature_type_enum=1,
                )
                # Currently our implementation only supports
                # generic type enum 1. If new types are implemented, we need to
                # modify the ParseGeneric operator, the schema above,
                # and this part accordingly to parse the generic feature strings
                # into input_record

                ranges = net.LengthsToRanges(
                    value_lengths_blob, net.NextScopedBlob("generics_ranges")
                )
                net.SparseToDenseMask(
                    [feature_ids_blob, ranges, self.zero_range, feature_lengths_blob],
                    self.output_schema[field].ranges(),
                    mask=feature_specs.feature_ids,
                )
                # Alias helps to enforce the fact that all SparseToDense calls
                # produce new blobs.
                # Reusing blob names might result in some weird consequences
                # at delivery time, when the content of the blobs is generated
                # based on the input_specs.
                net.Alias(value_values_blob, self.output_schema[field].values())

    def get_metadata(self):
        metadata = []
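        # Illustrative shape of each appended entry, with hypothetical values:
        #   ({"type": "FLOAT", "names": ["f1", "f2"], "ids": [101, 102]},
        #    <output blob references>, <output field types>)
        # FLOAT entries additionally get "cardinality": 1 below.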
        for field, feature_specs in self.input_specs:
            metadata.append(
                (
                    {
                        "type": feature_specs.feature_type,
                        "names": feature_specs.feature_names,
                        "ids": feature_specs.feature_ids,
                    },
                    self.output_schema[field].field_blobs(),
                    self.output_schema[field].field_types(),
                )
            )
            if feature_specs.feature_type == "FLOAT":
                metadata[-1][0]["cardinality"] = 1
        return metadata

    def get_accessed_features(self):
        accessed_features = defaultdict(list)

        # The accessed features are exactly those that appear in the input
        # specs
        for field, feature_specs in self.input_specs:
            accessed_features[field].append(
                AccessedFeatures(
                    feature_specs.feature_type, set(feature_specs.feature_ids)
                )
            )

        return accessed_features