Repository URL to install this package:
|
Version:
1.14.0 ▾
|
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""A canned Estimator for k-means clustering."""
# TODO(ccolby): Move clustering_ops.py into this file and streamline the code.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
from tensorflow.contrib.factorization.python.ops import clustering_ops
from tensorflow.python.estimator import estimator
from tensorflow.python.estimator import model_fn as model_fn_lib
from tensorflow.python.estimator.export import export_output
from tensorflow.python.feature_column import feature_column_lib as fc
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import metrics
from tensorflow.python.ops import state_ops
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.saved_model import signature_constants
from tensorflow.python.summary import summary
from tensorflow.python.training import session_run_hook
from tensorflow.python.training import training_util
class _LossRelativeChangeHook(session_run_hook.SessionRunHook):
"""Stops when the change in loss goes below a tolerance."""
def __init__(self, loss_tensor, tolerance):
"""Creates a _LossRelativeChangeHook.
Args:
loss_tensor: A scalar tensor of the loss value.
tolerance: A relative tolerance of loss change between iterations.
"""
self._loss_tensor = loss_tensor
self._tolerance = tolerance
self._prev_loss = None
def before_run(self, run_context):
del run_context # unused
return session_run_hook.SessionRunArgs(self._loss_tensor)
def after_run(self, run_context, run_values):
loss = run_values.results
assert loss is not None
if self._prev_loss:
relative_change = (
abs(loss - self._prev_loss) / (1 + abs(self._prev_loss)))
if relative_change < self._tolerance:
run_context.request_stop()
self._prev_loss = loss
class _InitializeClustersHook(session_run_hook.SessionRunHook):
"""Initializes the cluster centers.
The chief repeatedly invokes an initialization op until all cluster centers
are initialized. The workers wait for the initialization phase to complete.
"""
def __init__(self, init_op, is_initialized_var, is_chief):
"""Creates an _InitializeClustersHook.
Args:
init_op: An op that, when run, will choose some initial cluster centers.
This op may need to be run multiple times to choose all the centers.
is_initialized_var: A boolean variable reporting whether all initial
centers have been chosen.
is_chief: A boolean specifying whether this task is the chief.
"""
self._init_op = init_op
self._is_initialized_var = is_initialized_var
self._is_chief = is_chief
def after_create_session(self, session, coord):
del coord # unused
assert self._init_op.graph is ops.get_default_graph()
assert self._is_initialized_var.graph is self._init_op.graph
while True:
try:
if session.run(self._is_initialized_var):
break
elif self._is_chief:
session.run(self._init_op)
else:
time.sleep(1)
except RuntimeError as e:
logging.info(e)
def _parse_features_if_necessary(features, feature_columns):
"""Helper function to convert the input points into a usable format.
Args:
features: The input features.
feature_columns: An optionable iterable containing all the feature columns
used by the model. All items in the set should be feature column instances
that can be passed to `tf.compat.v1.feature_column.input_layer`. If this
is None, all features will be used.
Returns:
If `features` is a dict of `k` features (optionally filtered by
`feature_columns`), each of which is a vector of `n` scalars, the return
value is a Tensor of shape `(n, k)` representing `n` input points, where the
items in the `k` dimension are sorted lexicographically by `features` key.
If `features` is not a dict, it is returned unmodified.
"""
if not isinstance(features, dict):
return features
if feature_columns:
return fc.input_layer(features, feature_columns)
keys = sorted(features.keys())
with ops.colocate_with(features[keys[0]]):
return array_ops.concat([features[k] for k in keys], axis=1)
class _ModelFn(object):
"""Model function for the estimator."""
def __init__(self, num_clusters, initial_clusters, distance_metric,
random_seed, use_mini_batch, mini_batch_steps_per_iteration,
kmeans_plus_plus_num_retries, relative_tolerance,
feature_columns):
self._num_clusters = num_clusters
self._initial_clusters = initial_clusters
self._distance_metric = distance_metric
self._random_seed = random_seed
self._use_mini_batch = use_mini_batch
self._mini_batch_steps_per_iteration = mini_batch_steps_per_iteration
self._kmeans_plus_plus_num_retries = kmeans_plus_plus_num_retries
self._relative_tolerance = relative_tolerance
self._feature_columns = feature_columns
def model_fn(self, features, mode, config):
"""Model function for the estimator.
Note that this does not take a `labels` arg. This works, but `input_fn` must
return either `features` or, equivalently, `(features, None)`.
Args:
features: The input points. See `tf.estimator.Estimator`.
mode: See `tf.estimator.Estimator`.
config: See `tf.estimator.Estimator`.
Returns:
A `tf.estimator.EstimatorSpec` (see `tf.estimator.Estimator`) specifying
this behavior:
* `train_op`: Execute one mini-batch or full-batch run of Lloyd's
algorithm.
* `loss`: The sum of the squared distances from each input point to its
closest center.
* `eval_metric_ops`: Maps `SCORE` to `loss`.
* `predictions`: Maps `ALL_DISTANCES` to the distance from each input
point to each cluster center; maps `CLUSTER_INDEX` to the index of
the closest cluster center for each input point.
"""
# input_points is a single Tensor. Therefore, the sharding functionality
# in clustering_ops is unused, and some of the values below are lists of a
# single item.
input_points = _parse_features_if_necessary(features, self._feature_columns)
# Let N = the number of input_points.
# all_distances: A list of one matrix of shape (N, num_clusters). Each value
# is the distance from an input point to a cluster center.
# model_predictions: A list of one vector of shape (N). Each value is the
# cluster id of an input point.
# losses: Similar to cluster_idx but provides the distance to the cluster
# center.
# is_initialized: scalar indicating whether the initial cluster centers
# have been chosen; see init_op.
# init_op: an op to choose the initial cluster centers. A single worker
# repeatedly executes init_op until is_initialized becomes True.
# training_op: an op that runs an iteration of training, either an entire
# Lloyd iteration or a mini-batch of a Lloyd iteration. Multiple workers
# may execute this op, but only after is_initialized becomes True.
(all_distances, model_predictions, losses, is_initialized, init_op,
training_op) = clustering_ops.KMeans(
inputs=input_points,
num_clusters=self._num_clusters,
initial_clusters=self._initial_clusters,
distance_metric=self._distance_metric,
use_mini_batch=self._use_mini_batch,
mini_batch_steps_per_iteration=self._mini_batch_steps_per_iteration,
random_seed=self._random_seed,
kmeans_plus_plus_num_retries=self._kmeans_plus_plus_num_retries
).training_graph()
loss = math_ops.reduce_sum(losses)
summary.scalar('loss/raw', loss)
incr_step = state_ops.assign_add(training_util.get_global_step(), 1)
training_op = control_flow_ops.with_dependencies([training_op, incr_step],
loss)
training_hooks = [
_InitializeClustersHook(init_op, is_initialized, config.is_chief)
]
if self._relative_tolerance is not None:
training_hooks.append(
_LossRelativeChangeHook(loss, self._relative_tolerance))
export_outputs = {
KMeansClustering.ALL_DISTANCES:
export_output.PredictOutput(all_distances[0]),
KMeansClustering.CLUSTER_INDEX:
export_output.PredictOutput(model_predictions[0]),
signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
export_output.PredictOutput(model_predictions[0])
}
return model_fn_lib.EstimatorSpec(
mode=mode,
predictions={
KMeansClustering.ALL_DISTANCES: all_distances[0],
KMeansClustering.CLUSTER_INDEX: model_predictions[0],
},
loss=loss,
train_op=training_op,
eval_metric_ops={KMeansClustering.SCORE: metrics.mean(loss)},
training_hooks=training_hooks,
export_outputs=export_outputs)
# TODO(agarwal,ands): support sharded input.
class KMeansClustering(estimator.Estimator):
"""An Estimator for K-Means clustering.
Example:
```
import numpy as np
import tensorflow as tf
num_points = 100
dimensions = 2
points = np.random.uniform(0, 1000, [num_points, dimensions])
def input_fn():
return tf.compat.v1.train.limit_epochs(
tf.convert_to_tensor(points, dtype=tf.float32), num_epochs=1)
num_clusters = 5
kmeans = tf.contrib.factorization.KMeansClustering(
num_clusters=num_clusters, use_mini_batch=False)
# train
num_iterations = 10
previous_centers = None
for _ in xrange(num_iterations):
kmeans.train(input_fn)
cluster_centers = kmeans.cluster_centers()
if previous_centers is not None:
print 'delta:', cluster_centers - previous_centers
previous_centers = cluster_centers
print 'score:', kmeans.score(input_fn)
print 'cluster centers:', cluster_centers
# map the input points to their clusters
cluster_indices = list(kmeans.predict_cluster_index(input_fn))
for i, point in enumerate(points):
cluster_index = cluster_indices[i]
center = cluster_centers[cluster_index]
print 'point:', point, 'is in cluster', cluster_index, 'centered at', center
```
The `SavedModel` saved by the `export_savedmodel` method does not include the
cluster centers. However, the cluster centers may be retrieved by the
latest checkpoint saved during training. Specifically,
```
kmeans.cluster_centers()
```
is equivalent to
```
tf.train.load_variable(
kmeans.model_dir, KMeansClustering.CLUSTER_CENTERS_VAR_NAME)
```
"""
# Valid values for the distance_metric constructor argument.
SQUARED_EUCLIDEAN_DISTANCE = clustering_ops.SQUARED_EUCLIDEAN_DISTANCE
COSINE_DISTANCE = clustering_ops.COSINE_DISTANCE
# Values for initial_clusters constructor argument.
RANDOM_INIT = clustering_ops.RANDOM_INIT
KMEANS_PLUS_PLUS_INIT = clustering_ops.KMEANS_PLUS_PLUS_INIT
# Metric returned by evaluate(): The sum of the squared distances from each
# input point to its closest center.
SCORE = 'score'
# Keys returned by predict().
# ALL_DISTANCES: The distance from each input point to each cluster center.
# CLUSTER_INDEX: The index of the closest cluster center for each input point.
CLUSTER_INDEX = 'cluster_index'
ALL_DISTANCES = 'all_distances'
# Variable name used by cluster_centers().
CLUSTER_CENTERS_VAR_NAME = clustering_ops.CLUSTERS_VAR_NAME
def __init__(self,
num_clusters,
model_dir=None,
initial_clusters=RANDOM_INIT,
distance_metric=SQUARED_EUCLIDEAN_DISTANCE,
random_seed=0,
use_mini_batch=True,
mini_batch_steps_per_iteration=1,
kmeans_plus_plus_num_retries=2,
relative_tolerance=None,
config=None,
feature_columns=None):
"""Creates an Estimator for running KMeans training and inference.
This Estimator implements the following variants of the K-means algorithm:
If `use_mini_batch` is False, it runs standard full batch K-means. Each
training step runs a single iteration of K-Means and must process the full
input at once. To run in this mode, the `input_fn` passed to `train` must
return the entire input dataset.
If `use_mini_batch` is True, it runs a generalization of the mini-batch
K-means algorithm. It runs multiple iterations, where each iteration is
composed of `mini_batch_steps_per_iteration` steps. Each training step
accumulates the contribution from one mini-batch into temporary storage.
Every `mini_batch_steps_per_iteration` steps, the cluster centers are
updated and the temporary storage cleared for the next iteration. Note
that:
* If `mini_batch_steps_per_iteration=1`, the algorithm reduces to the
standard K-means mini-batch algorithm.
* If `mini_batch_steps_per_iteration = num_inputs / batch_size`, the
algorithm becomes an asynchronous version of the full-batch algorithm.
However, there is no guarantee by this implementation that each input
is seen exactly once per iteration. Also, different updates are applied
asynchronously without locking. So this asynchronous version may not
behave exactly like a full-batch version.
Args:
num_clusters: An integer tensor specifying the number of clusters. This
argument is ignored if `initial_clusters` is a tensor or numpy array.
model_dir: The directory to save the model results and log files.
initial_clusters: Specifies how the initial cluster centers are chosen.
One of the following: * a tensor or numpy array with the initial cluster
centers. * a callable `f(inputs, k)` that selects and returns up to
`k` centers from an input batch. `f` is free to return any number of
centers from `0` to `k`. It will be invoked on successive input
batches as necessary until all `num_clusters` centers are chosen.
* `KMeansClustering.RANDOM_INIT`: Choose centers randomly from an input
batch. If the batch size is less than `num_clusters` then the entire
batch is chosen to be initial cluster centers and the remaining
centers are chosen from successive input batches.
* `KMeansClustering.KMEANS_PLUS_PLUS_INIT`: Use kmeans++ to choose
centers from the first input batch. If the batch size is less than
`num_clusters`, a TensorFlow runtime error occurs.
distance_metric: The distance metric used for clustering. One of:
* `KMeansClustering.SQUARED_EUCLIDEAN_DISTANCE`: Euclidean distance
between vectors `u` and `v` is defined as \\(||u - v||_2\\) which is
the square root of the sum of the absolute squares of the elements'
difference.
* `KMeansClustering.COSINE_DISTANCE`: Cosine distance between vectors
`u` and `v` is defined as \\(1 - (u . v) / (||u||_2 ||v||_2)\\).
random_seed: Python integer. Seed for PRNG used to initialize centers.
use_mini_batch: A boolean specifying whether to use the mini-batch k-means
algorithm. See explanation above.
mini_batch_steps_per_iteration: The number of steps after which the
updated cluster centers are synced back to a master copy. Used only if
`use_mini_batch=True`. See explanation above.
kmeans_plus_plus_num_retries: For each point that is sampled during
kmeans++ initialization, this parameter specifies the number of
additional points to draw from the current distribution before selecting
the best. If a negative value is specified, a heuristic is used to
sample `O(log(num_to_sample))` additional points. Used only if
`initial_clusters=KMeansClustering.KMEANS_PLUS_PLUS_INIT`.
relative_tolerance: A relative tolerance of change in the loss between
iterations. Stops learning if the loss changes less than this amount.
This may not work correctly if `use_mini_batch=True`.
config: See `tf.estimator.Estimator`.
feature_columns: An optionable iterable containing all the feature columns
used by the model. All items in the set should be feature column
instances that can be passed to
`tf.compat.v1.feature_column.input_layer`. If this is None, all features
will be used.
Raises:
ValueError: An invalid argument was passed to `initial_clusters` or
`distance_metric`.
"""
if isinstance(initial_clusters, str) and initial_clusters not in [
KMeansClustering.RANDOM_INIT, KMeansClustering.KMEANS_PLUS_PLUS_INIT
]:
raise ValueError("Unsupported initialization algorithm '%s'" %
initial_clusters)
if distance_metric not in [
KMeansClustering.SQUARED_EUCLIDEAN_DISTANCE,
KMeansClustering.COSINE_DISTANCE
]:
raise ValueError("Unsupported distance metric '%s'" % distance_metric)
super(KMeansClustering, self).__init__(
model_fn=_ModelFn(num_clusters, initial_clusters, distance_metric,
random_seed, use_mini_batch,
mini_batch_steps_per_iteration,
kmeans_plus_plus_num_retries, relative_tolerance,
feature_columns).model_fn,
model_dir=model_dir,
config=config)
def _predict_one_key(self, input_fn, predict_key):
for result in self.predict(input_fn=input_fn, predict_keys=[predict_key]):
yield result[predict_key]
def predict_cluster_index(self, input_fn):
"""Finds the index of the closest cluster center to each input point.
Args:
input_fn: Input points. See `tf.estimator.Estimator.predict`.
Yields:
The index of the closest cluster center for each input point.
"""
for index in self._predict_one_key(input_fn,
KMeansClustering.CLUSTER_INDEX):
yield index
def score(self, input_fn):
"""Returns the sum of squared distances to nearest clusters.
Note that this function is different from the corresponding one in sklearn
which returns the negative sum.
Args:
input_fn: Input points. See `tf.estimator.Estimator.evaluate`. Only one
batch is retrieved.
Returns:
The sum of the squared distance from each point in the first batch of
inputs to its nearest cluster center.
"""
return self.evaluate(input_fn=input_fn, steps=1)[KMeansClustering.SCORE]
def transform(self, input_fn):
"""Transforms each input point to its distances to all cluster centers.
Note that if `distance_metric=KMeansClustering.SQUARED_EUCLIDEAN_DISTANCE`,
this
function returns the squared Euclidean distance while the corresponding
sklearn function returns the Euclidean distance.
Args:
input_fn: Input points. See `tf.estimator.Estimator.predict`.
Yields:
The distances from each input point to each cluster center.
"""
for distances in self._predict_one_key(input_fn,
KMeansClustering.ALL_DISTANCES):
yield distances
def cluster_centers(self):
"""Returns the cluster centers."""
return self.get_variable_value(KMeansClustering.CLUSTER_CENTERS_VAR_NAME)