r"""
:class:`.Aggregator` solvers can greatly benefit from calculated Jacobians. The following
are some implementations for the ``DAE`` (Differential-Algebraic Equations) form and
the ``ALG`` (Algebraic) form.
The main idea behind these implementations is that the graphical structure of an
:class:`.Aggregator` object can be utilized to deduce the Jacobian sparsity, at least
across :class:`.Calculation` boundaries.
"""
from typing import Callable, Literal, Protocol, Sequence, TypeVar
import numpy as np
from stream import Aggregator, Calculation
from stream.units import Array1D, Array2D, Value
__all__ = ["DAE_jacobian", "ALG_jacobian"]
StepStrategyWithydot = Callable[[Array1D, Array1D], Array1D]
StepStrategy = Callable[[Array1D], Array1D]
T = TypeVar("T", bound=Value)
def _default_step_strategy(y: T, *_) -> T:
return 1e-12 + 1e-6 * np.abs(y)
def _associated_calculations(agr: Aggregator) -> dict[int, Sequence[Calculation]]:
indices = np.arange(len(agr))
a = {i: [calculation] for calculation, section in agr.sections.items() for i in indices[section]}
for v, d in agr.external.items():
for name, ud in d.items():
for u, place in ud.items():
for i in np.atleast_1d(indices[place]):
a[i].append(v)
return a
def _inner(fy, h, t, yh, associated_calculations, *, agr, jac, cj=None):
for j, hj in enumerate(h):
yh[j] += hj
fyh = fy.copy()
for c in associated_calculations[j]:
fyh[agr.sections[c]] = agr._op("calculate", yh, t, c)
jac[:, j] = (fyh - fy) / hj
if cj is not None:
jac[j, j] -= cj * agr.mass[j]
yh[j] -= hj
return jac if cj is None else 0
class JacFuncDAE(Protocol):
"""Protocol for the signature one should expect from the returned function."""
def __call__(self, t, y: np.ndarray, ydot: Value, Gy: Value, cj: Value, J: Array2D) -> Literal[0]:
"""A function that edits the Jacobian J as its output using the current solver values.
This signature is required by the DAE solver and not of our choice.
"""
...
[docs]
def DAE_jacobian(agr: Aggregator, step_strategy: StepStrategyWithydot = _default_step_strategy) -> JacFuncDAE:
r"""
A function for creating a one-sided approximation of the Jacobian of
an :class:`Aggregator` functional. The Jacobian is defined as follows:
.. math:: J_{ij} = \frac{dF_i}{dy_j}
\approx \frac{F_i(y + h_j) - F_i(y)}{h_j}
Where :math:`h_j` is the ``jth`` step size and :math:`y + h_j` simply means
the ``jth`` component is :math:`y_j + h_j`. This is however a bit
simplistic, since the actual required Jacobian is of the full functional:
.. math:: G(t, y, \dot{y}) = F(y, t) - M\dot{y}
Meaning the Jacobian is actually
.. math:: \frac{dG}{dy}=\frac{\partial G}{\partial\dot{y}}
\frac{\partial\dot{y}}{\partial y} + \frac{\partial G}{\partial y}
Since :math:`\dot{y}` is approximated linearly,
:math:`\partial_y\dot{y}\equiv\sigma` does not depend
on the functions considered above, only on the approximation scheme and
time step.
In our case, :math:`\partial_{\dot{y}_j} G_i=-M_{ij} = -m_i \delta_{i,j}`.
Parameters
----------
agr : Aggregator
The Aggregator whose functional is desired
step_strategy: Callable or None
A function for the step vector :math:`h = f(y, \dot{y})`
Returns
-------
J_func: Callable
A function whose signature is
:math:`J(t, y, \dot{y}, G(t, y, \dot{y}), \sigma, \text{result})`,
matching the required SUNDIALS format.
"""
associated_calculations = _associated_calculations(agr)
def _jac_func(t, y: np.ndarray, ydot: Value, Gy: Value, cj: Value, J: Array2D) -> Literal[0]:
h = step_strategy(y, ydot)
Fy = Gy + agr.mass * ydot
_inner(Fy, h, t, y.copy(), associated_calculations, agr=agr, jac=J, cj=cj)
return 0 # This function edits J which is how the data actually comes out.
return _jac_func
class JacFuncALG(Protocol):
"""Protocol for the jacobian calculation function used in the algebraic solver.
It computes the jacobian at a given time and a given state, with the aggregator baked in."""
def __call__(self, y, t=0) -> Array2D: ...
def ALG_jacobian(agr: Aggregator, step_strategy: StepStrategy = _default_step_strategy) -> JacFuncALG:
associated_calculations = _associated_calculations(agr)
jac = np.empty((len(agr), len(agr)))
def _jac_func(y: np.ndarray, t=0) -> Array2D:
h = step_strategy(y)
Fy = agr.compute(y, t)
return _inner(Fy, h, t, y.copy(), associated_calculations, agr=agr, jac=jac)
return _jac_func