"""Functions implementing 3D-Var.
Signatures follow the functions in
:mod:`atmos_flux_inversion.optimal_interpolation`
Note
----
Forces in-memory computations. BFGS method requires this, and there
are odd shape-mismatch errors if I just change to dask arrays.
Conjugate gradient solvers may work better for dask arrays if we drop
the covariance matrix from the return values.
"""
import scipy.optimize
import scipy.linalg
# I believe scipy's minimizer requires things that give boolean true
# or false from the objective, rather than a yet-to-be-realized dask
# array.
from numpy import asarray
from numpy import zeros_like
from atmos_flux_inversion import ConvergenceError, MAX_ITERATIONS, GRAD_TOL
from atmos_flux_inversion.util import solve, method_common
[docs]@method_common
def simple(background, background_covariance,
observations, observation_covariance,
observation_operator,
reduced_background_covariance=None,
reduced_observation_operator=None):
"""Feed everything to scipy's minimizer.
Assumes everything follows a multivariate normal distribution
with the specified covariance matrices. Under this assumption
`analysis_covariance` is exact, and `analysis` is the Maximum
Likelihood Estimator and the Best Linear Unbiased Estimator
for the underlying state in the frequentist framework, and
specify the posterior distribution for the state in the
Bayesian framework. If these are not satisfied, these still
form the Generalized Least Squares estimates for the state and
an estimated uncertainty.
Parameters
----------
background: array_like[N]
The background state estimate.
background_covariance: array_like[N, N]
Covariance of background state estimate across
realizations/ensemble members. "Ensemble" is here
interpreted in the sense used in statistical mechanics or
frequentist statistics, and may not be derived from a
sample as in meteorological ensemble Kalman filters
observations: array_like[M]
The observations constraining the background estimate.
observation_covariance: array_like[M, M]
Covariance of observations across realizations/ensemble
members. "Ensemble" again has the statistical meaning.
observation_operator: array_like[M, N]
The relationship between the state and the observations.
reduced_background_covariance: array_like[Nred, Nred], optional
The covariance for a smaller state space, usually obtained by
reducing resolution in space and time. Note that
`reduced_observation_operator` must also be provided
reduced_observation_operator: array_like[M, Nred], optional
The relationship between the reduced state space and the
observations. Note that `reduced_background_covariance`
must also be provided.
Returns
-------
analysis: array_like[N]
Analysis state estimate
analysis_covariance: array_like[Nred, Nred] or array_like[N, N]
Estimated uncertainty of analysis across
realizations/ensemble members. Calculated using
reduced_background_covariance and
reduced_observation_operator if possible
Raises
------
ConvergenceError
If iterative solver does not converge
Notes
-----
minimizes
.. math::
(x - x_0)^T P_B^{-1} (x - x_0) + (y - h(x))^T R^{-1} (y - h(x))
which has gradient
.. math::
P_B^{-1} (x - x_0) + H^T R^{-1} (y - h(x))
"""
def cost_function(test_state):
"""Mismatch between state, prior, and obs.
Parameters
----------
test_state: np.ndarray[N]
Returns
-------
float
A measure of the mismatch between the test state and the
background and observations
"""
prior_mismatch = asarray(test_state - background)
test_obs = observation_operator.dot(test_state)
obs_mismatch = asarray(test_obs - observations)
prior_fit = prior_mismatch.dot(solve(
background_covariance, prior_mismatch))
obs_fit = obs_mismatch.dot(solve(
observation_covariance, obs_mismatch))
return prior_fit + obs_fit
def cost_jacobian(test_state):
"""Gradiant of cost_function at `test_state`.
Parameters
----------
test_state: np.ndarray[N]
Returns
-------
jac: np.ndarray[N]
"""
prior_mismatch = test_state - background
test_obs = observation_operator.dot(test_state)
obs_mismatch = test_obs - observations
prior_gradient = solve(background_covariance,
prior_mismatch)
obs_gradient = observation_operator.T.dot(
solve(observation_covariance,
obs_mismatch))
return prior_gradient + obs_gradient
# def cost_hessian_product(test_state, test_step):
# """Hessian of cost_function at `test_state` times `test_step`.
# Parameters
# ----------
# test_state: np.ndarray[N]
# test_step: np.ndarray[N]
# Results
# -------
# hess_prod: np.ndarray[N]
# """
# bg_prod = solve(background_covariance,
# test_step)
# obs_prod = observation_operator.T.dot(
# solve(observation_covariance,
# observation_operator.dot(test_step)))
# return bg_prod + obs_prod
if reduced_background_covariance is None:
method = "BFGS"
else:
method = "Newton-CG"
result = scipy.optimize.minimize(
cost_function, background,
method=method,
jac=cost_jacobian,
# hessp=cost_hessian_product,
options=dict(maxiter=MAX_ITERATIONS,
gtol=GRAD_TOL),
)
if not result.success:
raise ConvergenceError("Did not converge: {msg:s}".format(
msg=result.message), result)
if reduced_background_covariance is not None:
result.hess_inv = None
return result.x, result.hess_inv
[docs]@method_common
def incremental(background, background_covariance,
observations, observation_covariance,
observation_operator,
reduced_background_covariance=None,
reduced_observation_operator=None):
"""Feed everything to scipy's minimizer.
Use the change from the background to try to avoid precision loss.
Assumes everything follows a multivariate normal distribution
with the specified covariance matrices. Under this assumption
`analysis_covariance` is exact, and `analysis` is the Maximum
Likelihood Estimator and the Best Linear Unbiased Estimator
for the underlying state in the frequentist framework, and
specify the posterior distribution for the state in the
Bayesian framework. If these are not satisfied, these still
form the Generalized Least Squares estimates for the state and
an estimated uncertainty.
Parameters
----------
background: array_like[N]
The background state estimate.
background_covariance: array_like[N, N]
Covariance of background state estimate across
realizations/ensemble members. "Ensemble" is here
interpreted in the sense used in statistical mechanics or
frequentist statistics, and may not be derived from a
sample as in meteorological ensemble Kalman filters
observations: array_like[M]
The observations constraining the background estimate.
observation_covariance: array_like[M, M]
Covariance of observations across realizations/ensemble
members. "Ensemble" again has the statistical meaning.
observation_operator: array_like[M, N]
The relationship between the state and the observations.
reduced_background_covariance: array_like[Nred, Nred], optional
The covariance for a smaller state space, usually obtained by
reducing resolution in space and time. Note that
`reduced_observation_operator` must also be provided
reduced_observation_operator: array_like[M, Nred], optional
The relationship between the reduced state space and the
observations. Note that `reduced_background_covariance`
must also be provided.
Returns
-------
analysis: array_like[N]
Analysis state estimate
analysis_covariance: array_like[Nred, Nred] or array_like[N, N]
Estimated uncertainty of analysis across
realizations/ensemble members. Calculated using
reduced_background_covariance and
reduced_observation_operator if possible
Raises
------
ConvergenceError
If iterative solver does not converge
Notes
-----
minimizes
.. math::
(dx)^T P_B^{-1} (dx) + (y - h(x_0) - H dx)^T R^{-1} (y - h(x_0) - H dx)
which has gradient
.. math::
P_B^{-1} (dx) - H^T R^{-1} (y - h(x) - H dx)
where :math:`x = x_0 + dx`
"""
innovations = observations - observation_operator.dot(background)
def cost_function(test_change):
"""Mismatch between state, prior, and obs.
Parameters
----------
test_state: np.ndarray[N]
Returns
-------
cost: float
"""
obs_change = observation_operator.dot(test_change)
obs_mismatch = innovations - obs_change
prior_fit = test_change.dot(asarray(solve(
background_covariance, test_change)))
obs_fit = obs_mismatch.dot(asarray(solve(
observation_covariance, obs_mismatch)))
return prior_fit + obs_fit
def cost_jacobian(test_change):
"""Gradiant of cost_function at `test_change`.
Parameters
----------
test_state: np.ndarray[N]
Returns
-------
jac: np.ndarray[N]
"""
obs_change = observation_operator.dot(test_change)
obs_mismatch = innovations - obs_change
prior_gradient = solve(background_covariance,
test_change)
obs_gradient = observation_operator.T.dot(
solve(observation_covariance,
obs_mismatch))
return prior_gradient - obs_gradient
# def cost_hessian_product(test_state, test_step):
# """Hessian of cost_function at `test_state` times `test_step`.
# Parameters
# ----------
# test_state: np.ndarray[N]
# test_step: np.ndarray[N]
# Results
# -------
# hess_prod: np.ndarray[N]
# """
# bg_prod = solve(background_covariance,
# test_step)
# obs_prod = observation_operator.T.dot(
# solve(observation_covariance,
# observation_operator.dot(test_step)))
# return bg_prod + obs_prod
if reduced_background_covariance is None:
method = "BFGS"
else:
method = "Newton-CG"
result = scipy.optimize.minimize(
cost_function, asarray(zeros_like(background)),
method=method,
jac=cost_jacobian,
# hessp=cost_hessian_product,
options=dict(maxiter=MAX_ITERATIONS,
gtol=GRAD_TOL),
)
analysis = background + result.x
if not result.success:
raise ConvergenceError("Did not converge: {msg:s}".format(
msg=result.message), result, analysis)
if reduced_background_covariance is not None:
result.hess_inv = None
return analysis, result.hess_inv
[docs]@method_common
def incr_chol(background, background_covariance,
observations, observation_covariance,
observation_operator,
reduced_background_covariance=None,
reduced_observation_operator=None):
"""Feed everything to scipy's minimizer.
Use the change from the background to try to avoid precision loss.
Also use Cholesky factorization of the covariances to speed
solution of matrix equations.
Assumes everything follows a multivariate normal distribution
with the specified covariance matrices. Under this assumption
`analysis_covariance` is exact, and `analysis` is the Maximum
Likelihood Estimator and the Best Linear Unbiased Estimator
for the underlying state in the frequentist framework, and
specify the posterior distribution for the state in the
Bayesian framework. If these are not satisfied, these still
form the Generalized Least Squares estimates for the state and
an estimated uncertainty.
Parameters
----------
background: array_like[N]
The background state estimate.
background_covariance: array_like[N, N]
Covariance of background state estimate across
realizations/ensemble members. "Ensemble" is here
interpreted in the sense used in statistical mechanics or
frequentist statistics, and may not be derived from a
sample as in meteorological ensemble Kalman filters
observations: array_like[M]
The observations constraining the background estimate.
observation_covariance: array_like[M, M]
Covariance of observations across realizations/ensemble
members. "Ensemble" again has the statistical meaning.
observation_operator: array_like[M, N]
The relationship between the state and the observations.
reduced_background_covariance: array_like[Nred, Nred], optional
The covariance for a smaller state space, usually obtained by
reducing resolution in space and time. Note that
`reduced_observation_operator` must also be provided
reduced_observation_operator: array_like[M, Nred], optional
The relationship between the reduced state space and the
observations. Note that `reduced_background_covariance`
must also be provided.
Returns
-------
analysis: array_like[N]
Analysis state estimate
analysis_covariance: array_like[Nred, Nred] or array_like[N, N]
Estimated uncertainty of analysis across
realizations/ensemble members. Calculated using
reduced_background_covariance and
reduced_observation_operator if possible
Raises
------
ConvergenceError
If iterative solver does not converge
Notes
-----
minimizes
.. math::
(dx)^T P_B^{-1} (dx) + (y - h(x_0) - H dx)^T R^{-1} (y - h(x_0) - H dx)
which has gradient
.. math::
P_B^{-1} (dx) - H^T R^{-1} (y - h(x) - H dx)
where :math:`x = x_0 + dx`
"""
innovations = observations - observation_operator.dot(background)
from scipy.linalg import cho_factor, cho_solve
# factor the covariances to make the matrix inversions faster
bg_cov_chol_u = cho_factor(background_covariance)
obs_cov_chol_u = cho_factor(observation_covariance)
def cost_function(test_change):
"""Mismatch between state, prior, and obs.
Parameters
----------
test_state: np.ndarray[N]
Returns
-------
cost: float
"""
obs_change = observation_operator.dot(test_change)
obs_mismatch = innovations - obs_change
prior_fit = test_change.dot(cho_solve(
bg_cov_chol_u, test_change))
obs_fit = obs_mismatch.dot(cho_solve(
obs_cov_chol_u, obs_mismatch))
return prior_fit + obs_fit
def cost_jacobian(test_change):
"""Gradiant of cost_function at `test_change`.
Parameters
----------
test_state: np.ndarray[N]
Returns
-------
jac: np.ndarray[N]
"""
obs_change = observation_operator.dot(test_change)
obs_mismatch = innovations - obs_change
prior_gradient = cho_solve(bg_cov_chol_u,
test_change)
obs_gradient = observation_operator.T.dot(
cho_solve(obs_cov_chol_u,
obs_mismatch))
return prior_gradient - obs_gradient
# def cost_hessian_product(test_state, test_step):
# """Hessian of cost_function at `test_state` times `test_step`.
# Parameters
# ----------
# test_state: np.ndarray[N]
# test_step: np.ndarray[N]
# Results
# -------
# hess_prod: np.ndarray[N]
# """
# bg_prod = solve(background_covariance,
# test_step)
# obs_prod = observation_operator.T.dot(
# solve(observation_covariance,
# observation_operator.dot(test_step)))
# return bg_prod + obs_prod
if reduced_background_covariance is None:
method = "BFGS"
else:
method = "Newton-CG"
result = scipy.optimize.minimize(
cost_function, asarray(zeros_like(background)),
method=method,
jac=cost_jacobian,
# hessp=cost_hessian_product,
options=dict(maxiter=MAX_ITERATIONS,
gtol=GRAD_TOL),
)
analysis = background + result.x
if not result.success:
raise ConvergenceError("Did not converge: {msg:s}".format(
msg=result.message), result, analysis)
if reduced_background_covariance is not None:
result.hess_inv = None
return analysis, result.hess_inv