You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1016 lines
31 KiB

import itertools
import warnings
from functools import partial
import numpy as np
import pytest
import sklearn
from sklearn.base import clone
from sklearn.decomposition import (
DictionaryLearning,
MiniBatchDictionaryLearning,
SparseCoder,
dict_learning,
dict_learning_online,
sparse_encode,
)
from sklearn.decomposition._dict_learning import _update_dict
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils import check_array
from sklearn.utils._testing import (
TempMemmap,
assert_allclose,
assert_array_almost_equal,
assert_array_equal,
ignore_warnings,
)
from sklearn.utils.estimator_checks import (
check_transformer_data_not_an_array,
check_transformer_general,
check_transformers_unfitted,
)
from sklearn.utils.parallel import Parallel
rng_global = np.random.RandomState(0)
n_samples, n_features = 10, 8
X = rng_global.randn(n_samples, n_features)
# TODO: remove mark once loky bug is fixed:
# https://github.com/joblib/loky/issues/458
@pytest.mark.thread_unsafe
def test_sparse_encode_shapes_omp():
rng = np.random.RandomState(0)
algorithms = ["omp", "lasso_lars", "lasso_cd", "lars", "threshold"]
for n_components, n_samples in itertools.product([1, 5], [1, 9]):
X_ = rng.randn(n_samples, n_features)
dictionary = rng.randn(n_components, n_features)
for algorithm, n_jobs in itertools.product(algorithms, [1, 2]):
code = sparse_encode(X_, dictionary, algorithm=algorithm, n_jobs=n_jobs)
assert code.shape == (n_samples, n_components)
def test_dict_learning_shapes():
n_components = 5
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
n_components = 1
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
assert dico.transform(X).shape == (X.shape[0], n_components)
def test_dict_learning_overcomplete():
n_components = 12
dico = DictionaryLearning(n_components, random_state=0).fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_max_iter():
def ricker_function(resolution, center, width):
"""Discrete sub-sampled Ricker (Mexican hat) wavelet"""
x = np.linspace(0, resolution - 1, resolution)
x = (
(2 / (np.sqrt(3 * width) * np.pi**0.25))
* (1 - (x - center) ** 2 / width**2)
* np.exp(-((x - center) ** 2) / (2 * width**2))
)
return x
def ricker_matrix(width, resolution, n_components):
"""Dictionary of Ricker (Mexican hat) wavelets"""
centers = np.linspace(0, resolution - 1, n_components)
D = np.empty((n_components, resolution))
for i, center in enumerate(centers):
D[i] = ricker_function(resolution, center, width)
D /= np.sqrt(np.sum(D**2, axis=1))[:, np.newaxis]
return D
transform_algorithm = "lasso_cd"
resolution = 256
subsampling = 3 # subsampling factor
n_components = resolution // subsampling
# Compute a wavelet dictionary
D_multi = np.r_[
tuple(
ricker_matrix(
width=w, resolution=resolution, n_components=n_components // 5
)
for w in (10, 50, 100, 500)
)
]
X = np.linspace(0, resolution - 1, resolution)
first_quarter = X < resolution / 4
X[first_quarter] = 3.0
X[np.logical_not(first_quarter)] = -1.0
X = X.reshape(1, -1)
# check that the underlying model fails to converge
with pytest.warns(ConvergenceWarning):
model = SparseCoder(
D_multi, transform_algorithm=transform_algorithm, transform_max_iter=1
)
model.fit_transform(X)
# check that the underlying model converges w/o warnings
with warnings.catch_warnings():
warnings.simplefilter("error", ConvergenceWarning)
model = SparseCoder(
D_multi, transform_algorithm=transform_algorithm, transform_max_iter=500
)
model.fit_transform(X)
def test_dict_learning_lars_positive_parameter():
n_components = 5
alpha = 1
err_msg = "Positive constraint not supported for 'lars' coding method."
with pytest.raises(ValueError, match=err_msg):
dict_learning(X, n_components, alpha=alpha, positive_code=True)
@pytest.mark.parametrize(
"transform_algorithm",
[
"lasso_lars",
"lasso_cd",
"threshold",
],
)
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_positivity(transform_algorithm, positive_code, positive_dict):
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm=transform_algorithm,
random_state=0,
positive_code=positive_code,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
code = dico.transform(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_lars_dict_positivity(positive_dict):
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
random_state=0,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
def test_dict_learning_lars_code_positivity():
n_components = 5
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
random_state=0,
positive_code=True,
fit_algorithm="cd",
).fit(X)
err_msg = "Positive constraint not supported for '{}' coding method."
err_msg = err_msg.format("lars")
with pytest.raises(ValueError, match=err_msg):
dico.transform(X)
def test_dict_learning_reconstruction():
n_components = 12
dico = DictionaryLearning(
n_components, transform_algorithm="omp", transform_alpha=0.001, random_state=0
)
code = dico.fit(X).transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X)
assert_array_almost_equal(dico.inverse_transform(code), X)
dico.set_params(transform_algorithm="lasso_lars")
code = dico.transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X, decimal=2)
assert_array_almost_equal(dico.inverse_transform(code), X, decimal=2)
# test error raised for wrong code size
with pytest.raises(ValueError, match="Expected 12, got 11."):
dico.inverse_transform(code[:, :-1])
# used to test lars here too, but there's no guarantee the number of
# nonzero atoms is right.
# TODO: remove mark once loky bug is fixed:
# https://github.com/joblib/loky/issues/458
@pytest.mark.thread_unsafe
def test_dict_learning_reconstruction_parallel():
# regression test that parallel reconstruction works with n_jobs>1
n_components = 12
dico = DictionaryLearning(
n_components,
transform_algorithm="omp",
transform_alpha=0.001,
random_state=0,
n_jobs=4,
)
code = dico.fit(X).transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X)
dico.set_params(transform_algorithm="lasso_lars")
code = dico.transform(X)
assert_array_almost_equal(np.dot(code, dico.components_), X, decimal=2)
# TODO: remove mark once loky bug is fixed:
# https://github.com/joblib/loky/issues/458
@pytest.mark.thread_unsafe
def test_dict_learning_lassocd_readonly_data():
n_components = 12
with TempMemmap(X) as X_read_only:
dico = DictionaryLearning(
n_components,
transform_algorithm="lasso_cd",
transform_alpha=0.001,
random_state=0,
n_jobs=4,
)
with ignore_warnings(category=ConvergenceWarning):
code = dico.fit(X_read_only).transform(X_read_only)
assert_array_almost_equal(
np.dot(code, dico.components_), X_read_only, decimal=2
)
def test_dict_learning_nonzero_coefs():
n_components = 4
dico = DictionaryLearning(
n_components,
transform_algorithm="lars",
transform_n_nonzero_coefs=3,
random_state=0,
)
code = dico.fit(X).transform(X[np.newaxis, 1])
assert len(np.flatnonzero(code)) == 3
dico.set_params(transform_algorithm="omp")
code = dico.transform(X[np.newaxis, 1])
assert len(np.flatnonzero(code)) == 3
def test_dict_learning_split():
n_components = 5
dico = DictionaryLearning(
n_components, transform_algorithm="threshold", random_state=0
)
code = dico.fit(X).transform(X)
Xr = dico.inverse_transform(code)
dico.split_sign = True
split_code = dico.transform(X)
assert_array_almost_equal(
split_code[:, :n_components] - split_code[:, n_components:], code
)
Xr2 = dico.inverse_transform(split_code)
assert_array_almost_equal(Xr, Xr2)
def test_dict_learning_online_shapes():
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
max_iter=10,
method="cd",
random_state=rng,
return_code=True,
)
assert code.shape == (n_samples, n_components)
assert dictionary.shape == (n_components, n_features)
assert np.dot(code, dictionary).shape == X.shape
dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
max_iter=10,
method="cd",
random_state=rng,
return_code=False,
)
assert dictionary.shape == (n_components, n_features)
def test_dict_learning_online_lars_positive_parameter():
err_msg = "Positive constraint not supported for 'lars' coding method."
with pytest.raises(ValueError, match=err_msg):
dict_learning_online(X, batch_size=4, max_iter=10, positive_code=True)
@pytest.mark.parametrize(
"transform_algorithm",
[
"lasso_lars",
"lasso_cd",
"threshold",
],
)
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_minibatch_dictionary_learning_positivity(
transform_algorithm, positive_code, positive_dict
):
n_components = 8
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=10,
transform_algorithm=transform_algorithm,
random_state=0,
positive_code=positive_code,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
code = dico.transform(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("positive_dict", [False, True])
def test_minibatch_dictionary_learning_lars(positive_dict):
n_components = 8
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=10,
transform_algorithm="lars",
random_state=0,
positive_dict=positive_dict,
fit_algorithm="cd",
).fit(X)
if positive_dict:
assert (dico.components_ >= 0).all()
else:
assert (dico.components_ < 0).any()
@pytest.mark.parametrize("positive_code", [False, True])
@pytest.mark.parametrize("positive_dict", [False, True])
def test_dict_learning_online_positivity(positive_code, positive_dict):
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X,
n_components=n_components,
batch_size=4,
method="cd",
alpha=1,
random_state=rng,
positive_dict=positive_dict,
positive_code=positive_code,
)
if positive_dict:
assert (dictionary >= 0).all()
else:
assert (dictionary < 0).any()
if positive_code:
assert (code >= 0).all()
else:
assert (code < 0).any()
def test_dict_learning_online_verbosity():
# test verbosity for better coverage
n_components = 5
import sys
from io import StringIO
old_stdout = sys.stdout
try:
sys.stdout = StringIO()
# convergence monitoring verbosity
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, verbose=1, tol=0.1, random_state=0
)
dico.fit(X)
dico = MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=5,
verbose=1,
max_no_improvement=2,
random_state=0,
)
dico.fit(X)
# higher verbosity level
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, verbose=2, random_state=0
)
dico.fit(X)
# function API verbosity
dict_learning_online(
X,
n_components=n_components,
batch_size=4,
alpha=1,
verbose=1,
random_state=0,
)
dict_learning_online(
X,
n_components=n_components,
batch_size=4,
alpha=1,
verbose=2,
random_state=0,
)
finally:
sys.stdout = old_stdout
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_estimator_shapes():
n_components = 5
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, random_state=0
)
dico.fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_overcomplete():
n_components = 12
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=5, random_state=0
).fit(X)
assert dico.components_.shape == (n_components, n_features)
def test_dict_learning_online_initialization():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features)
dico = MiniBatchDictionaryLearning(
n_components, batch_size=4, max_iter=0, dict_init=V, random_state=0
).fit(X)
assert_array_equal(dico.components_, V)
def test_dict_learning_online_readonly_initialization():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features)
V.setflags(write=False)
MiniBatchDictionaryLearning(
n_components,
batch_size=4,
max_iter=1,
dict_init=V,
random_state=0,
shuffle=False,
).fit(X)
def test_dict_learning_online_partial_fit():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
dict1 = MiniBatchDictionaryLearning(
n_components,
max_iter=10,
batch_size=1,
alpha=1,
shuffle=False,
dict_init=V,
max_no_improvement=None,
tol=0.0,
random_state=0,
).fit(X)
dict2 = MiniBatchDictionaryLearning(
n_components, alpha=1, dict_init=V, random_state=0
)
for i in range(10):
for sample in X:
dict2.partial_fit(sample[np.newaxis, :])
assert not np.all(sparse_encode(X, dict1.components_, alpha=1) == 0)
assert_array_almost_equal(dict1.components_, dict2.components_, decimal=2)
# partial_fit should ignore max_iter (#17433)
assert dict1.n_steps_ == dict2.n_steps_ == 100
def test_sparse_encode_shapes():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
for algo in ("lasso_lars", "lasso_cd", "lars", "omp", "threshold"):
code = sparse_encode(X, V, algorithm=algo)
assert code.shape == (n_samples, n_components)
@pytest.mark.parametrize("algo", ["lasso_lars", "lasso_cd", "threshold"])
@pytest.mark.parametrize("positive", [False, True])
def test_sparse_encode_positivity(algo, positive):
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
code = sparse_encode(X, V, algorithm=algo, positive=positive)
if positive:
assert (code >= 0).all()
else:
assert (code < 0).any()
@pytest.mark.parametrize("algo", ["lars", "omp"])
def test_sparse_encode_unavailable_positivity(algo):
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
err_msg = "Positive constraint not supported for '{}' coding method."
err_msg = err_msg.format(algo)
with pytest.raises(ValueError, match=err_msg):
sparse_encode(X, V, algorithm=algo, positive=True)
def test_sparse_encode_input():
n_components = 100
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
Xf = check_array(X, order="F")
for algo in ("lasso_lars", "lasso_cd", "lars", "omp", "threshold"):
a = sparse_encode(X, V, algorithm=algo)
b = sparse_encode(Xf, V, algorithm=algo)
assert_array_almost_equal(a, b)
def test_sparse_encode_error():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
code = sparse_encode(X, V, alpha=0.001)
assert not np.all(code == 0)
assert np.sqrt(np.sum((np.dot(code, V) - X) ** 2)) < 0.1
def test_sparse_encode_error_default_sparsity():
rng = np.random.RandomState(0)
X = rng.randn(100, 64)
D = rng.randn(2, 64)
code = ignore_warnings(sparse_encode)(X, D, algorithm="omp", n_nonzero_coefs=None)
assert code.shape == (100, 2)
def test_sparse_coder_estimator():
n_components = 12
rng = np.random.RandomState(0)
V = rng.randn(n_components, n_features) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
coder = SparseCoder(
dictionary=V, transform_algorithm="lasso_lars", transform_alpha=0.001
)
code = coder.fit_transform(X)
Xr = coder.inverse_transform(code)
assert not np.all(code == 0)
assert np.sqrt(np.sum((np.dot(code, V) - X) ** 2)) < 0.1
np.testing.assert_allclose(Xr, np.dot(code, V))
def test_sparse_coder_estimator_clone():
n_components = 12
rng = np.random.RandomState(0)
V = rng.normal(size=(n_components, n_features)) # random init
V /= np.sum(V**2, axis=1)[:, np.newaxis]
coder = SparseCoder(
dictionary=V, transform_algorithm="lasso_lars", transform_alpha=0.001
)
cloned = clone(coder)
assert id(cloned) != id(coder)
np.testing.assert_allclose(cloned.dictionary, coder.dictionary)
assert id(cloned.dictionary) != id(coder.dictionary)
data = np.random.rand(n_samples, n_features).astype(np.float32)
np.testing.assert_allclose(cloned.transform(data), coder.transform(data))
# TODO: remove mark once loky bug is fixed:
# https://github.com/joblib/loky/issues/458
@pytest.mark.thread_unsafe
def test_sparse_coder_parallel_mmap():
# Non-regression test for:
# https://github.com/scikit-learn/scikit-learn/issues/5956
# Test that SparseCoder does not error by passing reading only
# arrays to child processes
rng = np.random.RandomState(777)
n_components, n_features = 40, 64
init_dict = rng.rand(n_components, n_features)
# Ensure that `data` is >2M. Joblib memory maps arrays
# if they are larger than 1MB. The 4 accounts for float32
# data type
n_samples = int(2e6) // (4 * n_features)
data = np.random.rand(n_samples, n_features).astype(np.float32)
sc = SparseCoder(init_dict, transform_algorithm="omp", n_jobs=2)
sc.fit_transform(data)
def test_sparse_coder_common_transformer():
rng = np.random.RandomState(777)
n_components, n_features = 40, 3
init_dict = rng.rand(n_components, n_features)
sc = SparseCoder(init_dict)
check_transformer_data_not_an_array(sc.__class__.__name__, sc)
check_transformer_general(sc.__class__.__name__, sc)
check_transformer_general_memmap = partial(
check_transformer_general, readonly_memmap=True
)
check_transformer_general_memmap(sc.__class__.__name__, sc)
check_transformers_unfitted(sc.__class__.__name__, sc)
def test_sparse_coder_n_features_in():
d = np.array([[1, 2, 3], [1, 2, 3]])
X = np.array([[1, 2, 3]])
sc = SparseCoder(d)
sc.fit(X)
assert sc.n_features_in_ == d.shape[1]
def test_sparse_encoder_feature_number_error():
n_components = 10
rng = np.random.RandomState(0)
D = rng.uniform(size=(n_components, n_features))
X = rng.uniform(size=(n_samples, n_features + 1))
coder = SparseCoder(D)
with pytest.raises(
ValueError, match="Dictionary and X have different numbers of features"
):
coder.fit(X)
def test_update_dict():
# Check the dict update in batch mode vs online mode
# Non-regression test for #4866
rng = np.random.RandomState(0)
code = np.array([[0.5, -0.5], [0.1, 0.9]])
dictionary = np.array([[1.0, 0.0], [0.6, 0.8]])
X = np.dot(code, dictionary) + rng.randn(2, 2)
# full batch update
newd_batch = dictionary.copy()
_update_dict(newd_batch, X, code)
# online update
A = np.dot(code.T, code)
B = np.dot(X.T, code)
newd_online = dictionary.copy()
_update_dict(newd_online, X, code, A, B)
assert_allclose(newd_batch, newd_online)
@pytest.mark.parametrize(
"algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize("data_type", (np.float32, np.float64))
# Note: do not check integer input because `lasso_lars` and `lars` fail with
# `ValueError` in `_lars_path_solver`
def test_sparse_encode_dtype_match(data_type, algorithm):
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
code = sparse_encode(
X.astype(data_type), dictionary.astype(data_type), algorithm=algorithm
)
assert code.dtype == data_type
@pytest.mark.parametrize(
"algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
def test_sparse_encode_numerical_consistency(algorithm):
# verify numerical consistency among np.float32 and np.float64
rtol = 1e-4
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
code_32 = sparse_encode(
X.astype(np.float32), dictionary.astype(np.float32), algorithm=algorithm
)
code_64 = sparse_encode(
X.astype(np.float64), dictionary.astype(np.float64), algorithm=algorithm
)
assert_allclose(code_32, code_64, rtol=rtol)
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize("data_type", (np.float32, np.float64))
# Note: do not check integer input because `lasso_lars` and `lars` fail with
# `ValueError` in `_lars_path_solver`
def test_sparse_coder_dtype_match(data_type, transform_algorithm):
# Verify preserving dtype for transform in sparse coder
n_components = 6
rng = np.random.RandomState(0)
dictionary = rng.randn(n_components, n_features)
coder = SparseCoder(
dictionary.astype(data_type), transform_algorithm=transform_algorithm
)
code = coder.transform(X.astype(data_type))
assert code.dtype == data_type
@pytest.mark.parametrize("fit_algorithm", ("lars", "cd"))
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dictionary_learning_dtype_match(
data_type,
expected_type,
fit_algorithm,
transform_algorithm,
):
# Verify preserving dtype for fit and transform in dictionary learning class
dict_learner = DictionaryLearning(
n_components=8,
fit_algorithm=fit_algorithm,
transform_algorithm=transform_algorithm,
random_state=0,
)
dict_learner.fit(X.astype(data_type))
assert dict_learner.components_.dtype == expected_type
assert dict_learner.transform(X.astype(data_type)).dtype == expected_type
@pytest.mark.parametrize("fit_algorithm", ("lars", "cd"))
@pytest.mark.parametrize(
"transform_algorithm", ("lasso_lars", "lasso_cd", "lars", "threshold", "omp")
)
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_minibatch_dictionary_learning_dtype_match(
data_type,
expected_type,
fit_algorithm,
transform_algorithm,
):
# Verify preserving dtype for fit and transform in minibatch dictionary learning
dict_learner = MiniBatchDictionaryLearning(
n_components=8,
batch_size=10,
fit_algorithm=fit_algorithm,
transform_algorithm=transform_algorithm,
max_iter=100,
tol=1e-1,
random_state=0,
)
dict_learner.fit(X.astype(data_type))
assert dict_learner.components_.dtype == expected_type
assert dict_learner.transform(X.astype(data_type)).dtype == expected_type
assert dict_learner._A.dtype == expected_type
assert dict_learner._B.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dict_learning_dtype_match(data_type, expected_type, method):
# Verify output matrix dtype
rng = np.random.RandomState(0)
n_components = 8
code, dictionary, _ = dict_learning(
X.astype(data_type),
n_components=n_components,
alpha=1,
random_state=rng,
method=method,
)
assert code.dtype == expected_type
assert dictionary.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_dict_learning_numerical_consistency(method):
# verify numerically consistent among np.float32 and np.float64
rtol = 1e-4
n_components = 4
alpha = 2
U_64, V_64, _ = dict_learning(
X.astype(np.float64),
n_components=n_components,
alpha=alpha,
random_state=0,
method=method,
)
U_32, V_32, _ = dict_learning(
X.astype(np.float32),
n_components=n_components,
alpha=alpha,
random_state=0,
method=method,
)
# Optimal solution (U*, V*) is not unique.
# If (U*, V*) is optimal solution, (-U*,-V*) is also optimal,
# and (column permutated U*, row permutated V*) are also optional
# as long as holding UV.
# So here UV, ||U||_1,1 and sum(||V_k||_2^2) are verified
# instead of comparing directly U and V.
assert_allclose(np.matmul(U_64, V_64), np.matmul(U_32, V_32), rtol=rtol)
assert_allclose(np.sum(np.abs(U_64)), np.sum(np.abs(U_32)), rtol=rtol)
assert_allclose(np.sum(V_64**2), np.sum(V_32**2), rtol=rtol)
# verify an obtained solution is not degenerate
assert np.mean(U_64 != 0.0) > 0.05
assert np.count_nonzero(U_64 != 0.0) == np.count_nonzero(U_32 != 0.0)
@pytest.mark.parametrize("method", ("lars", "cd"))
@pytest.mark.parametrize(
"data_type, expected_type",
(
(np.float32, np.float32),
(np.float64, np.float64),
(np.int32, np.float64),
(np.int64, np.float64),
),
)
def test_dict_learning_online_dtype_match(data_type, expected_type, method):
# Verify output matrix dtype
rng = np.random.RandomState(0)
n_components = 8
code, dictionary = dict_learning_online(
X.astype(data_type),
n_components=n_components,
alpha=1,
batch_size=10,
random_state=rng,
method=method,
)
assert code.dtype == expected_type
assert dictionary.dtype == expected_type
@pytest.mark.parametrize("method", ("lars", "cd"))
def test_dict_learning_online_numerical_consistency(method):
# verify numerically consistent among np.float32 and np.float64
rtol = 1e-4
n_components = 4
alpha = 1
U_64, V_64 = dict_learning_online(
X.astype(np.float64),
n_components=n_components,
max_iter=1_000,
alpha=alpha,
batch_size=10,
random_state=0,
method=method,
tol=0.0,
max_no_improvement=None,
)
U_32, V_32 = dict_learning_online(
X.astype(np.float32),
n_components=n_components,
max_iter=1_000,
alpha=alpha,
batch_size=10,
random_state=0,
method=method,
tol=0.0,
max_no_improvement=None,
)
# Optimal solution (U*, V*) is not unique.
# If (U*, V*) is optimal solution, (-U*,-V*) is also optimal,
# and (column permutated U*, row permutated V*) are also optional
# as long as holding UV.
# So here UV, ||U||_1,1 and sum(||V_k||_2) are verified
# instead of comparing directly U and V.
assert_allclose(np.matmul(U_64, V_64), np.matmul(U_32, V_32), rtol=rtol)
assert_allclose(np.sum(np.abs(U_64)), np.sum(np.abs(U_32)), rtol=rtol)
assert_allclose(np.sum(V_64**2), np.sum(V_32**2), rtol=rtol)
# verify an obtained solution is not degenerate
assert np.mean(U_64 != 0.0) > 0.05
assert np.count_nonzero(U_64 != 0.0) == np.count_nonzero(U_32 != 0.0)
@pytest.mark.parametrize(
"estimator",
[
SparseCoder(rng_global.uniform(size=(n_features, n_features))),
DictionaryLearning(),
MiniBatchDictionaryLearning(batch_size=4, max_iter=10),
],
ids=lambda x: x.__class__.__name__,
)
def test_get_feature_names_out(estimator):
"""Check feature names for dict learning estimators."""
estimator.fit(X)
n_components = X.shape[1]
feature_names_out = estimator.get_feature_names_out()
estimator_name = estimator.__class__.__name__.lower()
assert_array_equal(
feature_names_out,
[f"{estimator_name}{i}" for i in range(n_components)],
)
# TODO: remove mark once loky bug is fixed:
# https://github.com/joblib/loky/issues/458
@pytest.mark.thread_unsafe
def test_cd_work_on_joblib_memmapped_data(monkeypatch):
monkeypatch.setattr(
sklearn.decomposition._dict_learning,
"Parallel",
partial(Parallel, max_nbytes=100),
)
rng = np.random.RandomState(0)
X_train = rng.randn(10, 10)
dict_learner = DictionaryLearning(
n_components=5,
random_state=0,
n_jobs=2,
fit_algorithm="cd",
max_iter=50,
verbose=True,
)
# This must run and complete without error.
dict_learner.fit(X_train)