diff --git a/changelog/18.feature.md b/changelog/18.feature.md new file mode 100644 index 0000000..b6b4665 --- /dev/null +++ b/changelog/18.feature.md @@ -0,0 +1,12 @@ ++ include support for logistic-decay ++ we added three new classes/functions to the API in the module `convergence` + + `LogisticDecaySplineHelper` + + `LogisticDecaySplineHelperDerivative` + + `get_logistic_decay_harmonised_spline` ++ whereby the function `get_logistic_decay_harmonised_spline` is mainly meant as user-interface + This function can be used as value for the `get_harmonise_spline` parameter in `harmonise`. + The function takes the same arguments as `get_cosine_decay_harmonised_spline` and additionally a + `slope` and `shift` argument. ++ These arguments determine slope and location of the logistic function and can be passed using + `functools.partial(get_logistic_decay_harmonised_spline, slope = 0., shift = 0.)` ++ An example use-case is added to the `getting-started-tutorial`. diff --git a/docs/tutorials/tutorial.py b/docs/tutorials/tutorial.py index 1d47bb2..84b78b2 100644 --- a/docs/tutorials/tutorial.py +++ b/docs/tutorials/tutorial.py @@ -32,6 +32,7 @@ from gradient_aware_harmonisation import harmonise from gradient_aware_harmonisation.convergence import ( + get_logistic_decay_harmonised_spline, get_polynomial_decay_harmonised_spline, ) from gradient_aware_harmonisation.plotting import plotting @@ -115,6 +116,30 @@ get_harmonised_spline=partial(get_polynomial_decay_harmonised_spline, power=2.0), ) +plotting( + harmonisee_timeseries, + target_timeseries, + harmonised_timeseries, + harmonisation_time, + convergence_time, +) + +# %% [markdown] +# ### Use logistic-decay + +# %% +convergence_time = 8.0 + +harmonised_timeseries = harmonise( + target_timeseries=target_timeseries, + harmonisee_timeseries=harmonisee_timeseries, + harmonisation_time=harmonisation_time, + convergence_time=convergence_time, + get_harmonised_spline=partial( + get_logistic_decay_harmonised_spline, slope=-1.0, shift=-8.0 + ), +) + plotting( harmonisee_timeseries, target_timeseries, diff --git a/src/gradient_aware_harmonisation/convergence.py b/src/gradient_aware_harmonisation/convergence.py index 7f16642..b83d8ac 100644 --- a/src/gradient_aware_harmonisation/convergence.py +++ b/src/gradient_aware_harmonisation/convergence.py @@ -570,7 +570,7 @@ def calc_gamma_rising_derivative( return -gamma_rising_derivative return gamma_rising_derivative - def derivative(self) -> CosineDecaySplineHelperDerivative: + def derivative(self) -> PolynomialDecaySplineHelperDerivative: """ Calculate the derivative of self @@ -581,7 +581,7 @@ def derivative(self) -> CosineDecaySplineHelperDerivative: """ raise NotImplementedError - def antiderivative(self) -> CosineDecaySplineHelperDerivative: + def antiderivative(self) -> PolynomialDecaySplineHelperDerivative: """ Calculate the anti-derivative/integral of self @@ -664,3 +664,367 @@ def get_polynomial_decay_harmonised_spline( convergence_spline, ), ) + + +@define +class LogisticDecaySplineHelper: + """ + Spline that supports being used as a logistic-decay between splines + + Between `initial_time` and `final_time`, + we return values based on a polynomial-decay between 1 and 0 + if `self.apply_to_convergence` is `False`, + otherwise we return values based on a logistic-increase between 0 and 1. + """ + + initial_time: Union[float, int] + """ + At and before this time, we return values from `initial` + """ + + final_time: Union[float, int] + """ + At and after this time, we return values from `final` + """ + + slope: Union[float, int] + """ + Slope of logistic function + """ + + shift: Union[float, int] + """ + Shift of logistic function + """ + + apply_to_convergence: bool = False + """ + Is this helper being applied to the convergence spline? + + If `True`, we return 1 - the weights, rather than the weights. + """ + + # domain: ClassVar[list[float, float]] = [ + # np.finfo(np.float64).tiny, + # np.finfo(np.float64).max, + # ] + # """Domain of spline""" + + @overload + def __call__(self, x: int | float) -> int | float: ... + + @overload + def __call__(self, x: NP_FLOAT_OR_INT) -> NP_FLOAT_OR_INT: ... + + @overload + def __call__(self, x: NP_ARRAY_OF_FLOAT_OR_INT) -> NP_ARRAY_OF_FLOAT_OR_INT: ... + + def __call__( + self, x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """ + Evaluate the spline at a given x-value + + Parameters + ---------- + x + x-value + + Returns + ------- + : + Value of the spline at `x` + """ + + def calc_gamma( + x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT, + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """Get polynomial-decay""" + # compute weight (here: gamma) according to logistic-decay + + x_prime = x - self.initial_time + delta = self.final_time - self.initial_time + + gamma_decaying: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT = ( + 1 + - 1 + / ( + 1 + + np.exp( + -2 * np.exp(self.slope) * delta * x_prime + + 3 * delta + + self.shift + ) + ) + ) + + return gamma_decaying + + if not isinstance(x, np.ndarray): + if x <= self.initial_time: + gamma: float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT = 1.0 + elif x >= self.final_time: + gamma = 0.0 + else: + gamma = calc_gamma(x) + + # The weighted sum for computing the harmonised AND converged + # function has the form: "gamma * harmonised + (1-gamma) * convergence". + # Depending on which product we want to compute (LHS or RHS of sum), + # we need gamma or 1-gamma, therefore we include the following condition + # in all our return statements. + if self.apply_to_convergence: + return 1.0 - gamma + + return gamma + + # apply decay function only to values that lie between harmonisation + # time and convergence-time + x_gte_final_time = np.where(x >= self.final_time) + x_decay = np.logical_and(x >= self.initial_time, x < self.final_time) + gamma = np.ones_like(x, dtype=np.floating) + gamma[x_gte_final_time] = 0.0 + gamma[x_decay] = calc_gamma(x[x_decay]) + + if self.apply_to_convergence: + return 1.0 - gamma + + return gamma + + def derivative(self) -> LogisticDecaySplineHelperDerivative: + """ + Calculate the derivative of self + + Returns + ------- + : + Derivative of self + + """ + return LogisticDecaySplineHelperDerivative( + initial_time=self.initial_time, + final_time=self.final_time, + slope=self.slope, + shift=self.shift, + apply_to_convergence=self.apply_to_convergence, + ) + + def antiderivative(self) -> LogisticDecaySplineHelperDerivative: + """ + Calculate the anti-derivative/integral of self + + Returns + ------- + : + Anti-derivative of self + """ + raise NotImplementedError + + +@define +class LogisticDecaySplineHelperDerivative: + """ + Derivative of [LogisticDecaySplineHelper][(m).] + """ + + initial_time: Union[float, int] + """ + Initial time of the logistic-decay + """ + + final_time: Union[float, int] + """ + Final time of the logistic-decay + """ + + slope: Union[float, int] + """ + Slope of logistic function + """ + + shift: Union[float, int] + """ + Shift of logistic function + """ + + apply_to_convergence: bool = False + """ + Is this helper being applied to the convergence spline? + """ + + # domain: ClassVar[list[float, float]] = [ + # np.finfo(np.float64).tiny, + # np.finfo(np.float64).max, + # ] + # """Domain of spline""" + + @overload + def __call__(self, x: int | float) -> int | float: ... + + @overload + def __call__(self, x: NP_FLOAT_OR_INT) -> NP_FLOAT_OR_INT: ... + + @overload + def __call__(self, x: NP_ARRAY_OF_FLOAT_OR_INT) -> NP_ARRAY_OF_FLOAT_OR_INT: ... + + def __call__( + self, x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """ + Evaluate the spline at a given x-value + + Parameters + ---------- + x + x-value + + Returns + ------- + : + Value of the spline at `x` + """ + + # compute weight (here: gamma) according to a polynomial-decay + def calc_gamma_rising_derivative( + x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT, + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """Get logistic-decay derivative""" + # compute derivative of gamma according to a logistic-decay + x_prime = x - self.initial_time + delta = self.final_time - self.initial_time + scaler = 2 * self.slope * delta + + numerator = scaler * np.exp(-scaler * x_prime + 3 * delta + self.shift) + denominator = np.exp(-scaler * x_prime + 3 * delta + self.shift) + + gamma_decaying_derivative = -numerator / (denominator + 1) ** 2 + + return gamma_decaying_derivative + + if not isinstance(x, np.ndarray): + if x <= self.initial_time or x >= self.final_time: + return 0.0 + + gamma_rising_derivative = calc_gamma_rising_derivative(x) + + # The weighted sum for computing the harmonised AND converged + # function has the form: "gamma * harmonised + (1-gamma) * convergence". + # Depending on which product we want to compute (LHS or RHS of sum), + # we need gamma or 1-gamma, therefore we include the following condition + # in all our return statements. + if self.apply_to_convergence: + return -gamma_rising_derivative + + return gamma_rising_derivative + + # apply decay function only to values that lie between harmonisation + # time and convergence-time + x_decay = np.where(np.logical_and(x > self.initial_time, x < self.final_time)) + gamma_rising_derivative = np.zeros_like(x, dtype=np.floating) + gamma_rising_derivative[x_decay] = calc_gamma_rising_derivative(x[x_decay]) + + if self.apply_to_convergence: + return -gamma_rising_derivative + return gamma_rising_derivative + + def derivative(self) -> LogisticDecaySplineHelperDerivative: + """ + Calculate the derivative of self + + Returns + ------- + : + Derivative of self + """ + raise NotImplementedError + + def antiderivative(self) -> LogisticDecaySplineHelperDerivative: + """ + Calculate the anti-derivative/integral of self + + Returns + ------- + : + Anti-derivative of self + + Raises + ------ + NotImplementedError + """ + raise NotImplementedError + + +def get_logistic_decay_harmonised_spline( # noqa: PLR0913 + harmonisation_time: Union[int, float], + convergence_time: Union[int, float], + harmonised_spline_no_convergence: Spline, + convergence_spline: Spline, + slope: Union[int, float], + shift: Union[int, float], +) -> SumOfSplines: + """ + Generate the harmonised spline based on a logistic-decay + + Parameters + ---------- + harmonisation_time + Harmonisation time + + This is the time at and before which + the solution should be equal to `harmonised_spline_no_convergence`. + + convergence_time + Convergence time + + This is the time at and after which + the solution should be equal to `convergence_spline`. + + harmonised_spline_no_convergence + Harmonised spline that does not consider convergence + + convergence_spline + The spline to which the result should converge + + slope + Determine steepness of logistic function + Slope < 0: decrease slope; slope > 0: increase slope + + shift + Shift the logistic function horizontally + Shift < 0: left and shift > 0: right + + Returns + ------- + : + Harmonised spline + """ + # The harmonised spline is considered as the spline that match + # the target-spline at the harmonisation time (wrt to zero-and + # first order derivative). Then we use a decay function to let + # the harmonised spline converge to the convergence-spline. + # This decay function has the form of a weighted sum: + # weight * harmonised_spline + (1-weight) * convergence_spline + # With weights decaying from 1 to 0 whereby the decay trajectory + # is determined by the logistic decay. + return SumOfSplines( + ProductOfSplines( + LogisticDecaySplineHelper( + initial_time=harmonisation_time, + final_time=convergence_time, + slope=slope, + shift=shift, + apply_to_convergence=False, + ), + harmonised_spline_no_convergence, + ), + ProductOfSplines( + LogisticDecaySplineHelper( + initial_time=harmonisation_time, + final_time=convergence_time, + slope=slope, + shift=shift, + apply_to_convergence=True, + ), + convergence_spline, + ), + ) diff --git a/tests/unit/test_logistic_decay_derivative.py b/tests/unit/test_logistic_decay_derivative.py new file mode 100644 index 0000000..0a9aa31 --- /dev/null +++ b/tests/unit/test_logistic_decay_derivative.py @@ -0,0 +1,31 @@ +import numpy as np +import pytest + +from gradient_aware_harmonisation.convergence import ( + LogisticDecaySplineHelperDerivative, +) + +scipy = pytest.importorskip("scipy") + + +def test_LogisticDecaySplineHelperDerivative(): + """ + Test correct computation of derivative + """ + initial_time = 2002 + final_time = 2020 + apply_to_convergence = False + + logistic_helper_deriv = LogisticDecaySplineHelperDerivative( + initial_time=initial_time, + final_time=final_time, + apply_to_convergence=apply_to_convergence, + slope=np.exp(0.0), + shift=0.0, + ) + + integral, _ = scipy.integrate.quad( + logistic_helper_deriv, a=initial_time, b=final_time + ) + + np.testing.assert_allclose(integral, -1.0)