"""Utility functions for compatibility.
Some functions mirror :mod:`numpy` functions but produce :mod:`dask`
output. Others map similar functionality across the various methods
to accomplish that end.
"""
from __future__ import absolute_import
import functools
import numpy as np
from numpy import atleast_1d, atleast_2d
from scipy.sparse.linalg import LinearOperator
from .linalg import DaskKroneckerProductOperator, kron, solve
from .linalg_interface import ProductLinearOperator, tolinearoperator
ARRAY_TYPES = (np.ndarray,)
"""Array types for determining Kronecker product type.
These are combined for a direct product.
"""
REAL_DTYPE_KINDS = "fiu"
"""The kinds used by dtypes to represent real numbers.
Includes subsets.
"""
MAX_EXPLICIT_ARRAY = 1 << 10
"""Maximum size for an array represented explicitly.
:func:`kronecker_product` will form products smaller than this as an
explicit matrix using :func:`.linalg.kron`. Arrays larger than this will use
:class:`.linalg.DaskKroneckerProductOperator`.
Currently completely arbitrary.
`2 ** 16` works fine in memory, `2**17` gives a :class:`MemoryError`.
Hopefully Dask knows not to try this.
"""
[docs]def kronecker_product(operator1, operator2):
"""Form the Kronecker product of the given operators.
Delegates to ``operator1.kron()`` if possible,
:func:`.linalg.kron` if both are :const:`ARRAY_TYPES`, or
:class:`~atmos_flux_inversion.linalg.SchmidtKroneckerProduct`
otherwise.
Parameters
----------
operator1, operator2: ~scipy.sparse.linalg.LinearOperator
The component operators of the Kronecker product.
Returns
-------
scipy.sparse.linalg.LinearOperator
The kronecker product of the given operators.
"""
if hasattr(operator1, "kron"):
return operator1.kron(operator2)
if isinstance(operator1, ARRAY_TYPES):
if ((isinstance(operator2, ARRAY_TYPES) and
operator1.size * operator2.size < MAX_EXPLICIT_ARRAY)):
return kron(operator1, operator2)
return DaskKroneckerProductOperator(operator1, operator2)
from atmos_flux_inversion.linalg import SchmidtKroneckerProduct
return SchmidtKroneckerProduct(operator1, operator2)
[docs]def method_common(inversion_method):
"""Wrap method to validate args.
Can also deal with posterior uncertainty for a reduced-resolution
domain, where the method opts not to provide that.
Parameters
----------
inversion_method: function
The inversion function to wrap.
Returns
-------
function
The wrapped function.
"""
@functools.wraps(inversion_method)
def wrapper(background, background_covariance,
observations, observation_covariance,
observation_operator,
reduced_background_covariance=None,
reduced_observation_operator=None):
"""Solve the inversion problem.
Assumes everything follows a multivariate normal distribution
with the specified covariance matrices. Under this assumption
`analysis_covariance` is exact, and `analysis` is the Maximum
Likelihood Estimator and the Best Linear Unbiased Estimator
for the underlying state in the frequentist framework, and
specify the posterior distribution for the state in the
Bayesian framework. If these are not satisfied, these still
form the Generalized Least Squares estimates for the state and
an estimated uncertainty.
Parameters
----------
background: array_like[N]
The background state estimate.
background_covariance: array_like[N, N]
Covariance of background state estimate across
realizations/ensemble members. "Ensemble" is here
interpreted in the sense used in statistical mechanics or
frequentist statistics, and may not be derived from a
sample as in meteorological ensemble Kalman filters
observations: array_like[M]
The observations constraining the background estimate.
observation_covariance: array_like[M, M]
Covariance of observations across realizations/ensemble
members. "Ensemble" again has the statistical meaning.
observation_operator: array_like[M, N]
The relationship between the state and the observations.
reduced_background_covariance: array_like[Nred, Nred], optional
The covariance for a smaller state space, usually obtained by
reducing resolution in space and time. Note that
`reduced_observation_operator` must also be provided
reduced_observation_operator: array_like[M, Nred], optional
The relationship between the reduced state space and the
observations. Note that `reduced_background_covariance`
must also be provided.
Returns
-------
analysis: array_like[N]
Analysis state estimate
analysis_covariance: array_like[Nred, Nred] or array_like[N, N]
Estimated uncertainty of analysis across
realizations/ensemble members. Calculated using
reduced_background_covariance and
reduced_observation_operator if possible
Raises
------
ValueError
If only one of `reduced_background_covariance` and
`reduced_observation_operator` is provided
"""
_LinearOperator = LinearOperator # noqa: C0103
background = atleast_1d(background)
if not isinstance(background_covariance, _LinearOperator):
background_covariance = atleast_2d(background_covariance)
observations = atleast_1d(observations)
if not isinstance(observation_covariance, _LinearOperator):
observation_covariance = atleast_2d(observation_covariance)
if not isinstance(observation_operator, _LinearOperator):
observation_operator = atleast_2d(observation_operator)
if reduced_background_covariance is not None:
if not isinstance(reduced_background_covariance, LinearOperator):
reduced_background_covariance = atleast_2d(
reduced_background_covariance)
if reduced_observation_operator is None:
raise ValueError("Need reduced versions of both B and H")
if not isinstance(reduced_observation_operator, LinearOperator):
reduced_observation_operator = atleast_2d(
reduced_observation_operator)
elif reduced_observation_operator is not None:
raise ValueError("Need reduced versions of both B and H")
analysis_estimate, analysis_covariance = (
inversion_method(background, background_covariance,
observations, observation_covariance,
observation_operator,
reduced_background_covariance,
reduced_observation_operator))
if analysis_covariance is None:
if isinstance(observation_operator, _LinearOperator):
B_HT = ProductLinearOperator(reduced_background_covariance,
reduced_observation_operator.T)
projected_reduced_background_covariance = (
ProductLinearOperator(
reduced_observation_operator,
reduced_background_covariance,
reduced_observation_operator.T))
else:
# H is an array
B_HT = reduced_background_covariance.dot(
reduced_observation_operator.T)
projected_reduced_background_covariance = (
reduced_observation_operator.dot(
B_HT))
if isinstance(observation_covariance,
_LinearOperator):
projected_reduced_background_covariance = tolinearoperator(
projected_reduced_background_covariance)
decrease = B_HT.dot(solve(
projected_reduced_background_covariance +
observation_covariance,
B_HT.T))
if isinstance(decrease, _LinearOperator):
reduced_background_covariance = tolinearoperator(
reduced_background_covariance)
elif isinstance(reduced_background_covariance, _LinearOperator):
decrease = tolinearoperator(decrease)
# (I - KH) B
analysis_covariance = (
# May need to be a LinearOperator to work properly
reduced_background_covariance -
decrease
)
return analysis_estimate, analysis_covariance
return wrapper