""" Dictionary learning
"""
# Author: Vlad Niculae, Gael Varoquaux, Alexandre Gramfort
# License: BSD 3 clause
import time
import sys
import itertools
from math import ceil
import numpy as np
from scipy import linalg
from joblib import Parallel, delayed, effective_n_jobs
from ..base import BaseEstimator, TransformerMixin
from ..utils import (check_array, check_random_state, gen_even_slices,
gen_batches)
from ..utils.extmath import randomized_svd, row_norms
from ..utils.validation import check_is_fitted, _deprecate_positional_args
from ..linear_model import Lasso, orthogonal_mp_gram, LassoLars, Lars
def _check_positive_coding(method, positive):
if positive and method in ["omp", "lars"]:
raise ValueError(
"Positive constraint not supported for '{}' "
"coding method.".format(method)
)
def _sparse_encode(X, dictionary, gram, cov=None, algorithm='lasso_lars',
regularization=None, copy_cov=True,
init=None, max_iter=1000, check_input=True, verbose=0,
positive=False):
"""Generic sparse coding
Each column of the result is the solution to a Lasso problem.
Parameters
----------
X : array of shape (n_samples, n_features)
Data matrix.
dictionary : array of shape (n_components, n_features)
The dictionary matrix against which to solve the sparse coding of
the data. Some of the algorithms assume normalized rows.
gram : None | array, shape=(n_components, n_components)
Precomputed Gram matrix, dictionary * dictionary'
gram can be None if method is 'threshold'.
cov : array, shape=(n_components, n_samples)
Precomputed covariance, dictionary * X'
algorithm : {'lasso_lars', 'lasso_cd', 'lars', 'omp', 'threshold'}
lars: uses the least angle regression method (linear_model.lars_path)
lasso_lars: uses Lars to compute the Lasso solution
lasso_cd: uses the coordinate descent method to compute the
Lasso solution (linear_model.Lasso). lasso_lars will be faster if
the estimated components are sparse.
omp: uses orthogonal matching pursuit to estimate the sparse solution
threshold: squashes to zero all coefficients less than regularization
from the projection dictionary * data'
regularization : int | float
The regularization parameter. It corresponds to alpha when
algorithm is 'lasso_lars', 'lasso_cd' or 'threshold'.
Otherwise it corresponds to n_nonzero_coefs.
init : array of shape (n_samples, n_components)
Initialization value of the sparse code. Only used if
`algorithm='lasso_cd'`.
max_iter : int, 1000 by default
Maximum number of iterations to perform if `algorithm='lasso_cd'` or
`lasso_lars`.
copy_cov : boolean, optional
Whether to copy the precomputed covariance matrix; if False, it may be
overwritten.
check_input : boolean, optional
If False, the input arrays X and dictionary will not be checked.
verbose : int
Controls the verbosity; the higher, the more messages. Defaults to 0.
positive: boolean
Whether to enforce a positivity constraint on the sparse code.
.. versionadded:: 0.20
Returns
-------
code : array of shape (n_components, n_features)
The sparse codes
See also
--------
sklearn.linear_model.lars_path
sklearn.linear_model.orthogonal_mp
sklearn.linear_model.Lasso
SparseCoder
"""
if X.ndim == 1:
X = X[:, np.newaxis]
n_samples, n_features = X.shape
n_components = dictionary.shape[0]
if dictionary.shape[1] != X.shape[1]:
raise ValueError("Dictionary and X have different numbers of features:"
"dictionary.shape: {} X.shape{}".format(
dictionary.shape, X.shape))
if cov is None and algorithm != 'lasso_cd':
# overwriting cov is safe
copy_cov = False
cov = np.dot(dictionary, X.T)
_check_positive_coding(algorithm, positive)
if algorithm == 'lasso_lars':
alpha = float(regularization) / n_features # account for scaling
try:
err_mgt = np.seterr(all='ignore')
# Not passing in verbose=max(0, verbose-1) because Lars.fit already
# corrects the verbosity level.
lasso_lars = LassoLars(alpha=alpha, fit_intercept=False,
verbose=verbose, normalize=False,
precompute=gram, fit_path=False,
positive=positive, max_iter=max_iter)
lasso_lars.fit(dictionary.T, X.T, Xy=cov)
new_code = lasso_lars.coef_
finally:
np.seterr(**err_mgt)
elif algorithm == 'lasso_cd':
alpha = float(regularization) / n_features # account for scaling
# TODO: Make verbosity argument for Lasso?
# sklearn.linear_model.coordinate_descent.enet_path has a verbosity
# argument that we could pass in from Lasso.
clf = Lasso(alpha=alpha, fit_intercept=False, normalize=False,
precompute=gram, max_iter=max_iter, warm_start=True,
positive=positive)
if init is not None:
clf.coef_ = init
clf.fit(dictionary.T, X.T, check_input=check_input)
new_code = clf.coef_
elif algorithm == 'lars':
try:
err_mgt = np.seterr(all='ignore')
# Not passing in verbose=max(0, verbose-1) because Lars.fit already
# corrects the verbosity level.
lars = Lars(fit_intercept=False, verbose=verbose, normalize=False,
precompute=gram, n_nonzero_coefs=int(regularization),
fit_path=False)
lars.fit(dictionary.T, X.T, Xy=cov)
new_code = lars.coef_
finally:
np.seterr(**err_mgt)
elif algorithm == 'threshold':
new_code = ((np.sign(cov) *
np.maximum(np.abs(cov) - regularization, 0)).T)
if positive:
np.clip(new_code, 0, None, out=new_code)
elif algorithm == 'omp':
new_code = orthogonal_mp_gram(
Gram=gram, Xy=cov, n_nonzero_coefs=int(regularization),
tol=None, norms_squared=row_norms(X, squared=True),
copy_Xy=copy_cov).T
else:
raise ValueError('Sparse coding method must be "lasso_lars" '
'"lasso_cd", "lasso", "threshold" or "omp", got %s.'
% algorithm)
if new_code.ndim != 2:
return new_code.reshape(n_samples, n_components)
return new_code
# XXX : could be moved to the linear_model module
@_deprecate_positional_args
def sparse_encode(X, dictionary, *, gram=None, cov=None,
algorithm='lasso_lars', n_nonzero_coefs=None, alpha=None,
copy_cov=True, init=None, max_iter=1000, n_jobs=None,
check_input=True, verbose=0, positive=False):
"""Sparse coding
Each row of the result is the solution to a sparse coding problem.
The goal is to find a sparse array `code` such that::
X ~= code * dictionary
Read more in the :ref:`User Guide <SparseCoder>`.
Parameters
----------
X : array of shape (n_samples, n_features)
Data matrix
dictionary : array of shape (n_components, n_features)
The dictionary matrix against which to solve the sparse coding of
the data. Some of the algorithms assume normalized rows for meaningful
output.
gram : array, shape=(n_components, n_components)
Precomputed Gram matrix, dictionary * dictionary'
cov : array, shape=(n_components, n_samples)
Precomputed covariance, dictionary' * X
algorithm : {'lasso_lars', 'lasso_cd', 'lars', 'omp', 'threshold'}
lars: uses the least angle regression method (linear_model.lars_path)
lasso_lars: uses Lars to compute the Lasso solution
lasso_cd: uses the coordinate descent method to compute the
Lasso solution (linear_model.Lasso). lasso_lars will be faster if
the estimated components are sparse.
omp: uses orthogonal matching pursuit to estimate the sparse solution
threshold: squashes to zero all coefficients less than alpha from
the projection dictionary * X'
n_nonzero_coefs : int, 0.1 * n_features by default
Number of nonzero coefficients to target in each column of the
solution. This is only used by `algorithm='lars'` and `algorithm='omp'`
and is overridden by `alpha` in the `omp` case.
alpha : float, 1. by default
If `algorithm='lasso_lars'` or `algorithm='lasso_cd'`, `alpha` is the
penalty applied to the L1 norm.
If `algorithm='threshold'`, `alpha` is the absolute value of the
threshold below which coefficients will be squashed to zero.
If `algorithm='omp'`, `alpha` is the tolerance parameter: the value of
the reconstruction error targeted. In this case, it overrides
`n_nonzero_coefs`.
copy_cov : boolean, optional
Whether to copy the precomputed covariance matrix; if False, it may be
overwritten.
init : array of shape (n_samples, n_components)
Initialization value of the sparse codes. Only used if
`algorithm='lasso_cd'`.
max_iter : int, 1000 by default
Maximum number of iterations to perform if `algorithm='lasso_cd'` or
`lasso_lars`.
n_jobs : int or None, optional (default=None)
Number of parallel jobs to run.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors. See :term:`Glossary <n_jobs>`
for more details.
check_input : boolean, optional
If False, the input arrays X and dictionary will not be checked.
verbose : int, optional
Controls the verbosity; the higher, the more messages. Defaults to 0.
positive : boolean, optional
Whether to enforce positivity when finding the encoding.
.. versionadded:: 0.20
Returns
-------
code : array of shape (n_samples, n_components)
The sparse codes
See also
--------
sklearn.linear_model.lars_path
sklearn.linear_model.orthogonal_mp
sklearn.linear_model.Lasso
SparseCoder
"""
if check_input:
if algorithm == 'lasso_cd':
dictionary = check_array(dictionary, order='C', dtype='float64')
X = check_array(X, order='C', dtype='float64')
else:
dictionary = check_array(dictionary)
X = check_array(X)
n_samples, n_features = X.shape
n_components = dictionary.shape[0]
if gram is None and algorithm != 'threshold':
gram = np.dot(dictionary, dictionary.T)
if cov is None and algorithm != 'lasso_cd':
copy_cov = False
cov = np.dot(dictionary, X.T)
if algorithm in ('lars', 'omp'):
regularization = n_nonzero_coefs
if regularization is None:
regularization = min(max(n_features / 10, 1), n_components)
else:
regularization = alpha
if regularization is None:
regularization = 1.
if effective_n_jobs(n_jobs) == 1 or algorithm == 'threshold':
code = _sparse_encode(X,
dictionary, gram, cov=cov,
algorithm=algorithm,
regularization=regularization, copy_cov=copy_cov,
init=init,
max_iter=max_iter,
check_input=False,
verbose=verbose,
positive=positive)
return code
# Enter parallel code block
code = np.empty((n_samples, n_components))
slices = list(gen_even_slices(n_samples, effective_n_jobs(n_jobs)))
code_views = Parallel(n_jobs=n_jobs, verbose=verbose)(
delayed(_sparse_encode)(
X[this_slice], dictionary, gram,
cov[:, this_slice] if cov is not None else None,
algorithm,
regularization=regularization, copy_cov=copy_cov,
init=init[this_slice] if init is not None else None,
max_iter=max_iter,
check_input=False,
verbose=verbose,
positive=positive)
for this_slice in slices)
for this_slice, this_view in zip(slices, code_views):
code[this_slice] = this_view
return code
def _update_dict(dictionary, Y, code, verbose=False, return_r2=False,
random_state=None, positive=False):
"""Update the dense dictionary factor in place.
Loading ...