"""Functions implementing 3D-Var.
Signatures follow the functions in
 :mod:`atmos_flux_inversion.optimal_interpolation`
Note
----
Forces in-memory computations.  BFGS method requires this, and there
are odd shape-mismatch errors if I just change to dask arrays.
Conjugate gradient solvers may work better for dask arrays if we drop
the covariance matrix from the return values.
"""
import scipy.optimize
import scipy.linalg
# I believe scipy's minimizer requires things that give boolean true
# or false from the objective, rather than a yet-to-be-realized dask
# array.
from numpy import asarray
from numpy import zeros_like
from atmos_flux_inversion import ConvergenceError, MAX_ITERATIONS, GRAD_TOL
from atmos_flux_inversion.util import solve, method_common
[docs]@method_common
def simple(background, background_covariance,
           observations, observation_covariance,
           observation_operator,
           reduced_background_covariance=None,
           reduced_observation_operator=None):
    """Feed everything to scipy's minimizer.
    Assumes everything follows a multivariate normal distribution
    with the specified covariance matrices.  Under this assumption
    `analysis_covariance` is exact, and `analysis` is the Maximum
    Likelihood Estimator and the Best Linear Unbiased Estimator
    for the underlying state in the frequentist framework, and
    specify the posterior distribution for the state in the
    Bayesian framework.  If these are not satisfied, these still
    form the Generalized Least Squares estimates for the state and
    an estimated uncertainty.
    Parameters
    ----------
    background: array_like[N]
        The background state estimate.
    background_covariance:  array_like[N, N]
        Covariance of background state estimate across
        realizations/ensemble members.  "Ensemble" is here
        interpreted in the sense used in statistical mechanics or
        frequentist statistics, and may not be derived from a
        sample as in meteorological ensemble Kalman filters
    observations: array_like[M]
        The observations constraining the background estimate.
    observation_covariance: array_like[M, M]
        Covariance of observations across realizations/ensemble
        members.  "Ensemble" again has the statistical meaning.
    observation_operator: array_like[M, N]
        The relationship between the state and the observations.
    reduced_background_covariance: array_like[Nred, Nred], optional
        The covariance for a smaller state space, usually obtained by
        reducing resolution in space and time.  Note that
        `reduced_observation_operator` must also be provided
    reduced_observation_operator: array_like[M, Nred], optional
        The relationship between the reduced state space and the
        observations.  Note that `reduced_background_covariance`
        must also be provided.
    Returns
    -------
    analysis: array_like[N]
        Analysis state estimate
    analysis_covariance: array_like[Nred, Nred] or array_like[N, N]
        Estimated uncertainty of analysis across
        realizations/ensemble members.  Calculated using
        reduced_background_covariance and
        reduced_observation_operator if possible
    Raises
    ------
    ConvergenceError
        If iterative solver does not converge
    Notes
    -----
    minimizes
    .. math::
        (x - x_0)^T P_B^{-1} (x - x_0) + (y - h(x))^T R^{-1} (y - h(x))
    which has gradient
    .. math::
        P_B^{-1} (x - x_0) + H^T R^{-1} (y - h(x))
    """
    def cost_function(test_state):
        """Mismatch between state, prior, and obs.
        Parameters
        ----------
        test_state: np.ndarray[N]
        Returns
        -------
        float
            A measure of the mismatch between the test state and the
            background and observations
        """
        prior_mismatch = asarray(test_state - background)
        test_obs = observation_operator.dot(test_state)
        obs_mismatch = asarray(test_obs - observations)
        prior_fit = prior_mismatch.dot(solve(
            background_covariance, prior_mismatch))
        obs_fit = obs_mismatch.dot(solve(
            observation_covariance, obs_mismatch))
        return prior_fit + obs_fit
    def cost_jacobian(test_state):
        """Gradiant of cost_function at `test_state`.
        Parameters
        ----------
        test_state: np.ndarray[N]
        Returns
        -------
        jac: np.ndarray[N]
        """
        prior_mismatch = test_state - background
        test_obs = observation_operator.dot(test_state)
        obs_mismatch = test_obs - observations
        prior_gradient = solve(background_covariance,
                               prior_mismatch)
        obs_gradient = observation_operator.T.dot(
            solve(observation_covariance,
                  obs_mismatch))
        return prior_gradient + obs_gradient
    # def cost_hessian_product(test_state, test_step):
    #     """Hessian of cost_function at `test_state` times `test_step`.
    #     Parameters
    #     ----------
    #     test_state: np.ndarray[N]
    #     test_step: np.ndarray[N]
    #     Results
    #     -------
    #     hess_prod: np.ndarray[N]
    #     """
    #     bg_prod = solve(background_covariance,
    #                        test_step)
    #     obs_prod = observation_operator.T.dot(
    #         solve(observation_covariance,
    #                  observation_operator.dot(test_step)))
    #     return bg_prod + obs_prod
    if reduced_background_covariance is None:
        method = "BFGS"
    else:
        method = "Newton-CG"
    result = scipy.optimize.minimize(
        cost_function, background,
        method=method,
        jac=cost_jacobian,
        # hessp=cost_hessian_product,
        options=dict(maxiter=MAX_ITERATIONS,
                     gtol=GRAD_TOL),
    )
    if not result.success:
        raise ConvergenceError("Did not converge: {msg:s}".format(
            msg=result.message), result)
    if reduced_background_covariance is not None:
        result.hess_inv = None
    return result.x, result.hess_inv 
[docs]@method_common
def incremental(background, background_covariance,
                observations, observation_covariance,
                observation_operator,
                reduced_background_covariance=None,
                reduced_observation_operator=None):
    """Feed everything to scipy's minimizer.
    Use the change from the background to try to avoid precision loss.
    Assumes everything follows a multivariate normal distribution
    with the specified covariance matrices.  Under this assumption
    `analysis_covariance` is exact, and `analysis` is the Maximum
    Likelihood Estimator and the Best Linear Unbiased Estimator
    for the underlying state in the frequentist framework, and
    specify the posterior distribution for the state in the
    Bayesian framework.  If these are not satisfied, these still
    form the Generalized Least Squares estimates for the state and
    an estimated uncertainty.
    Parameters
    ----------
    background: array_like[N]
        The background state estimate.
    background_covariance:  array_like[N, N]
        Covariance of background state estimate across
        realizations/ensemble members.  "Ensemble" is here
        interpreted in the sense used in statistical mechanics or
        frequentist statistics, and may not be derived from a
        sample as in meteorological ensemble Kalman filters
    observations: array_like[M]
        The observations constraining the background estimate.
    observation_covariance: array_like[M, M]
        Covariance of observations across realizations/ensemble
        members.  "Ensemble" again has the statistical meaning.
    observation_operator: array_like[M, N]
        The relationship between the state and the observations.
    reduced_background_covariance: array_like[Nred, Nred], optional
        The covariance for a smaller state space, usually obtained by
        reducing resolution in space and time.  Note that
        `reduced_observation_operator` must also be provided
    reduced_observation_operator: array_like[M, Nred], optional
        The relationship between the reduced state space and the
        observations.  Note that `reduced_background_covariance`
        must also be provided.
    Returns
    -------
    analysis: array_like[N]
        Analysis state estimate
    analysis_covariance: array_like[Nred, Nred] or array_like[N, N]
        Estimated uncertainty of analysis across
        realizations/ensemble members.  Calculated using
        reduced_background_covariance and
        reduced_observation_operator if possible
    Raises
    ------
    ConvergenceError
        If iterative solver does not converge
    Notes
    -----
    minimizes
    .. math::
        (dx)^T P_B^{-1} (dx) + (y - h(x_0) - H dx)^T R^{-1} (y - h(x_0) - H dx)
    which has gradient
    .. math::
        P_B^{-1} (dx) - H^T R^{-1} (y - h(x) - H dx)
    where :math:`x = x_0 + dx`
    """
    innovations = observations - observation_operator.dot(background)
    def cost_function(test_change):
        """Mismatch between state, prior, and obs.
        Parameters
        ----------
        test_state: np.ndarray[N]
        Returns
        -------
        cost: float
        """
        obs_change = observation_operator.dot(test_change)
        obs_mismatch = innovations - obs_change
        prior_fit = test_change.dot(asarray(solve(
            background_covariance, test_change)))
        obs_fit = obs_mismatch.dot(asarray(solve(
            observation_covariance, obs_mismatch)))
        return prior_fit + obs_fit
    def cost_jacobian(test_change):
        """Gradiant of cost_function at `test_change`.
        Parameters
        ----------
        test_state: np.ndarray[N]
        Returns
        -------
        jac: np.ndarray[N]
        """
        obs_change = observation_operator.dot(test_change)
        obs_mismatch = innovations - obs_change
        prior_gradient = solve(background_covariance,
                               test_change)
        obs_gradient = observation_operator.T.dot(
            solve(observation_covariance,
                  obs_mismatch))
        return prior_gradient - obs_gradient
    # def cost_hessian_product(test_state, test_step):
    #     """Hessian of cost_function at `test_state` times `test_step`.
    #     Parameters
    #     ----------
    #     test_state: np.ndarray[N]
    #     test_step: np.ndarray[N]
    #     Results
    #     -------
    #     hess_prod: np.ndarray[N]
    #     """
    #     bg_prod = solve(background_covariance,
    #                        test_step)
    #     obs_prod = observation_operator.T.dot(
    #         solve(observation_covariance,
    #                  observation_operator.dot(test_step)))
    #     return bg_prod + obs_prod
    if reduced_background_covariance is None:
        method = "BFGS"
    else:
        method = "Newton-CG"
    result = scipy.optimize.minimize(
        cost_function, asarray(zeros_like(background)),
        method=method,
        jac=cost_jacobian,
        # hessp=cost_hessian_product,
        options=dict(maxiter=MAX_ITERATIONS,
                     gtol=GRAD_TOL),
    )
    analysis = background + result.x
    if not result.success:
        raise ConvergenceError("Did not converge: {msg:s}".format(
            msg=result.message), result, analysis)
    if reduced_background_covariance is not None:
        result.hess_inv = None
    return analysis, result.hess_inv 
[docs]@method_common
def incr_chol(background, background_covariance,
              observations, observation_covariance,
              observation_operator,
              reduced_background_covariance=None,
              reduced_observation_operator=None):
    """Feed everything to scipy's minimizer.
    Use the change from the background to try to avoid precision loss.
    Also use Cholesky factorization of the covariances to speed
    solution of matrix equations.
    Assumes everything follows a multivariate normal distribution
    with the specified covariance matrices.  Under this assumption
    `analysis_covariance` is exact, and `analysis` is the Maximum
    Likelihood Estimator and the Best Linear Unbiased Estimator
    for the underlying state in the frequentist framework, and
    specify the posterior distribution for the state in the
    Bayesian framework.  If these are not satisfied, these still
    form the Generalized Least Squares estimates for the state and
    an estimated uncertainty.
    Parameters
    ----------
    background: array_like[N]
        The background state estimate.
    background_covariance:  array_like[N, N]
        Covariance of background state estimate across
        realizations/ensemble members.  "Ensemble" is here
        interpreted in the sense used in statistical mechanics or
        frequentist statistics, and may not be derived from a
        sample as in meteorological ensemble Kalman filters
    observations: array_like[M]
        The observations constraining the background estimate.
    observation_covariance: array_like[M, M]
        Covariance of observations across realizations/ensemble
        members.  "Ensemble" again has the statistical meaning.
    observation_operator: array_like[M, N]
        The relationship between the state and the observations.
    reduced_background_covariance: array_like[Nred, Nred], optional
        The covariance for a smaller state space, usually obtained by
        reducing resolution in space and time.  Note that
        `reduced_observation_operator` must also be provided
    reduced_observation_operator: array_like[M, Nred], optional
        The relationship between the reduced state space and the
        observations.  Note that `reduced_background_covariance`
        must also be provided.
    Returns
    -------
    analysis: array_like[N]
        Analysis state estimate
    analysis_covariance: array_like[Nred, Nred] or array_like[N, N]
        Estimated uncertainty of analysis across
        realizations/ensemble members.  Calculated using
        reduced_background_covariance and
        reduced_observation_operator if possible
    Raises
    ------
    ConvergenceError
        If iterative solver does not converge
    Notes
    -----
    minimizes
    .. math::
        (dx)^T P_B^{-1} (dx) + (y - h(x_0) - H dx)^T R^{-1} (y - h(x_0) - H dx)
    which has gradient
    .. math::
        P_B^{-1} (dx) - H^T R^{-1} (y - h(x) - H dx)
    where :math:`x = x_0 + dx`
    """
    innovations = observations - observation_operator.dot(background)
    from scipy.linalg import cho_factor, cho_solve
    # factor the covariances to make the matrix inversions faster
    bg_cov_chol_u = cho_factor(background_covariance)
    obs_cov_chol_u = cho_factor(observation_covariance)
    def cost_function(test_change):
        """Mismatch between state, prior, and obs.
        Parameters
        ----------
        test_state: np.ndarray[N]
        Returns
        -------
        cost: float
        """
        obs_change = observation_operator.dot(test_change)
        obs_mismatch = innovations - obs_change
        prior_fit = test_change.dot(cho_solve(
            bg_cov_chol_u, test_change))
        obs_fit = obs_mismatch.dot(cho_solve(
            obs_cov_chol_u, obs_mismatch))
        return prior_fit + obs_fit
    def cost_jacobian(test_change):
        """Gradiant of cost_function at `test_change`.
        Parameters
        ----------
        test_state: np.ndarray[N]
        Returns
        -------
        jac: np.ndarray[N]
        """
        obs_change = observation_operator.dot(test_change)
        obs_mismatch = innovations - obs_change
        prior_gradient = cho_solve(bg_cov_chol_u,
                                   test_change)
        obs_gradient = observation_operator.T.dot(
            cho_solve(obs_cov_chol_u,
                      obs_mismatch))
        return prior_gradient - obs_gradient
    # def cost_hessian_product(test_state, test_step):
    #     """Hessian of cost_function at `test_state` times `test_step`.
    #     Parameters
    #     ----------
    #     test_state: np.ndarray[N]
    #     test_step: np.ndarray[N]
    #     Results
    #     -------
    #     hess_prod: np.ndarray[N]
    #     """
    #     bg_prod = solve(background_covariance,
    #                        test_step)
    #     obs_prod = observation_operator.T.dot(
    #         solve(observation_covariance,
    #                  observation_operator.dot(test_step)))
    #     return bg_prod + obs_prod
    if reduced_background_covariance is None:
        method = "BFGS"
    else:
        method = "Newton-CG"
    result = scipy.optimize.minimize(
        cost_function, asarray(zeros_like(background)),
        method=method,
        jac=cost_jacobian,
        # hessp=cost_hessian_product,
        options=dict(maxiter=MAX_ITERATIONS,
                     gtol=GRAD_TOL),
    )
    analysis = background + result.x
    if not result.success:
        raise ConvergenceError("Did not converge: {msg:s}".format(
            msg=result.message), result, analysis)
    if reduced_background_covariance is not None:
        result.hess_inv = None
    return analysis, result.hess_inv