You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

639 lines
23 KiB

from multiprocessing import Pool
from multiprocessing.pool import Pool as PWL
import re
import math
import functools
from fractions import Fraction
import numpy as np
from numpy.testing import assert_equal, assert_
import pytest
from pytest import raises as assert_raises
from scipy.conftest import skip_xp_invalid_arg
from scipy._lib._array_api import xp_assert_equal
from scipy._lib._util import (_aligned_zeros, check_random_state, MapWrapper,
getfullargspec_no_self, FullArgSpec,
rng_integers, _validate_int, _rename_parameter,
_contains_nan, _rng_html_rewrite, _workers_wrapper)
import scipy._lib.array_api_extra as xpx
from scipy._lib.array_api_extra.testing import lazy_xp_function
from scipy import cluster, interpolate, linalg, optimize, sparse, spatial, stats
lazy_xp_function(_contains_nan)
@pytest.mark.slow
def test__aligned_zeros():
niter = 10
def check(shape, dtype, order, align):
err_msg = repr((shape, dtype, order, align))
x = _aligned_zeros(shape, dtype, order, align=align)
if align is None:
align = np.dtype(dtype).alignment
assert_equal(x.__array_interface__['data'][0] % align, 0)
if hasattr(shape, '__len__'):
assert_equal(x.shape, shape, err_msg)
else:
assert_equal(x.shape, (shape,), err_msg)
assert_equal(x.dtype, dtype)
if order == "C":
assert_(x.flags.c_contiguous, err_msg)
elif order == "F":
if x.size > 0:
# Size-0 arrays get invalid flags on NumPy 1.5
assert_(x.flags.f_contiguous, err_msg)
elif order is None:
assert_(x.flags.c_contiguous, err_msg)
else:
raise ValueError()
# try various alignments
for align in [1, 2, 3, 4, 8, 16, 32, 64, None]:
for n in [0, 1, 3, 11]:
for order in ["C", "F", None]:
for dtype in [np.uint8, np.float64]:
for shape in [n, (1, 2, 3, n)]:
for j in range(niter):
check(shape, dtype, order, align)
def test_check_random_state():
# If seed is None, return the RandomState singleton used by np.random.
# If seed is an int, return a new RandomState instance seeded with seed.
# If seed is already a RandomState instance, return it.
# Otherwise raise ValueError.
rsi = check_random_state(1)
assert_equal(type(rsi), np.random.RandomState)
rsi = check_random_state(rsi)
assert_equal(type(rsi), np.random.RandomState)
rsi = check_random_state(None)
assert_equal(type(rsi), np.random.RandomState)
assert_raises(ValueError, check_random_state, 'a')
rg = np.random.Generator(np.random.PCG64())
rsi = check_random_state(rg)
assert_equal(type(rsi), np.random.Generator)
def test_getfullargspec_no_self():
p = MapWrapper(1)
argspec = getfullargspec_no_self(p.__init__)
assert_equal(argspec, FullArgSpec(['pool'], None, None, (1,), [],
None, {}))
argspec = getfullargspec_no_self(p.__call__)
assert_equal(argspec, FullArgSpec(['func', 'iterable'], None, None, None,
[], None, {}))
class _rv_generic:
def _rvs(self, a, b=2, c=3, *args, size=None, **kwargs):
return None
rv_obj = _rv_generic()
argspec = getfullargspec_no_self(rv_obj._rvs)
assert_equal(argspec, FullArgSpec(['a', 'b', 'c'], 'args', 'kwargs',
(2, 3), ['size'], {'size': None}, {}))
def test_mapwrapper_serial():
in_arg = np.arange(10.)
out_arg = np.sin(in_arg)
p = MapWrapper(1)
assert_(p._mapfunc is map)
assert_(p.pool is None)
assert_(p._own_pool is False)
out = list(p(np.sin, in_arg))
assert_equal(out, out_arg)
with assert_raises(RuntimeError):
p = MapWrapper(0)
def test_pool():
with Pool(2) as p:
p.map(math.sin, [1, 2, 3, 4])
def test_mapwrapper_parallel():
in_arg = np.arange(10.)
out_arg = np.sin(in_arg)
with MapWrapper(2) as p:
out = p(np.sin, in_arg)
assert_equal(list(out), out_arg)
assert_(p._own_pool is True)
assert_(isinstance(p.pool, PWL))
assert_(p._mapfunc is not None)
# the context manager should've closed the internal pool
# check that it has by asking it to calculate again.
with assert_raises(Exception) as excinfo:
p(np.sin, in_arg)
assert_(excinfo.type is ValueError)
# can also set a PoolWrapper up with a map-like callable instance
with Pool(2) as p:
q = MapWrapper(p.map)
assert_(q._own_pool is False)
q.close()
# closing the PoolWrapper shouldn't close the internal pool
# because it didn't create it
out = p.map(np.sin, in_arg)
assert_equal(list(out), out_arg)
@_workers_wrapper
def user_of_workers(x, b=1, workers=None):
assert workers is not None
assert isinstance(workers, MapWrapper)
return np.array(list(workers(np.sin, x * b)))
def test__workers_wrapper():
arr = np.linspace(0, np.pi)
req = np.sin(arr * 2.0)
with Pool(2) as p:
v = user_of_workers(arr, workers=p.map, b=2)
assert_equal(v, req)
v = user_of_workers(arr, workers=None, b=2)
assert_equal(v, req)
v = user_of_workers(arr, workers=2, b=2)
assert_equal(v, req)
# assess if decorator works with partial functions
part_f = functools.partial(user_of_workers, b=2)
assert_equal(part_f(arr), req)
with Pool(2) as p:
part_f = functools.partial(user_of_workers, b=2, workers=p.map)
assert_equal(part_f(arr), req)
def test_rng_integers():
rng = np.random.RandomState()
# test that numbers are inclusive of high point
arr = rng_integers(rng, low=2, high=5, size=100, endpoint=True)
assert np.max(arr) == 5
assert np.min(arr) == 2
assert arr.shape == (100, )
# test that numbers are inclusive of high point
arr = rng_integers(rng, low=5, size=100, endpoint=True)
assert np.max(arr) == 5
assert np.min(arr) == 0
assert arr.shape == (100, )
# test that numbers are exclusive of high point
arr = rng_integers(rng, low=2, high=5, size=100, endpoint=False)
assert np.max(arr) == 4
assert np.min(arr) == 2
assert arr.shape == (100, )
# test that numbers are exclusive of high point
arr = rng_integers(rng, low=5, size=100, endpoint=False)
assert np.max(arr) == 4
assert np.min(arr) == 0
assert arr.shape == (100, )
# now try with np.random.Generator
try:
rng = np.random.default_rng()
except AttributeError:
return
# test that numbers are inclusive of high point
arr = rng_integers(rng, low=2, high=5, size=100, endpoint=True)
assert np.max(arr) == 5
assert np.min(arr) == 2
assert arr.shape == (100, )
# test that numbers are inclusive of high point
arr = rng_integers(rng, low=5, size=100, endpoint=True)
assert np.max(arr) == 5
assert np.min(arr) == 0
assert arr.shape == (100, )
# test that numbers are exclusive of high point
arr = rng_integers(rng, low=2, high=5, size=100, endpoint=False)
assert np.max(arr) == 4
assert np.min(arr) == 2
assert arr.shape == (100, )
# test that numbers are exclusive of high point
arr = rng_integers(rng, low=5, size=100, endpoint=False)
assert np.max(arr) == 4
assert np.min(arr) == 0
assert arr.shape == (100, )
class TestValidateInt:
@pytest.mark.parametrize('n', [4, np.uint8(4), np.int16(4), np.array(4)])
def test_validate_int(self, n):
n = _validate_int(n, 'n')
assert n == 4
@pytest.mark.parametrize('n', [4.0, np.array([4]), Fraction(4, 1)])
def test_validate_int_bad(self, n):
with pytest.raises(TypeError, match='n must be an integer'):
_validate_int(n, 'n')
def test_validate_int_below_min(self):
with pytest.raises(ValueError, match='n must be an integer not '
'less than 0'):
_validate_int(-1, 'n', 0)
class TestRenameParameter:
# check that wrapper `_rename_parameter` for backward-compatible
# keyword renaming works correctly
# Example method/function that still accepts keyword `old`
@_rename_parameter("old", "new")
def old_keyword_still_accepted(self, new):
return new
# Example method/function for which keyword `old` is deprecated
@_rename_parameter("old", "new", dep_version="1.9.0")
def old_keyword_deprecated(self, new):
return new
def test_old_keyword_still_accepted(self):
# positional argument and both keyword work identically
res1 = self.old_keyword_still_accepted(10)
res2 = self.old_keyword_still_accepted(new=10)
res3 = self.old_keyword_still_accepted(old=10)
assert res1 == res2 == res3 == 10
# unexpected keyword raises an error
message = re.escape("old_keyword_still_accepted() got an unexpected")
with pytest.raises(TypeError, match=message):
self.old_keyword_still_accepted(unexpected=10)
# multiple values for the same parameter raises an error
message = re.escape("old_keyword_still_accepted() got multiple")
with pytest.raises(TypeError, match=message):
self.old_keyword_still_accepted(10, new=10)
with pytest.raises(TypeError, match=message):
self.old_keyword_still_accepted(10, old=10)
with pytest.raises(TypeError, match=message):
self.old_keyword_still_accepted(new=10, old=10)
@pytest.fixture
def kwarg_lock(self):
from threading import Lock
return Lock()
def test_old_keyword_deprecated(self, kwarg_lock):
# positional argument and both keyword work identically,
# but use of old keyword results in DeprecationWarning
dep_msg = "Use of keyword argument `old` is deprecated"
res1 = self.old_keyword_deprecated(10)
res2 = self.old_keyword_deprecated(new=10)
# pytest warning filter is not thread-safe, enforce serialization
with kwarg_lock:
with pytest.warns(DeprecationWarning, match=dep_msg):
res3 = self.old_keyword_deprecated(old=10)
assert res1 == res2 == res3 == 10
# unexpected keyword raises an error
message = re.escape("old_keyword_deprecated() got an unexpected")
with pytest.raises(TypeError, match=message):
self.old_keyword_deprecated(unexpected=10)
# multiple values for the same parameter raises an error and,
# if old keyword is used, results in DeprecationWarning
message = re.escape("old_keyword_deprecated() got multiple")
with pytest.raises(TypeError, match=message):
self.old_keyword_deprecated(10, new=10)
with kwarg_lock:
with pytest.raises(TypeError, match=message), \
pytest.warns(DeprecationWarning, match=dep_msg):
# breakpoint()
self.old_keyword_deprecated(10, old=10)
with kwarg_lock:
with pytest.raises(TypeError, match=message), \
pytest.warns(DeprecationWarning, match=dep_msg):
self.old_keyword_deprecated(new=10, old=10)
class TestContainsNaN:
def test_policy(self):
data = np.array([1, 2, 3, np.nan])
assert _contains_nan(data) # default policy is "propagate"
assert _contains_nan(data, nan_policy="propagate")
assert _contains_nan(data, nan_policy="omit")
assert not _contains_nan(data[:3])
assert not _contains_nan(data[:3], nan_policy="propagate")
assert not _contains_nan(data[:3], nan_policy="omit")
with pytest.raises(ValueError, match="The input contains nan values"):
_contains_nan(data, nan_policy="raise")
assert not _contains_nan(data[:3], nan_policy="raise")
with pytest.raises(ValueError, match="nan_policy must be one of"):
_contains_nan(data, nan_policy="nan")
def test_contains_nan(self):
# Special case: empty array
assert not _contains_nan(np.array([], dtype=float))
# Integer arrays cannot contain NaN
assert not _contains_nan(np.array([1, 2, 3]))
assert not _contains_nan(np.array([[1, 2], [3, 4]]))
assert not _contains_nan(np.array([1., 2., 3.]))
assert not _contains_nan(np.array([1., 2.j, 3.]))
assert _contains_nan(np.array([1., 2.j, np.nan]))
assert _contains_nan(np.array([1., 2., np.nan]))
assert _contains_nan(np.array([np.nan, 2., np.nan]))
assert not _contains_nan(np.array([[1., 2.], [3., 4.]]))
assert _contains_nan(np.array([[1., 2.], [3., np.nan]]))
@skip_xp_invalid_arg
def test_contains_nan_with_strings(self):
data1 = np.array([1, 2, "3", np.nan]) # converted to string "nan"
assert not _contains_nan(data1)
data2 = np.array([1, 2, "3", np.nan], dtype='object')
assert _contains_nan(data2)
data3 = np.array([["1", 2], [3, np.nan]]) # converted to string "nan"
assert not _contains_nan(data3)
data4 = np.array([["1", 2], [3, np.nan]], dtype='object')
assert _contains_nan(data4)
@pytest.mark.skip_xp_backends(eager_only=True,
reason="lazy backends tested separately")
@pytest.mark.parametrize("nan_policy", ['propagate', 'omit', 'raise'])
def test_array_api(self, xp, nan_policy):
rng = np.random.default_rng(932347235892482)
x0 = rng.random(size=(2, 3, 4))
x = xp.asarray(x0)
assert not _contains_nan(x, nan_policy)
x = xpx.at(x)[1, 2, 1].set(xp.nan)
if nan_policy == 'raise':
with pytest.raises(ValueError, match="The input contains nan values"):
_contains_nan(x, nan_policy)
elif nan_policy == 'omit':
assert _contains_nan(x, nan_policy, xp_omit_okay=True)
elif nan_policy == 'propagate':
assert _contains_nan(x, nan_policy)
@pytest.mark.skip_xp_backends("numpy", reason="lazy backends only")
@pytest.mark.skip_xp_backends("cupy", reason="lazy backends only")
@pytest.mark.skip_xp_backends("array_api_strict", reason="lazy backends only")
@pytest.mark.skip_xp_backends("torch", reason="lazy backends only")
def test_array_api_lazy(self, xp):
rng = np.random.default_rng(932347235892482)
x0 = rng.random(size=(2, 3, 4))
x = xp.asarray(x0)
xp_assert_equal(_contains_nan(x), xp.asarray(False))
xp_assert_equal(_contains_nan(x, "propagate"), xp.asarray(False))
xp_assert_equal(_contains_nan(x, "omit", xp_omit_okay=True), xp.asarray(False))
# Lazy arrays don't support "omit" and "raise" policies
match = "not supported for lazy arrays"
with pytest.raises(TypeError, match=match):
_contains_nan(x, "omit")
with pytest.raises(TypeError, match=match):
_contains_nan(x, "raise")
x = xpx.at(x)[1, 2, 1].set(np.nan)
xp_assert_equal(_contains_nan(x), xp.asarray(True))
xp_assert_equal(_contains_nan(x, "propagate"), xp.asarray(True))
xp_assert_equal(_contains_nan(x, "omit", xp_omit_okay=True), xp.asarray(True))
with pytest.raises(TypeError, match=match):
_contains_nan(x, "omit")
with pytest.raises(TypeError, match=match):
_contains_nan(x, "raise")
def test__rng_html_rewrite():
def mock_str():
lines = [
'np.random.default_rng(8989843)',
'np.random.default_rng(seed)',
'np.random.default_rng(0x9a71b21474694f919882289dc1559ca)',
' bob ',
]
return lines
res = _rng_html_rewrite(mock_str)()
ref = [
'np.random.default_rng()',
'np.random.default_rng(seed)',
'np.random.default_rng()',
' bob ',
]
assert res == ref
class TestTransitionToRNG:
def kmeans(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
return cluster.vq.kmeans2(rng.random(size=(20, 3)), 3, **kwargs)
def kmeans2(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
return cluster.vq.kmeans2(rng.random(size=(20, 3)), 3, **kwargs)
def barycentric(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
x1, x2, y1 = rng.random((3, 10))
f = interpolate.BarycentricInterpolator(x1, y1, **kwargs)
return f(x2)
def clarkson_woodruff_transform(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
return linalg.clarkson_woodruff_transform(rng.random((10, 10)), 3, **kwargs)
def basinhopping(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
return optimize.basinhopping(optimize.rosen, rng.random(3), **kwargs).x
def opt(self, fun, **kwargs):
rng = np.random.default_rng(3458934594269824562)
bounds = optimize.Bounds(-rng.random(3) * 10, rng.random(3) * 10)
return fun(optimize.rosen, bounds, **kwargs).x
def differential_evolution(self, **kwargs):
return self.opt(optimize.differential_evolution, **kwargs)
def dual_annealing(self, **kwargs):
return self.opt(optimize.dual_annealing, **kwargs)
def check_grad(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
x = rng.random(3)
return optimize.check_grad(optimize.rosen, optimize.rosen_der, x,
direction='random', **kwargs)
def random_array(self, **kwargs):
return sparse.random_array((10, 10), density=1.0, **kwargs).toarray()
def random(self, **kwargs):
return sparse.random(10, 10, density=1.0, **kwargs).toarray()
def rand(self, **kwargs):
return sparse.rand(10, 10, density=1.0, **kwargs).toarray()
def svds(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
A = rng.random((10, 10))
return sparse.linalg.svds(A, **kwargs)
def random_rotation(self, **kwargs):
return spatial.transform.Rotation.random(3, **kwargs).as_matrix()
def goodness_of_fit(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
data = rng.random(100)
return stats.goodness_of_fit(stats.laplace, data, **kwargs).pvalue
def permutation_test(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
data = tuple(rng.random((2, 100)))
def statistic(x, y, axis): return np.mean(x, axis=axis) - np.mean(y, axis=axis)
return stats.permutation_test(data, statistic, **kwargs).pvalue
def bootstrap(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
data = (rng.random(100),)
return stats.bootstrap(data, np.mean, **kwargs).confidence_interval
def dunnett(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
x, y, control = rng.random((3, 100))
return stats.dunnett(x, y, control=control, **kwargs).pvalue
def sobol_indices(self, **kwargs):
def f_ishigami(x): return (np.sin(x[0]) + 7 * np.sin(x[1]) ** 2
+ 0.1 * (x[2] ** 4) * np.sin(x[0]))
dists = [stats.uniform(loc=-np.pi, scale=2 * np.pi),
stats.uniform(loc=-np.pi, scale=2 * np.pi),
stats.uniform(loc=-np.pi, scale=2 * np.pi)]
res = stats.sobol_indices(func=f_ishigami, n=1024, dists=dists, **kwargs)
return res.first_order
def qmc_engine(self, engine, **kwargs):
qrng = engine(d=1, **kwargs)
return qrng.random(4)
def halton(self, **kwargs):
return self.qmc_engine(stats.qmc.Halton, **kwargs)
def sobol(self, **kwargs):
return self.qmc_engine(stats.qmc.Sobol, **kwargs)
def latin_hypercube(self, **kwargs):
return self.qmc_engine(stats.qmc.LatinHypercube, **kwargs)
def poisson_disk(self, **kwargs):
return self.qmc_engine(stats.qmc.PoissonDisk, **kwargs)
def multivariate_normal_qmc(self, **kwargs):
X = stats.qmc.MultivariateNormalQMC([0], **kwargs)
return X.random(4)
def multinomial_qmc(self, **kwargs):
X = stats.qmc.MultinomialQMC([0.5, 0.5], 4, **kwargs)
return X.random(4)
def permutation_method(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
data = tuple(rng.random((2, 100)))
method = stats.PermutationMethod(**kwargs)
return stats.pearsonr(*data, method=method).pvalue
def bootstrap_method(self, **kwargs):
rng = np.random.default_rng(3458934594269824562)
data = tuple(rng.random((2, 100)))
res = stats.pearsonr(*data)
method = stats.BootstrapMethod(**kwargs)
return res.confidence_interval(method=method)
@pytest.mark.fail_slow(10)
@pytest.mark.slow
@pytest.mark.parametrize("method, arg_name", [
(kmeans, "seed"),
(kmeans2, "seed"),
(barycentric, "random_state"),
(clarkson_woodruff_transform, "seed"),
(basinhopping, "seed"),
(differential_evolution, "seed"),
(dual_annealing, "seed"),
(check_grad, "seed"),
(random_array, 'random_state'),
(random, 'random_state'),
(rand, 'random_state'),
(random_rotation, "random_state"),
(goodness_of_fit, "random_state"),
(permutation_test, "random_state"),
(bootstrap, "random_state"),
(permutation_method, "random_state"),
(bootstrap_method, "random_state"),
(dunnett, "random_state"),
(sobol_indices, "random_state"),
(halton, "seed"),
(sobol, "seed"),
(latin_hypercube, "seed"),
(poisson_disk, "seed"),
(multivariate_normal_qmc, "seed"),
(multinomial_qmc, "seed"),
])
def test_rng_deterministic(self, method, arg_name):
np.random.seed(None)
seed = 2949672964
rng = np.random.default_rng(seed)
message = "got multiple values for argument now known as `rng`"
with pytest.raises(TypeError, match=message):
method(self, **{'rng': rng, arg_name: seed})
rng = np.random.default_rng(seed)
res1 = method(self, rng=rng)
res2 = method(self, rng=seed)
assert_equal(res2, res1)
if method.__name__ in {"dunnett", "sobol_indices"}:
# the two kwargs have essentially the same behavior for these functions
res3 = method(self, **{arg_name: seed})
assert_equal(res3, res1)
return
rng = np.random.RandomState(seed)
res1 = method(self, **{arg_name: rng})
res2 = method(self, **{arg_name: seed})
if method.__name__ in {"halton", "sobol", "latin_hypercube", "poisson_disk",
"multivariate_normal_qmc", "multinomial_qmc"}:
# For these, passing `random_state=RandomState(seed)` is not the same as
# passing integer `seed`.
res1b = method(self, **{arg_name: np.random.RandomState(seed)})
assert_equal(res1b, res1)
res2b = method(self, **{arg_name: seed})
assert_equal(res2b, res2)
return
np.random.seed(seed)
res3 = method(self, **{arg_name: None})
assert_equal(res2, res1)
assert_equal(res3, res1)