"""Correlations; used for assumed background covariances.
Multiply by diagonal matrices of the assumed standard deviations on
the right and left to obtain covariance matrices.
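
Examples
--------
A minimal sketch of the intended use (the 2x2 correlation matrix here
is made up for illustration; the classes below produce real ones):

>>> import numpy as np
>>> corr = np.array([[1.0, 0.5], [0.5, 1.0]])
>>> stds = np.diag([2.0, 3.0])
>>> covariance = stds.dot(corr).dot(stds)
>>> float(covariance[0, 1])
3.0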
"""
import abc
import functools
# Need to specify numpy versions in some instances.
import numpy as np
# Not in dask.array
from numpy.linalg import eigh, norm
# arange changes signature
from numpy import arange, newaxis, asanyarray
from numpy import fromfunction, asarray, hstack, flip
from numpy import exp, square, fmin, sqrt
from numpy import logical_or, concatenate, isnan
from numpy import where
from numpy import sum as array_sum
from scipy.special import gamma, kv as K_nu
from scipy.sparse.linalg import LinearOperator
import pyfftw.interfaces.cache
from pyfftw import next_fast_len
from pyfftw.interfaces.numpy_fft import rfftn, irfftn
import six
from .linalg import SelfAdjointLinearOperator
from .linalg import SchmidtKroneckerProduct
from .linalg_interface import is_odd
NUM_THREADS = 8
"""Number of threads :mod:`pyfftw` should use for each transform."""
ADVANCE_PLANNER_EFFORT = "FFTW_MEASURE"
"""Amount of advance planning FFTW should do.
This is done when the :class:`HomogeneousIsotropicCorrelation`
instance is created. It is set to "FFTW_MEASURE" to keep test runs
fast, but applications should probably set this to do `a more
extensive exploration of the possibilities
<https://pyfftw.readthedocs.io/en/latest/source/tutorial.html#configuring-fftw-planning-effort-and-number-of-threads>`_.
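
An application wanting more thorough planning can override this
module-level constant before building any operators, for example
(``correlations`` here stands for however this module is imported)::

    correlations.ADVANCE_PLANNER_EFFORT = "FFTW_PATIENT"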
"""
PLANNER_EFFORT = "FFTW_ESTIMATE"
"""How much effort to expend planning for subsequent inversions.
Used by :meth:`HomogeneousIsotropicCorrelation.matmat`. This is used
more frequently than :data:`ADVANCE_PLANNER_EFFORT` and should
probably stay set to "FFTW_ESTIMATE". Increasing this can increase
application runtime by a factor of two.
"""
pyfftw.interfaces.cache.enable()
ROUNDOFF = 1e-13
"""Approximate size of roundoff error for correlation matrices.
Eigenvalues less than this value will be reset to this.
Gaussian correlations with a correlation length between five and ten
cells need `ROUNDOFF` greater than 1e-15 to be numerically positive
definite.
Gaussian(15) needs 1e-13 > ROUNDOFF > 1e-14
Also used in SchmidtKroneckerProduct to determine how many terms in the
Schmidt decomposition should be used.
"""
NEAR_ZERO = 1e-20
"""Where correlations are rounded to zero.
The method of assuring positive definiteness increases some values
away from zero due to roundoff. Values that were originally smaller
than this are reset to zero.
"""
FOURIER_NEAR_ZERO = 1e-15
"""Where fourier coefficients are treated as zero.
1e-20 produces overflow with the dask tests
"""
DTYPE = np.float32
class HomogeneousIsotropicCorrelation(SelfAdjointLinearOperator):
"""Homogeneous isotropic correlations using FFTs.
This embeds the physical domain passed in a larger computational
domain, which allows for treatment of domains that should not be
considered periodic.
.. note::
Do not invoke this directly. Use
:func:`HomogeneousIsotropicCorrelation.from_function` or
        :func:`HomogeneousIsotropicCorrelation.from_array` instead.
See Also
--------
:func:`scipy.linalg.solve_circulant`
I stole the idea from here.
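
    Examples
    --------
    A sketch of typical use (``GaussianCorrelation`` is defined later
    in this module and ``np`` is the module-level numpy import):

    >>> corr_op = HomogeneousIsotropicCorrelation.from_function(
    ...     GaussianCorrelation(2.0), (10, 15), is_cyclic=False)
    >>> field = np.ones(10 * 15, dtype=np.float32)
    >>> corr_op.matvec(field).shape
    (150,)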
"""
def __init__(self, shape, computational_shape=None):
"""Set up the instance.
.. note::
Do not invoke this directly. Use
:func:`HomogeneousIsotropicCorrelation.from_function` or
            :func:`HomogeneousIsotropicCorrelation.from_array` instead.
Parameters
----------
shape: tuple of int
The state is formally input as a vector, but the correlations
depend on the layout in some other shape, usually related to the
physical layout. This is that shape.
computational_shape: tuple of int
The shape of the embedding computational domain. Defaults
to shape. May be larger to induce non-periodic correlations
"""
state_size = np.prod(shape)
ndims = len(shape)
super(HomogeneousIsotropicCorrelation, self).__init__(
dtype=DTYPE, shape=(state_size, state_size))
if computational_shape is None:
computational_shape = shape
is_cyclic = (shape == computational_shape)
self._is_cyclic = is_cyclic
self._underlying_shape = tuple(shape)
self._computational_shape = computational_shape
# Tell pylint these will be present
self._corr_fourier = None
self._fourier_near_zero = None
self._fft = functools.partial(
rfftn, axes=arange(0, ndims, dtype=int), s=computational_shape,
threads=NUM_THREADS, planner_effort=PLANNER_EFFORT)
if is_cyclic:
self._ifft = functools.partial(
irfftn, axes=arange(0, ndims, dtype=int), s=shape,
threads=NUM_THREADS, planner_effort=PLANNER_EFFORT)
else:
axes = arange(0, ndims, dtype=int)
base_slices = tuple(slice(None, dim) for dim in shape)
def _ifft(arry):
"""Find inverse FFT of arry.
Parameters
----------
                arry: array_like
The spectrum of a real-valued signal
Returns
-------
array_like
The spectrum transformed back to physical space
"""
slicer = (
base_slices +
tuple(slice(None) for dim in arry.shape[ndims:])
)
big_result = irfftn(
arry, axes=axes, s=computational_shape,
threads=NUM_THREADS, planner_effort=PLANNER_EFFORT)
return big_result[slicer]
self._ifft = _ifft
    @classmethod
def from_function(cls, corr_func, shape, is_cyclic=True):
"""Create an instance to apply the correlation function.
Parameters
----------
corr_func: callable(dist) -> float
The correlation of the first element of the domain with
each other element.
shape: tuple of int
The state is formally a vector, but the correlations are
assumed to depend on the layout in some other shape,
usually related to the physical layout. This is the other
shape.
is_cyclic: bool
Whether to assume the domain is periodic in all directions.
Returns
-------
HomogeneousIsotropicCorrelation
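
        Examples
        --------
        A sketch using one of the correlation-function classes defined
        later in this module:

        >>> corr_op = HomogeneousIsotropicCorrelation.from_function(
        ...     ExponentialCorrelation(5.), (20,))
        >>> corr_op.shape == (20, 20)
        True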
"""
shape = np.atleast_1d(shape)
if is_cyclic:
computational_shape = tuple(shape)
else:
computational_shape = tuple(next_fast_len(2 * dim - 1)
for dim in shape)
self = cls(tuple(shape), computational_shape)
shape = np.asarray(self._computational_shape)
ndims = len(shape)
broadcastable_shape = shape[:, newaxis]
while broadcastable_shape.ndim < ndims + 1:
broadcastable_shape = broadcastable_shape[..., newaxis]
def corr_from_index(*index):
"""Correlation of index with zero.
Turns a correlation function in terms of index distance
into one in terms of indices on a periodic domain.
Parameters
----------
index: tuple of int
Returns
-------
float[-1, 1]
The correlation of the given index with the origin.
See Also
--------
DistanceCorrelationFunction.correlation_from_index
"""
comp2_1 = square(index)
# Components of distance to shifted origin
comp2_2 = square(broadcastable_shape - index)
# use the smaller components to get the distance to the
# closest of the shifted origins
comp2 = fmin(comp2_1, comp2_2)
return corr_func(sqrt(array_sum(comp2, axis=0)))
corr_struct = fromfunction(
corr_from_index, shape=tuple(shape),
dtype=DTYPE)
# I should be able to generate this sequence with a type-I DCT
# For some odd reason complex/complex is faster than complex/real
# This also ensures the format here is the same as in _matmat
corr_fourier = rfftn(
corr_struct, axes=arange(ndims, dtype=int),
threads=NUM_THREADS, planner_effort=ADVANCE_PLANNER_EFFORT)
self._corr_fourier = (corr_fourier)
# This is also affected by roundoff
abs_corr_fourier = abs(corr_fourier)
self._fourier_near_zero = (
abs_corr_fourier < FOURIER_NEAR_ZERO * abs_corr_fourier.max())
return self
    @classmethod
def from_array(cls, corr_array, is_cyclic=True):
"""Create an instance with the given correlations.
Parameters
----------
corr_array: array_like
The correlation of the first element of the domain with
each other element.
is_cyclic: bool
Returns
-------
HomogeneousIsotropicCorrelation
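
        Examples
        --------
        A sketch using an explicit one-dimensional set of correlations
        with the first grid cell:

        >>> corrs = np.exp(-np.arange(8) / 3.)
        >>> corr_op = HomogeneousIsotropicCorrelation.from_array(
        ...     corrs, is_cyclic=False)
        >>> corr_op.shape == (8, 8)
        True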
"""
corr_array = asarray(corr_array)
shape = corr_array.shape
ndims = corr_array.ndim
if is_cyclic:
computational_shape = shape
self = cls(shape, computational_shape)
corr_fourier = self._fft(corr_array)
else:
computational_shape = tuple(
2 * (dim - 1) for dim in shape)
self = cls(shape, computational_shape)
            for axis in reversed(range(ndims)):
                # Drop the endpoints along `axis` and mirror to get an
                # even extension of the correlations on that axis.
                trim = tuple(slice(1, -1) if ax == axis else slice(None)
                             for ax in range(ndims))
                corr_array = concatenate(
                    [corr_array, flip(corr_array[trim], axis)],
                    axis=axis)
# Advantages over dctn: guaranteed same format and gets
# nice planning done for the later evaluations.
corr_fourier = rfftn(
corr_array, axes=arange(0, ndims, dtype=int),
threads=NUM_THREADS, planner_effort=ADVANCE_PLANNER_EFFORT)
# The fft axes need to be a single chunk for the dask ffts
# It's in memory already anyway
# TODO: create a from_spectrum to delegate to
self._corr_fourier = (corr_fourier)
        # Compare magnitudes; the spectrum is complex-valued.
        self._fourier_near_zero = (abs(corr_fourier) < FOURIER_NEAR_ZERO)
return self
    def sqrt(self):
"""Compute an S such that S.T @ S == self.
Returns
-------
S: HomogeneousIsotropicCorrelation
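
        Examples
        --------
        A sketch checking that applying the square root twice matches
        applying the original operator (the operator is symmetric, so
        ``S.T @ S`` and ``S @ S`` coincide):

        >>> corr_op = HomogeneousIsotropicCorrelation.from_function(
        ...     ExponentialCorrelation(3.), (16,))
        >>> sqrt_op = corr_op.sqrt()
        >>> vec = np.eye(16, dtype=np.float32)[0]
        >>> np.allclose(sqrt_op.matvec(sqrt_op.matvec(vec)),
        ...             corr_op.matvec(vec), atol=1e-5)
        True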
"""
result = HomogeneousIsotropicCorrelation(self._underlying_shape,
self._computational_shape)
result._corr_fourier = sqrt(self._corr_fourier)
# I still don't much trust these.
result._fourier_near_zero = self._fourier_near_zero
return result
def _matmat(self, mat):
"""Evaluate the matrix product of self and `mat`.
Parameters
----------
mat: array_like[N, K]
Returns
-------
array_like[N, K]
"""
_shape = self._underlying_shape
fields = asarray(mat).reshape(_shape + (-1,))
spectral_fields = self._fft(fields)
spectral_fields *= self._corr_fourier[..., np.newaxis]
results = self._ifft(spectral_fields)
return results.reshape(mat.shape)
    def inv(self):
"""Construct the matrix inverse of this operator.
Returns
-------
LinearOperator
Raises
------
        NotImplementedError
            If the instance is acyclic.
"""
if not self._is_cyclic:
raise NotImplementedError(
"HomogeneousIsotropicCorrelation.inv "
"does not support acyclic correlations")
# TODO: Return a HomogeneousIsotropicLinearOperator
return LinearOperator(
shape=self.shape, dtype=self.dtype,
matvec=self.solve, rmatvec=self.solve)
    def solve(self, vec):
"""Solve A @ x = vec.
Parameters
----------
vec: array_like[N]
Returns
-------
array_like[N]
Solution of `self @ x = vec`
Raises
------
NotImplementedError
if the instance is acyclic.
ValueError
If vec is the wrong shape
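
        Examples
        --------
        A sketch of a round trip through the operator and its solve
        (cyclic correlations only):

        >>> corr_op = HomogeneousIsotropicCorrelation.from_function(
        ...     ExponentialCorrelation(3.), (16,))
        >>> vec = np.arange(16, dtype=np.float32)
        >>> np.allclose(corr_op.solve(corr_op.matvec(vec)), vec,
        ...             atol=1e-4)
        True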
"""
if not self._is_cyclic:
raise NotImplementedError(
"HomogeneousIsotropicCorrelation.solve "
"does not support acyclic correlations")
if vec.shape[0] != self.shape[0]:
raise ValueError("Shape of vec not correct")
field = asarray(vec).reshape(self._underlying_shape)
spectral_field = self._fft(field)
spectral_field /= self._corr_fourier
# Dividing by a small number is numerically unstable. This is
# nearly an SVD solve already, so borrow that solution.
spectral_field[self._fourier_near_zero] = 0
result = self._ifft(spectral_field)
return result.reshape(self.shape[-1])
    def kron(self, other):
"""Construct the Kronecker product of this operator and other.
Parameters
----------
other: HomogeneousIsotropicCorrelation
The other operator for the Kronecker product.
This implementation will accept other objects,
passing them along to :class:`.linalg.SchmidtKroneckerProduct`.
Returns
-------
scipy.sparse.linalg.LinearOperator
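
        Examples
        --------
        A sketch combining separate temporal and spatial correlation
        operators into one operator for the full state:

        >>> temporal = HomogeneousIsotropicCorrelation.from_function(
        ...     ExponentialCorrelation(3.), (8,))
        >>> spatial = HomogeneousIsotropicCorrelation.from_function(
        ...     GaussianCorrelation(2.), (5, 6))
        >>> combined = temporal.kron(spatial)
        >>> combined.shape == (8 * 5 * 6, 8 * 5 * 6)
        True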
"""
if ((not isinstance(other, HomogeneousIsotropicCorrelation) or
self._is_cyclic != other._is_cyclic)):
return SchmidtKroneckerProduct(self, other)
shape = self._underlying_shape + other._underlying_shape
shift = len(self._underlying_shape)
self_index = tuple(slice(None) if i < shift else np.newaxis
for i in range(len(shape)))
other_index = tuple(np.newaxis if i < shift else slice(None)
for i in range(len(shape)))
# rfft makes the first axis half the size
# When combining things like this, I need to re-double that size again
if is_odd(self._underlying_shape[-1]):
reverse_start = -1
else:
reverse_start = -2
expanded_fft = hstack(
(self._corr_fourier,
self._corr_fourier[..., reverse_start:0:-1].conj()))
expanded_near_zero = concatenate(
(self._fourier_near_zero,
self._fourier_near_zero[..., reverse_start:0:-1]), axis=-1)
newinst = HomogeneousIsotropicCorrelation(shape)
newinst._corr_fourier = (expanded_fft[self_index] *
other._corr_fourier[other_index])
newinst._fourier_near_zero = logical_or(
expanded_near_zero[self_index],
other._fourier_near_zero[other_index])
return newinst
def make_matrix(corr_func, shape):
"""Make a correlation matrix for a domain with shape `shape`.
Parameters
----------
    corr_func: DistanceCorrelationFunction
        Function giving the correlation between two indices a distance
        `d` from each other. Must provide a `correlation_from_index`
        method, as :class:`DistanceCorrelationFunction` subclasses do.
shape: tuple of int
The underlying shape of the domain. It is viewed as a vector
here, but may be more naturally seen as an N-D array. This is
the shape of that array.
`N = prod(shape)`
See Also
--------
:func:`statsmodels.stats.correlation_tools.corr_clipped`
Does something similar, and refers to other functions that may
give more accurate results.
Returns
-------
corr: np.ndarray[N, N]
Positive definite dense array, entirely in memory
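
    Examples
    --------
    A sketch for a small one-dimensional domain, using a correlation
    function defined later in this module:

    >>> corr_mat = make_matrix(ExponentialCorrelation(2.), (5,))
    >>> corr_mat.shape
    (5, 5)
    >>> np.allclose(np.diag(corr_mat), 1.0, atol=1e-5)
    True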
"""
shape = tuple(np.atleast_1d(shape))
n_points = np.prod(shape)
# Since dask doesn't have eigh, using dask in this section slows
# the test suite by about 25%. Since it all ends up in memory,
# may as well start with it there instead of converting back and
# forth a few times.
tmp_res = np.fromfunction(corr_func.correlation_from_index,
shape=2 * shape,
dtype=DTYPE).reshape(
(n_points, n_points))
where_small = tmp_res < NEAR_ZERO
where_small &= tmp_res > -NEAR_ZERO
# This isn't always positive definite. I reset the values on
# the negative side of roundoff to the positive side
vals, vecs = eigh(tmp_res)
del tmp_res
vals[vals < ROUNDOFF] = ROUNDOFF
result = np.dot(vecs, np.diag(vals).dot(vecs.T))
# Now, there's more roundoff
# make the values that were originally small zero
result[where_small] = 0
return asarray(result)
class DistanceCorrelationFunction(six.with_metaclass(abc.ABCMeta)):
"""A correlation function that depends only on distance."""
_distance_scaling = 1
def __init__(self, length):
"""Set up instance.
Parameters
----------
length: float
The correlation length in index space. Unitless.
"""
self._length = self._distance_scaling * float(length)
def __repr__(self):
"""Return a string representation of self."""
return "{name:s}({length:g})".format(
length=self._length / self._distance_scaling,
name=type(self).__name__)
def __str__(self):
"""Return a string version for printing."""
return "{name:s}({length:3.2e})".format(
length=self._length / self._distance_scaling,
name=type(self).__name__)
@abc.abstractmethod
def __call__(self, dist):
"""Get the correlation between points whose indices differ by dist.
Parameters
----------
dist: float
The distance at which the correlation is requested.
Returns
-------
correlation: float[-1, 1]
The correlation at points that distance apart
"""
pass # pragma: no cover
    def correlation_from_index(self, *indices):
"""Find the correlation between the indices.
        The number of `indices` must be even: the first half gives the
        first point and the second half gives the second. The result
        does not otherwise depend on the underlying domain shape.
Parameters
----------
indices: tuple of int
Returns
-------
float
Find the correlation of the point contained in the first
half of the indices with that contained in the second.
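
        Examples
        --------
        A sketch on a two-dimensional domain: the two points are
        (0, 0) and (3, 4), a distance of five cells apart, so an
        exponential correlation with length five gives :math:`e^{-1}`:

        >>> corr_func = ExponentialCorrelation(5.)
        >>> float(round(corr_func.correlation_from_index(0, 0, 3, 4), 6))
        0.367879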
"""
half = len(indices) // 2
point1 = asanyarray(indices[:half])
point2 = asanyarray(indices[half:])
dist = norm(point1 - point2, axis=0)
return self(dist)
class ExponentialCorrelation(DistanceCorrelationFunction):
    """An exponential correlation structure.
Notes
-----
Correlation given by exp(-dist/length)
where dist is the distance between the points.
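
    Examples
    --------
    A quick check that the correlation falls to :math:`e^{-1}` at one
    correlation length:

    >>> corr_func = ExponentialCorrelation(3.)
    >>> float(round(corr_func(3.), 6))
    0.367879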
"""
def __call__(self, dist):
"""Get the correlation between the points.
Parameters
----------
dist: float
Returns
-------
corr: float
"""
return exp(-dist / self._length)
class BalgovindCorrelation(DistanceCorrelationFunction):
"""A Balgovind 3D correlation structure.
Follows Balgovind et al. 1983 recommendations for a 3D field,
modified so the correlation length better matches that used by
other correlation functions.
Notes
-----
    Correlation given by :math:`(1 + 2 d/L) \exp(-2 d/L)` where
    :math:`d` is the distance between the points and :math:`L` is the
    correlation length.
This implementation has problems for length == 10.
I have no idea why. 3 and 30 are fine.
References
----------
Balgovind, R and Dalcher, A. and Ghil, M. and Kalnay, E. (1983).
A Stochastic-Dynamic Model for the Spatial Structure of Forecast
Error Statistics *Monthly Weather Review* 111(4) 701--722.
:doi:`10.1175/1520-0493(1983)111<0701:asdmft>2.0.co;2`
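
    Examples
    --------
    A quick check of the form above at half a correlation length
    (:math:`d/L = 1/2`, so the correlation is :math:`2 e^{-1}`):

    >>> corr_func = BalgovindCorrelation(10.)
    >>> float(round(corr_func(5.), 6))
    0.735759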
"""
_distance_scaling = 0.5
def __call__(self, dist):
"""Get the correlation between the points.
Parameters
----------
dist: float
The distance between the points
Returns
-------
float
The correlation between points the given distance apart.
"""
scaled_dist = dist / self._length
return (1 + scaled_dist) * exp(-scaled_dist)
class MaternCorrelation(DistanceCorrelationFunction):
r"""A Matern correlation structure.
Follows Matern (1986) *Spatial Variation*. The specific
    definition seems to be similar to the parameterization in Stein's
    *Interpolation of Spatial Data*.
Notes
-----
Correlation given by
:math:`[2^{\kappa-1}\Gamma(\kappa)]^{-1} (d/L)^{\kappa} K_{\kappa}(d/L)`
where :math:`\kappa` is a smoothness parameter and
:math:`K_{\kappa}` is a modified Bessel function of the third kind.
References
----------
Stein, Michael L. *Interpolation of Spatial Data: Some Theory for
    Kriging*. Springer-Verlag New York. Part of Springer Series in
Statistics (issn:0172-7397) isbn:978-1-4612-7166-6.
:doi:`10.1007/978-1-4612-1494-6`
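
    Examples
    --------
    A quick check that the correlation at zero distance is one (the
    :func:`scipy.special.kv` factor alone is undefined there):

    >>> corr_func = MaternCorrelation(5., kappa=1)
    >>> float(corr_func(0.))
    1.0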
"""
_distance_scaling = 1.25
def __init__(self, length, kappa=1):
r"""Set up instance.
Parameters
----------
length: float
The correlation length in index space. Unitless.
kappa: float
The smoothness parameter
            :math:`\kappa=\infty` is equivalent to Gaussian correlations
            :math:`\kappa=\frac{1}{2}` is equivalent to exponential correlations
            :math:`\kappa=1` is Balgovind's recommendation for 2D fields
            :math:`\kappa=\frac{3}{2}` matches Balgovind's advice for 3D fields
Default value is only for full equivalence with other classes.
The default value is entirely arbitrary and may change without
notice.
"""
self._kappa = kappa
# Make sure correlation at zero is one
self._scale_const = .5 * gamma(kappa)
self._distance_scaling = self._distance_scaling * self._scale_const
super(MaternCorrelation, self).__init__(length)
def __call__(self, dist):
"""Get the correlation between the points.
Parameters
----------
dist: float
Returns
-------
corr: float
"""
kappa = self._kappa
scaled_dist = dist / self._length
result = ((.5 * scaled_dist) ** kappa *
K_nu(kappa, scaled_dist) / self._scale_const)
# K_nu returns nan at zero
return where(isnan(result), 1, result)
class GaussianCorrelation(DistanceCorrelationFunction):
    """A Gaussian correlation structure.
Notes
-----
Correlation given by exp(-dist**2 / (length**2)) where dist is the
distance between the points.
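
    Examples
    --------
    A quick check at half a correlation length, where the correlation
    is :math:`e^{-1/4}`:

    >>> corr_func = GaussianCorrelation(3.)
    >>> float(round(corr_func(1.5), 6))
    0.778801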
"""
def __call__(self, dist):
"""Get the correlation between the points.
Parameters
----------
dist: float
Returns
-------
corr: float
"""
scaled_dist2 = square(dist / self._length)
return exp(-scaled_dist2)