""" Non-negative matrix factorization
"""
# Author: Vlad Niculae
# Lars Buitinck
# Mathieu Blondel <mathieu@mblondel.org>
# Tom Dupre la Tour
# License: BSD 3 clause
import numbers
import numpy as np
import scipy.sparse as sp
import time
import warnings
from math import sqrt
from ._cdnmf_fast import _update_cdnmf_fast
from ..base import BaseEstimator, TransformerMixin
from ..exceptions import ConvergenceWarning
from ..utils import check_random_state, check_array
from ..utils.extmath import randomized_svd, safe_sparse_dot, squared_norm
from ..utils.validation import check_is_fitted, check_non_negative
from ..utils.validation import _deprecate_positional_args
EPSILON = np.finfo(np.float32).eps
def norm(x):
"""Dot product-based Euclidean norm implementation
See: http://fseoane.net/blog/2011/computing-the-vector-norm/
Parameters
----------
x : array-like
Vector for which to compute the norm
"""
return sqrt(squared_norm(x))
def trace_dot(X, Y):
"""Trace of np.dot(X, Y.T).
Parameters
----------
X : array-like
First matrix
Y : array-like
Second matrix
"""
return np.dot(X.ravel(), Y.ravel())
def _check_init(A, shape, whom):
A = check_array(A)
if np.shape(A) != shape:
raise ValueError('Array with wrong shape passed to %s. Expected %s, '
'but got %s ' % (whom, shape, np.shape(A)))
check_non_negative(A, whom)
if np.max(A) == 0:
raise ValueError('Array passed to %s is full of zeros.' % whom)
def _beta_divergence(X, W, H, beta, square_root=False):
"""Compute the beta-divergence of X and dot(W, H).
Parameters
----------
X : float or array-like, shape (n_samples, n_features)
W : float or dense array-like, shape (n_samples, n_components)
H : float or dense array-like, shape (n_components, n_features)
beta : float, string in {'frobenius', 'kullback-leibler', 'itakura-saito'}
Parameter of the beta-divergence.
If beta == 2, this is half the Frobenius *squared* norm.
If beta == 1, this is the generalized Kullback-Leibler divergence.
If beta == 0, this is the Itakura-Saito divergence.
Else, this is the general beta-divergence.
square_root : boolean, default False
If True, return np.sqrt(2 * res)
For beta == 2, it corresponds to the Frobenius norm.
Returns
-------
res : float
Beta divergence of X and np.dot(X, H)
"""
beta = _beta_loss_to_float(beta)
# The method can be called with scalars
if not sp.issparse(X):
X = np.atleast_2d(X)
W = np.atleast_2d(W)
H = np.atleast_2d(H)
# Frobenius norm
if beta == 2:
# Avoid the creation of the dense np.dot(W, H) if X is sparse.
if sp.issparse(X):
norm_X = np.dot(X.data, X.data)
norm_WH = trace_dot(np.dot(np.dot(W.T, W), H), H)
cross_prod = trace_dot((X * H.T), W)
res = (norm_X + norm_WH - 2. * cross_prod) / 2.
else:
res = squared_norm(X - np.dot(W, H)) / 2.
if square_root:
return np.sqrt(res * 2)
else:
return res
if sp.issparse(X):
# compute np.dot(W, H) only where X is nonzero
WH_data = _special_sparse_dot(W, H, X).data
X_data = X.data
else:
WH = np.dot(W, H)
WH_data = WH.ravel()
X_data = X.ravel()
# do not affect the zeros: here 0 ** (-1) = 0 and not infinity
indices = X_data > EPSILON
WH_data = WH_data[indices]
X_data = X_data[indices]
# used to avoid division by zero
WH_data[WH_data == 0] = EPSILON
# generalized Kullback-Leibler divergence
if beta == 1:
# fast and memory efficient computation of np.sum(np.dot(W, H))
sum_WH = np.dot(np.sum(W, axis=0), np.sum(H, axis=1))
# computes np.sum(X * log(X / WH)) only where X is nonzero
div = X_data / WH_data
res = np.dot(X_data, np.log(div))
# add full np.sum(np.dot(W, H)) - np.sum(X)
res += sum_WH - X_data.sum()
# Itakura-Saito divergence
elif beta == 0:
div = X_data / WH_data
res = np.sum(div) - np.product(X.shape) - np.sum(np.log(div))
# beta-divergence, beta not in (0, 1, 2)
else:
if sp.issparse(X):
# slow loop, but memory efficient computation of :
# np.sum(np.dot(W, H) ** beta)
sum_WH_beta = 0
for i in range(X.shape[1]):
sum_WH_beta += np.sum(np.dot(W, H[:, i]) ** beta)
else:
sum_WH_beta = np.sum(WH ** beta)
sum_X_WH = np.dot(X_data, WH_data ** (beta - 1))
res = (X_data ** beta).sum() - beta * sum_X_WH
res += sum_WH_beta * (beta - 1)
res /= beta * (beta - 1)
if square_root:
return np.sqrt(2 * res)
else:
return res
def _special_sparse_dot(W, H, X):
"""Computes np.dot(W, H), only where X is non zero."""
if sp.issparse(X):
ii, jj = X.nonzero()
n_vals = ii.shape[0]
dot_vals = np.empty(n_vals)
n_components = W.shape[1]
batch_size = max(n_components, n_vals // n_components)
for start in range(0, n_vals, batch_size):
batch = slice(start, start + batch_size)
dot_vals[batch] = np.multiply(W[ii[batch], :],
H.T[jj[batch], :]).sum(axis=1)
WH = sp.coo_matrix((dot_vals, (ii, jj)), shape=X.shape)
return WH.tocsr()
else:
return np.dot(W, H)
def _compute_regularization(alpha, l1_ratio, regularization):
"""Compute L1 and L2 regularization coefficients for W and H"""
alpha_H = 0.
alpha_W = 0.
if regularization in ('both', 'components'):
alpha_H = float(alpha)
if regularization in ('both', 'transformation'):
alpha_W = float(alpha)
l1_reg_W = alpha_W * l1_ratio
l1_reg_H = alpha_H * l1_ratio
l2_reg_W = alpha_W * (1. - l1_ratio)
l2_reg_H = alpha_H * (1. - l1_ratio)
return l1_reg_W, l1_reg_H, l2_reg_W, l2_reg_H
def _check_string_param(solver, regularization, beta_loss, init):
allowed_solver = ('cd', 'mu')
if solver not in allowed_solver:
raise ValueError(
'Invalid solver parameter: got %r instead of one of %r' %
(solver, allowed_solver))
allowed_regularization = ('both', 'components', 'transformation', None)
if regularization not in allowed_regularization:
raise ValueError(
'Invalid regularization parameter: got %r instead of one of %r' %
(regularization, allowed_regularization))
# 'mu' is the only solver that handles other beta losses than 'frobenius'
if solver != 'mu' and beta_loss not in (2, 'frobenius'):
raise ValueError(
'Invalid beta_loss parameter: solver %r does not handle beta_loss'
' = %r' % (solver, beta_loss))
if solver == 'mu' and init == 'nndsvd':
warnings.warn("The multiplicative update ('mu') solver cannot update "
"zeros present in the initialization, and so leads to "
"poorer results when used jointly with init='nndsvd'. "
"You may try init='nndsvda' or init='nndsvdar' instead.",
UserWarning)
beta_loss = _beta_loss_to_float(beta_loss)
return beta_loss
def _beta_loss_to_float(beta_loss):
"""Convert string beta_loss to float"""
allowed_beta_loss = {'frobenius': 2,
'kullback-leibler': 1,
'itakura-saito': 0}
if isinstance(beta_loss, str) and beta_loss in allowed_beta_loss:
beta_loss = allowed_beta_loss[beta_loss]
if not isinstance(beta_loss, numbers.Number):
raise ValueError('Invalid beta_loss parameter: got %r instead '
'of one of %r, or a float.' %
(beta_loss, allowed_beta_loss.keys()))
return beta_loss
def _initialize_nmf(X, n_components, init=None, eps=1e-6,
random_state=None):
"""Algorithms for NMF initialization.
Computes an initial guess for the non-negative
rank k matrix approximation for X: X = WH
Parameters
----------
X : array-like, shape (n_samples, n_features)
The data matrix to be decomposed.
n_components : integer
The number of components desired in the approximation.
init : None | 'random' | 'nndsvd' | 'nndsvda' | 'nndsvdar'
Method used to initialize the procedure.
Default: None.
Valid options:
- None: 'nndsvd' if n_components <= min(n_samples, n_features),
otherwise 'random'.
- 'random': non-negative random matrices, scaled with:
sqrt(X.mean() / n_components)
- 'nndsvd': Nonnegative Double Singular Value Decomposition (NNDSVD)
initialization (better for sparseness)
- 'nndsvda': NNDSVD with zeros filled with the average of X
(better when sparsity is not desired)
- 'nndsvdar': NNDSVD with zeros filled with small random values
(generally faster, less accurate alternative to NNDSVDa
for when sparsity is not desired)
- 'custom': use custom matrices W and H
eps : float
Truncate all values less then this in output to zero.
random_state : int, RandomState instance, default=None
Used when ``init`` == 'nndsvdar' or 'random'. Pass an int for
reproducible results across multiple function calls.
See :term:`Glossary <random_state>`.
Returns
-------
W : array-like, shape (n_samples, n_components)
Initial guesses for solving X ~= WH
H : array-like, shape (n_components, n_features)
Initial guesses for solving X ~= WH
References
----------
C. Boutsidis, E. Gallopoulos: SVD based initialization: A head start for
nonnegative matrix factorization - Pattern Recognition, 2008
http://tinyurl.com/nndsvd
"""
check_non_negative(X, "NMF initialization")
n_samples, n_features = X.shape
if (init is not None and init != 'random'
and n_components > min(n_samples, n_features)):
raise ValueError("init = '{}' can only be used when "
"n_components <= min(n_samples, n_features)"
.format(init))
if init is None:
if n_components <= min(n_samples, n_features):
init = 'nndsvd'
else:
init = 'random'
# Random initialization
if init == 'random':
avg = np.sqrt(X.mean() / n_components)
rng = check_random_state(random_state)
H = avg * rng.randn(n_components, n_features).astype(X.dtype,
copy=False)
W = avg * rng.randn(n_samples, n_components).astype(X.dtype,
copy=False)
np.abs(H, out=H)
np.abs(W, out=W)
return W, H
# NNDSVD initialization
U, S, V = randomized_svd(X, n_components, random_state=random_state)
W = np.zeros_like(U)
H = np.zeros_like(V)
# The leading singular triplet is non-negative
# so it can be used as is for initialization.
W[:, 0] = np.sqrt(S[0]) * np.abs(U[:, 0])
H[0, :] = np.sqrt(S[0]) * np.abs(V[0, :])
Loading ...