Source code for stream.utilities

"""
Here lie some unspoken heroes...

As well as some helper functions regarding mixing, conversions, flux shapes
and more.

"""

import warnings
from contextlib import contextmanager
from functools import partial, reduce, singledispatch
from operator import add
from typing import Any, Callable, Iterable, ParamSpec, Protocol, Sequence, Type, TypeVar

import numpy as np
from cytoolz import valmap
from numba import njit

# noinspection PyProtectedMember
from numpy._core._multiarray_umath import normalize_axis_index

try:
    # noinspection PyProtectedMember
    from numpy.lib.function_base import _diff_dispatcher, array_function_dispatch
except ImportError:
    # noinspection PyProtectedMember
    from numpy.lib._function_base_impl import _diff_dispatcher, array_function_dispatch
from scipy.optimize import fsolve

from stream.units import Array, Array1D, Celsius, Fahrenheit, KgPerS, Place, Value

STREAM_DEBUG = 11


[docs] def harmonic_mean(*a: Value, axis=1): m = np.column_stack(a) return 1 / np.sum((1 / m), axis=axis)
[docs] def normalize(a: Sequence) -> Array1D: return np.asarray(a) / np.sum(a)
[docs] @njit def to_Fahrenheit(T: Celsius) -> Fahrenheit: return 1.8 * T + 32
def _shifted_sin(a: float, mid: float, x: Value): return np.sin(a * (x - mid)) def _integrated_cell_cosine_value(a: float, b: float, mid: float, x: Array1D) -> Array1D: return (b / a) * np.diff(_shifted_sin(a, mid, x))
[docs] def cosine_shape( x: Array1D, ppf: float = np.pi / 2, *, xmax: float = None, ) -> Array1D: r""" Creates a normalized cosine profile for cells whose boundaries are `x`, assuming its maximum is achieved at :math:`\ell/2 = (x[-1] + x[0])/2` unless stated otherwise. Parameters ---------- x: Array1D Linear array, 1D spatial ticks of cells boundaries ppf: float Power Peaking Factor. The default value is :math:`\pi/2`. xmax: float The point where the cosine shape is at its theoretical maximum. Defaults to the midpoint between x[0] and x[-1]. Returns ------- phi: Array1D a normalized array whose profile is cosine which is zero at the outer boundaries, and is normalized to unity. Notes ----- Here is the derivation. The flux profile is assumed to take the following form: .. math:: \phi = \cos\left(\frac{\pi (x - \ell/2)}{L}\right) Where :math:`L` is the extrapolated length (half of the period), which is set by the ``PPF``: .. math:: \text{PPF} \equiv \frac{\max{\phi}}{\bar{\phi}} That is .. math:: 1/\text{PPF} = \bar{\phi} = \frac{1}{\ell}\int^{\ell/2}_{-\ell/2}dx\cos\left(\frac{\pi (x-\ell/2)}{L}\right) = \frac{2L}{\ell\pi}\sin\left(\frac{\ell\pi}{2L}\right) Then, :math:`L` is found by solving for :math:`h = \ell\pi/2L \Rightarrow h/\text{PPF} = \sin(h)`. Finally, the cosine shape is integrated for each cell: .. math:: \int^{x_{i+1}}_{x_i} dx' \cos\left(\frac{\pi (x' - \ell/2)}{L}\right) = \frac{L}{\pi}\left[\sin\left(\frac{\pi (x_{i+1} - \ell/2)}{L}\right) - \sin\left(\frac{\pi (x_i - \ell/2)}{L}\right)\right] """ if not (1 <= ppf <= np.pi / 2): raise ValueError(f"PPF must be in [1, π/2], but was {ppf:.5f}") xmax = (x[-1] + x[0]) / 2 if xmax is None else xmax ll = x[-1] - x[0] # Safe because we know fsolve returns a float-able for this input deck. # noinspection PyTypeChecker h: float = fsolve(lambda _x: np.sinc(_x / np.pi) - 1 / ppf, x0=1e-3).item() a = 2 * h / ll b = ppf / ll return _integrated_cell_cosine_value(a, b, xmax, x)
[docs] def cosine_shape_by_zero_endpoints(xi: float, xe: float, x: Array1D) -> Array1D: r"""A cosine shape which is 0 at the extrapolation values, and integrates to 1 over the unextrapolated values. Parameters ---------- xi: Low extrapolation boundary. xe: High extrapolation boundary. x: Boundaries over which to bin the cosine shape Notes ----- The general cosine shape is :math:`b\cos\left(a(x-x_{max})\right)`. :math:`a` is determined from the extrapolation, where the cosine nullifies at :math:`-\frac{\pi}{2},\frac{\pi}{2}`. This means that at xi and xe we want :math:`a(xe-x_{max})=\frac{\pi}{2}` and :math:`a(xi-x_{max})=-\frac{\pi}{2}`. Since the max point of a cosine shape is exactly midway between those points, :math:`x_{max}=\frac{xi+xe}{2}`, which means that if we set :math:`ll=xe-xi` we can write :math:`a=\frac{\pi}{ll}`. :math:`b` is determined from the normalization condition. We desire that the integral inside the boundaries in the `x` vector be strictly 1. If we mark the external boundaries as :math:`x_0` and :math:`x_1`, we get: .. math:: 1 = b\int_{x_0}^{x_1}{\cos(a(x-x_{max}))dx} = \frac{b}{a}\left[\sin(a(x_1-x_{max}))-\sin(a(x_0-x_{max}))\right] and thus :math:`b=a\left[\sin(a(x_1-x_{max}))-\sin(a(x_0-x_{max}))\right]`. Now that we know what the analytical shape is, we can do the discretization. We integrate over each cell in the boundaries of `x`, which have the same shape as the equation above, but for a different integration range: .. math:: v_i = b \int_{x_i}^{x_{i+1}}{\cos(a(x-x_{max}))dx} = \frac{b}{a}\left[\sin(a(x_{i+1}-x_{max}))-\sin(a(x_i-x_{max}))\right] This guarantees that :math:`\sum_{i}{v_i}=1`. See Also -------- cosine_shape: A similar function based on knowing the PPF rather than the extrapolation points. Returns ------- Array1D A discretized cell integration of a cosine function which is 0 at xi and xe, and integrates to 1 over [xi_in, xe_in]. The vector is thus normalized to have a sum of 1. """ xi_in = x[0] xe_in = x[-1] ll = xe - xi mid = (xe + xi) / 2 a = np.pi / ll b = a / np.diff(_shifted_sin(a, mid, np.array([xi_in, xe_in]))) return _integrated_cell_cosine_value(a, b, mid, x)
[docs] def uppercase_numeric_only(s: str) -> str: """Filters a string to return only uppercase or numeric values. Parameters ---------- s: str The string to filter. Examples -------- >>> uppercase_numeric_only('ThisRunsUnderEvaluations') 'TRUE' >>> uppercase_numeric_only('MOthroOw3') 'MOO3' """ return "".join(filter(lambda c: str.isupper(c) or str.isnumeric(c), s))
[docs] def to_array(d: dict[Any, float], dtype=np.float64) -> Array1D: """Turn dictionary values into numpy array of type dtype. Parameters ---------- d: dict[Any, float] Dictionary to turn into a 1D array. dtype: NumPy dtype value. Returns ------- Array1D An array of just the values from the dictionary. """ return np.fromiter(d.values(), dtype=dtype, count=len(d))
def _yield_from(val_list): for vals in val_list: try: yield from vals except TypeError: yield vals def _length_if_possible(vals): try: return len(vals) except TypeError: return 1
[docs] def flatten_values(d: dict[Any, Value], dtype=np.float64) -> Value: """Take a dictionary of values or sequences of values and return a long array with all the values in order, flattened out. Parameters ---------- d: dict[Any, float | Sequence[float]] Dictionary to flatten down. dtype: NumPy dtype value. Returns ------- Value An array of just the values from the dictionary, unless there is only one element, in which case it is returned. """ count = sum(map(_length_if_possible, d.values())) if count == 1: # if there is only one, it's faster to return a float return next(iter(d.values())) return np.fromiter(_yield_from(d.values()), dtype=dtype, count=count)
[docs] @contextmanager def ignore_warnings(warn_type: Type[Warning]): warnings.filterwarnings("ignore", category=warn_type) yield try: # noinspection PyUnresolvedReferences warnings.filters.pop() except IndexError: pass
@array_function_dispatch(_diff_dispatcher) def pair_mean(a, n=1, axis=-1, prepend=np._NoValue, append=np._NoValue): """ Uses np.diff as a base method, but calculates the mean between two consecutive pairs. Inline comments in the function itself highlight the changes. Parameters ---------- a : array_like Input array n : int, optional The number of times values are averaged. If zero, the input is returned as-is. axis : int, optional The axis along which the mean is taken, default is the last axis. prepend, append : array_like, optional Values to prepend or append to `a` along axis prior to performing the mean. Scalar values are expanded to arrays with length 1 in the direction of axis and the shape of the input array in along all other axes. Otherwise the dimension and shape must match `a` except along axis. Returns ------- pair : ndarray The n-th differences. The shape of the output is the same as `a` except along `axis` where the dimension is smaller by `n`. The type of the output is the same as the type of the difference between any two elements of `a`. This is the same as the type of `a` in most cases. A notable exception is `datetime64`, which results in a `timedelta64` output array. See Also -------- numpy.diff """ if n == 0: return a if n < 0: raise ValueError("order must be non-negative but got " + repr(n)) a = np.asanyarray(a) nd = a.ndim if nd == 0: raise ValueError("diff requires input that is at least one dimensional") axis = normalize_axis_index(axis, nd) combined = [] if prepend is not np._NoValue: prepend = np.asanyarray(prepend) if prepend.ndim == 0: shape = list(a.shape) shape[axis] = 1 prepend = np.broadcast_to(prepend, tuple(shape)) combined.append(prepend) combined.append(a) if append is not np._NoValue: append = np.asanyarray(append) if append.ndim == 0: shape = list(a.shape) shape[axis] = 1 append = np.broadcast_to(append, tuple(shape)) combined.append(append) if len(combined) > 1: a = np.concatenate(combined, axis) slice1 = [slice(None)] * nd slice2 = [slice(None)] * nd slice1[axis] = slice(1, None) slice2[axis] = slice(None, -1) slice1 = tuple(slice1) slice2 = tuple(slice2) # np.add replaced np.subtract op = np.not_equal if a.dtype == np.bool_ else np.add for _ in range(n): a = op(a[slice1], a[slice2]) return a / 2 # Divided by 2.
[docs] def pair_mean_1d(a, prepend=None, append=None): assert a.ndim == 1 assert prepend is None or append is None sl1, sl2 = a[:-1], a[1:] mn = (sl1 + sl2) / 2 if prepend is None and append is None: return mn n = len(a) res = np.empty(n, dtype=a.dtype) if prepend is not None: res[1:] = mn res[0] = (prepend + a[0]) / 2 elif append is not None: res[:-1] = mn res[-1] = (append + a[-1]) / 2 else: raise ValueError("This should be very impossible to reach. If you are here, abandon all hope") return res
[docs] @njit def lin_interp(x1: Value, x2: Value, y1: Value, y2: Value, x: Value) -> Value: r""" Linearly interpolate between two points: (x1, y1) and (x2, y2). The assumption being that :math:`x_1 \neq x_2`. Let a line be drawn between the two points. Thus: :math:`y_1 = a x_1 + b, y_2 = a x_2 + b \Rightarrow a=(y_2-y_1)/(x_2-x_1)` and :math:`b = y_2 - a x_2 = y_1 - a x_1` Then: .. math:: y = \frac{(y_2 - y_1)}{(x_2 - x_1)}(x - x_2) + y_2 Parameters ---------- x: Value desired position for the interpolation x1, x2: Value known points, assuming :math:`x \in [x_1, x_2]` y1, y2: Value known function values for x1, x2 Returns ------- The interpolated value, y Examples -------- >>> lin_interp(x1=1, x2=3, y1=1, y2=3, x=2) 2.0 >>> lin_interp(x1=0, x2=4, y1=0, y2=2, x=3) 1.5 """ return y2 + ((y2 - y1) / (x2 - x1)) * (x - x2)
_P = ParamSpec("_P") _T = TypeVar("_T") class _FloatWorthy(Protocol): def __add__(self: _T, other: float) -> _T: ... def __radd__(self: _T, other: float) -> _T: ... def __mul__(self: _T, other: float) -> _T: ... def __rmul__(self: _T, other: float) -> _T: ... _G = TypeVar("_G", bound=_FloatWorthy)
[docs] def factor(f: Callable[_P, _G], by: float = 1.0, add: float = 0.0) -> Callable[_P, _G]: """A functor used to lineraly transform a given function's output by some values. Parameters ---------- f: Callable The decorated function by: float The multiplicative value add: float The additve value Returns ------- f': Callable A new function Examples -------- >>> factor(np.ones, 2)(3) array([2., 2., 2.]) """ def ff(*args, **kwargs): return by * f(*args, **kwargs) + add try: name = f.__name__ except AttributeError: # noinspection PyUnresolvedReferences name = f.func.__name__ ff.__name__ = f"{name} * {by:.2g} + {add:.2g}" ff.__doc__ = f.__doc__ return ff
[docs] def if_is(x: Iterable, if_none: Any = 1.0): return x if x is not None else if_none
MDOT_INTER_THRESHOLD = 1e-6
[docs] @njit def directed_Tin(Tin: Celsius | None, Tin_minus: Celsius | None, mdot: KgPerS) -> Celsius: r"""Computes the inlet temperature for a point component based on flow direction. Parameters ---------- Tin, Tin_minus : Celsius or None Positive (Negative) flow associated inlet temperature mdot : KgPerS Fluid mass flow rate Returns ------- Tin': Celsius Inlet temperature Notes ----- For the sake of removing stiffness, for absolute flow values under ``MDOT_INTER_THRESHOLD``, a linear interpolation between ``Tin`` and ``Tin_minus``. This value may be changed by overriding ``stream.utilities.MDOT_INTER_THRESHOLD``. Examples -------- >>> directed_Tin(1, 2, 3) 1.0 >>> directed_Tin(1, 2, -3) 2.0 >>> directed_Tin(None, 2, 3) 2 >>> directed_Tin(1, None, 3) 1 >>> directed_Tin(1, 2, 0) 1.5 >>> directed_Tin(np.array([1., 2.]), np.array([3., 4.]), 0.) array([2., 3.]) Raises ------ ValueError : If both ``Tin`` and ``Tin_minus`` are None. """ a = Tin is None b = Tin_minus is None if a and b: raise ValueError("Couldn't deduce temperature, both Tin and Tin_minus are None") elif b: return Tin elif a: return Tin_minus if np.abs(mdot) < MDOT_INTER_THRESHOLD: return lin_interp(-MDOT_INTER_THRESHOLD, MDOT_INTER_THRESHOLD, Tin_minus, Tin, mdot) return Tin if mdot >= 0 else Tin_minus
[docs] @njit def directed(a: np.ndarray, val) -> np.ndarray: """ Parameters ---------- a: np.ndarray array to be traversed val: determining value for traversal. If negative, the array is flipped Returns ------- a*: np.ndarray A view of 'a' which is either flipped or not """ return a if val >= 0 else a[::-1]
T = TypeVar("T")
[docs] def just(val: T) -> Callable[[...], T]: r""" Parameters ---------- val: Any A value which the returned function `just` returns Returns ------- f: Callable A function which returns ``val`` for any input. Examples -------- >>> a = just(5) >>> a(3, b="yes") 5 """ def _val(*_, **__) -> T: return val return _val
[docs] def identity(x: T) -> T: return x
[docs] def summed(it: Iterable[T], initial=None) -> T: """Sum items of an iterable by their ``__add__()`` function This differs in behavior from the builtin ``sum`` in that sum requires an initial value, which is 0 by default, whereas ``summed`` does not. Parameters ---------- it : Iterable[T] The items to be summed initial: Value to start the sum from. Returns ------- sum: T The summed item Examples -------- >>> summed(("Hello, ", "it's ", "nice ", "to ", "meet ", "you!")) "Hello, it's nice to meet you!" >>> summed(([1, 2], [3, 4])) [1, 2, 3, 4] """ if initial is not None: return reduce(add, it, initial) else: return reduce(add, it)
[docs] def concat(*arrays: Sequence, **kwargs) -> Array: """np.concatenate is cumbersome, this function makes it nicer to use >>> concat((False, ), (False, False)) array([False, False, False]) >>> concat((False,), (False, False), np.zeros(3, dtype=bool)) array([False, False, False, False, False, False]) """ return np.concatenate(arrays, **kwargs)
[docs] def strictly_monotonous(*arrays: Sequence) -> Array: r"""Concatenate and sort arrays for strictly monotonously rising time Examples -------- >>> strictly_monotonous([1, 2, 3], [1.5, 2, 2.5]) array([1. , 1.5, 2. , 2.5, 3. ]) """ return np.unique(np.sort(concat(*arrays)))
[docs] def mutually_exclusive(*arrays: Sequence) -> bool: """Checks if the arrays provided are mutually exclusive (don't contain the same elements).""" all_values = concat(arrays) unique = np.unique(all_values) return all_values.size == unique.size
[docs] @singledispatch def offset(s: T, move_by: int) -> T: """Move a variable representing a ``place`` by some integer value. Parameters ---------- s: one of (slice, int, np.array) or a dict whose values are of those types Variable to move by an offset. move_by: int Offset value. Returns ------- s': one of (slice, int, np.array) or a dict whose values are of those types The returned type matches ``s``. """ raise TypeError(f"Does not recognize type of first argument: {type(s)}")
@offset.register(slice) def _offset_slice(s: slice, move_by: int) -> slice: return slice(move_by + (s.start or 0), move_by + s.stop) @offset.register(int) def _offset_int(s: int, move_by: int) -> int: return s + move_by @offset.register(dict) def _offset_dict(s: dict[Any, Place], move_by: int) -> dict[Any, Place]: return valmap(partial(offset, move_by=move_by), s) @offset.register(np.ndarray) def _offset_array(s: np.array, move_by: int) -> np.array: return s + move_by
[docs] def uniform_dataclass(dc, val): r"""Construct an instance of dataclass ``dc`` in which all field values are ``val``""" return dc(*(val for _ in dc.__dataclass_fields__))
class _DataclassMap(Protocol): __name__: str __doc__: str def __call__(self, *dc: T, **kwargs) -> T: pass _FieldMap = _DataclassMap
[docs] def dataclass_map(dc: Type[T], f: _FieldMap) -> _DataclassMap: """ If a function should work in the same manner on all fields of a dataclass, create that function and return it. Parameters ---------- dc: dataclass f: Callable Returns ------- A function which operates `f` on all fields of `dc` instances, returning a new `dc` instance. """ def _dc_f(*instances, **kwargs): return dc(*(f(*(getattr(inst, fld) for inst in instances), **kwargs) for fld in dc.__dataclass_fields__)) _dc_f.__name__ = f.__name__ _dc_f.__doc__ = f.__doc__ return _dc_f