diff --git a/README.md b/README.md index 4718790..9bdd778 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ Features include: - Skewness, kurtosis - Power spectra metrics - Entropy measures + - Detrended Fluctuation Analysis (DFA) - Kendall tau statistics to quantify trends - Deep learning classifiers for bifurcation prediction ([Bury et al. 2021](https://www.pnas.org/doi/10.1073/pnas.2106140118)) - Visualization tools @@ -37,7 +38,7 @@ Features include: ## Install -Requires Python 3.8–3.11. Install via: +Requires Python 3.9–3.12. Install via: ```bash pip install --upgrade pip diff --git a/ewstools/core.py b/ewstools/core.py index dc64924..7323877 100644 --- a/ewstools/core.py +++ b/ewstools/core.py @@ -539,6 +539,50 @@ def entropy_func(series): self.ews[col] = df_entropy[col] # self.ews = pd.merge(self.ews, df_entropy, how="left", on="time") + + def compute_dfa(self, rolling_window=0.25, order=1): + """ + Compute DFA scaling exponent over a rolling window. + If residuals have not been computed, computation will be + performed over state variable. + Output is stored in the self.ews dataframe. + + Parameters + ---------- + rolling_window : float + Length of rolling window used to compute DFA exponent. Can be + specified as an absolute value or as a proportion of the length + of the data being analysed. Default is 0.25. + order : int + Polynomial detrending order within DFA (1 = linear). Default + is 1. + + Returns + ------- + None. + + """ + if self.transition: + df_pre = self.state[self.state.index <= self.transition] + else: + df_pre = self.state + if 0 < rolling_window <= 1: + rw_absolute = int(rolling_window * len(df_pre)) + else: + rw_absolute = rolling_window + if 'residuals' in df_pre.columns: + dfa_values = ( + df_pre['residuals'] + .rolling(window=rw_absolute) + .apply(func=lambda x: helpers.dfa(x, order=order), raw=True) + ) + else: + dfa_values = ( + df_pre['state'] + .rolling(window=rw_absolute) + .apply(func=lambda x: helpers.dfa(x, order=order), raw=True) + ) + self.ews['dfa'] = dfa_values def compute_ktau(self, tmin="earliest", tmax="latest"): """ Compute kendall tau values of CSD-based EWS. diff --git a/ewstools/helpers.py b/ewstools/helpers.py index 9fded5d..11b446b 100644 --- a/ewstools/helpers.py +++ b/ewstools/helpers.py @@ -1,6 +1,6 @@ ################################################################################################################# # ewstools -# Description: Python package for computing, analysing and visualising +# Description: Python package for computing, analysing and visualising # early warning signals (EWS) in time-series data # Author: Thomas M Bury # Web: https://www.thomasbury.net/ @@ -46,7 +46,7 @@ # For fitting power spectrum models and computing AIC weights from lmfit import Model - + def pspec_welch(yVals, @@ -58,12 +58,12 @@ def pspec_welch(yVals, ''' Computes the power spectrum of a time-series using Welch's method. - + The time-series is assumed to be stationary and to have equally spaced measurements in time. The power spectrum is computed using Welch's method, which computes the power spectrum over a rolling window of subsets of the time-series and then takes the average. - + Args ---- yVals: array of floats @@ -75,24 +75,24 @@ def pspec_welch(yVals, ham_offset: float Hamming offset as a proportion of the Hamming window size. w_cutoff: float - Cutoff frequency used in power spectrum. Given as a proportion of the + Cutoff frequency used in power spectrum. Given as a proportion of the maximum permissable frequency in the empirical power spectrum. scaling: {'spectrum', 'density'} Whether to compute the power spectrum ('spectrum') or the power spectral density ('density'). The power spectral density - is the power spectrum normalised (such that the area underneath equals one). - + is the power spectrum normalised (such that the area underneath equals one). + Returns ------- - pd.Series: + pd.Series: Power values indexed by frequency - + ''' ## Assign properties of *series* to parameters - - # Compute the sampling frequency + + # Compute the sampling frequency fs = 1/dt # Number of data points num_points = len(yVals) @@ -104,7 +104,7 @@ def pspec_welch(yVals, ham_length = num_points # Compute number of points in offset ham_offset_points = int(ham_offset*ham_length) - + ## Compute the periodogram using Welch's method (scipy.signal function) pspec_raw = signal.welch(np.asarray(yVals), fs, @@ -112,22 +112,22 @@ def pspec_welch(yVals, noverlap=ham_offset_points, return_onesided=False, scaling=scaling) - + # Put into a pandas series and index by frequency (scaled by 2*pi) pspec_series = pd.Series(pspec_raw[1], index=2*np.pi*pspec_raw[0], name='power') pspec_series.index.name = 'frequency' - + # Sort into ascending frequency pspec_series.sort_index(inplace=True) - + # Append power spectrum with first value (by symmetry) pspec_series.at[-min(pspec_series.index)] = pspec_series.iat[0] - + # Impose cutoff frequency wmax = w_cutoff*max(pspec_series.index) # cutoff frequency pspec_output = pspec_series[-wmax:wmax] # subset of power spectrum - - + + return pspec_output @@ -136,14 +136,14 @@ def pspec_welch(yVals, #------------Functional forms of power spectra to fit------------# - + def psd_fold(w,sigma,lam): ''' Analytical approximation for the power spectrum prior to a Fold bifurcation ''' return (sigma**2 / (2*np.pi))*(1/(w**2+lam**2)) - + def psd_flip(w,sigma,r): @@ -160,8 +160,8 @@ def psd_hopf(w,sigma,mu,w0): ''' return (sigma**2/(4*np.pi))*(1/((w+w0)**2+mu**2)+1/((w-w0)**2 +mu**2)) - - + + def psd_null(w,sigma): @@ -169,9 +169,9 @@ def psd_null(w,sigma): Power spectrum of white noise (flat). ''' return sigma**2/(2*np.pi) * w**0 - - - + + + @@ -182,7 +182,7 @@ def shopf_init(smax, stot, wdom): ''' Compute the 'best guess' initialisation values for sigma, mu and w0, when fitting sHopf to the empirical power spectrum. - + Args ---- smax: float @@ -191,14 +191,14 @@ def shopf_init(smax, stot, wdom): Total power in the power spectrum. wdom: float Frequency that has the highest power. - + Return ------ - list of floats: + list of floats: List containing the initialisation parameters [sigma, mu, w0] - + ''' - + # Define chunky term (use \ to continue eqn to new line) def alpha(smax, stot, wdom): return stot**3 \ @@ -208,48 +208,48 @@ def alpha(smax, stot, wdom): -13*(np.pi**2)*(wdom**4)*(smax**4)*(stot**2) \ +2*(wdom**2)*(smax**2)*(stot**4) \ ) - - # Initialisation for mu + + # Initialisation for mu mu = -(1/(3*np.pi*smax))*(stot \ +alpha(smax,stot,wdom)**(1/3) \ +(stot**2-12*(np.pi**2)*(wdom**2)*(smax**2))/(alpha(smax,stot,wdom)**(1/3))) - - + + # Initialisation for sigma sigma = np.sqrt( -2*mu*stot) - + # Initialisation for w0 w0 = wdom - + # Return list return [sigma, mu, w0] - + def sfold_init(smax, stot): ''' Compute the 'best guess' initialisation values for sigma and lamda when fitting sfold to the empirical power spectrum. - + Args -------------- smax: float Maximum power in the power spectrum. stot: float Total power in the power spectrum. - + Return ----------------- - list of floats: + list of floats: List containing the initialisation parameters [sigma, lambda] - + ''' - + # Initialisation for sigma sigma = np.sqrt(2*stot**2/(np.pi*smax)) - + # Initialisation for lamda lamda = -stot/(np.pi*smax) @@ -262,28 +262,28 @@ def sflip_init(smax, stot): ''' Compute the 'best guess' initialisation values for sigma and r when fitting sflip to the empirical power spectrum. - + Args -------------- smax: float Maximum power in the power spectrum. stot: float Total power in the power spectrum. - + Return ----------------- - list of floats: + list of floats: List containing the initialisation parameters [sigma, r] - + ''' - - + + # Initialisation for r r =(stot - 2*np.pi*smax)/(stot + 2*np.pi*smax) - + # Initialisation for sigma sigma = np.sqrt(stot*(1-r**2)) - + # Return list return [sigma, r] @@ -293,19 +293,19 @@ def snull_init(stot): ''' Compute the 'best guess' initialisation values for sigma when fitting snull to the empirical power spectrum. - + Args -------------- stot: float Total power in the power spectrum. - + Return ----------------- - list of floats: + list of floats: List containing the initialisation parameters [sigma]. - + ''' - + # Initialisation for sigma sigma = np.sqrt(stot) @@ -317,20 +317,20 @@ def snull_init(stot): #---------Run optimisation to compute best fits-----------# - + # Fold fit def fit_fold(pspec, init): ''' Fit the Fold power spectrum model to pspec and compute AIC score. Uses the package LMFIT for optimisation. - + Args -------------- pspec: pd.Series Power spectrum data as a Series indexed by frequency. init: list of floats Initial parameter guesses of the form [sigma_init, lambda_init]. - + Returns ---------------- list: @@ -338,12 +338,12 @@ def fit_fold(pspec, init): and result is a handle that contains further information on the fit. ''' - - + + # Put frequency values and power values as a list to use LMFIT freq_vals = pspec.index.tolist() power_vals = pspec.tolist() - + sigma_init, lambda_init = init # Assign model object model = Model(psd_fold) @@ -354,14 +354,14 @@ def fit_fold(pspec, init): model.set_param_hint('sigma', value=sigma_init, min=0, max=10*sigma_init) # Parameter constraints for lambda model.set_param_hint('lam', min=-np.sqrt(psi_fold/(1-psi_fold))*wMax, max=0, value=lambda_init) - + # Assign initial parameter values and constraints - params = model.make_params() + params = model.make_params() # Fit model to the empircal spectrum result = model.fit(power_vals, params, w=freq_vals) # Compute AIC score aic = result.aic - + # Export AIC score and model fit return [aic, result] @@ -374,14 +374,14 @@ def fit_flip(pspec, init): ''' Fit the Flip power spectrum model to pspec and compute AIC score. Uses the package LMFIT for optimisation. - + Args -------------- pspec: pd.Series Power spectrum data as a Series indexed by frequency. init: list of floats Initial parameter guesses of the form [sigma_init, r_init]. - + Returns ---------------- list: @@ -389,12 +389,12 @@ def fit_flip(pspec, init): and result is a handle that contains further information on the fit. ''' - - + + # Put frequency values and power values as a list to use LMFIT freq_vals = pspec.index.tolist() power_vals = pspec.tolist() - + sigma_init, r_init = init # Assign model object model = Model(psd_flip) @@ -402,9 +402,9 @@ def fit_flip(pspec, init): model.set_param_hint('sigma', value=sigma_init, min=0, max=10*sigma_init) # Parameter constraints for r model.set_param_hint('r', min=-1, max=0, value=r_init) - + # Assign initial parameter values and constraints - params = model.make_params() + params = model.make_params() # Fit model to the empircal spectrum result = model.fit(power_vals, params, w=freq_vals) # Compute AIC score @@ -419,19 +419,19 @@ def fit_flip(pspec, init): # Function to fit Hopf model to empirical specrum with specified initial parameter guess -def fit_hopf(pspec, init): - +def fit_hopf(pspec, init): + ''' Fit the Hopf power spectrum model to pspec and compute AIC score. Uses the package LMFIT for optimisation. - + Args -------------- pspec: pd.Series Power spectrum data as a Series indexed by frequency init: list of floats Initial parameter guesses of the form [sigma_init, mu_init, w0_init] - + Returns ---------------- list: @@ -439,48 +439,48 @@ def fit_hopf(pspec, init): and result is a handle that contains further information on the fit. ''' - - + + # Put frequency values and power values as a list to use LMFIT freq_vals = pspec.index.tolist() power_vals = pspec.tolist() - + # Assign labels to initialisation values sigma_init, mu_init, w0_init = init - - - # If any labels are nan, resort to default values + + + # If any labels are nan, resort to default values if np.isnan(sigma_init) or np.isnan(mu_init) or np.isnan(w0_init): sigma_init, mu_init, w0_init = [1,-0.1,1] - + # Constraint parameter psi_hopf = 0.2 - + # Compute initialisation value for the dummy variable delta (direct map with w0) # It must be positive to adhere to constraint - thus if negative set to 0. delta_init = max( w0_init + (mu_init/(2*np.sqrt(psi_hopf)))*np.sqrt(4-3*psi_hopf + np.sqrt(psi_hopf**2-16*psi_hopf+16)), 0.0001) - - # Assign model object + + # Assign model object model = Model(psd_hopf) - + ## Set initialisations parameters in model attributes - + # Sigma must be positive, and set a (high) upper bound to avoid runaway computation model.set_param_hint('sigma', value=sigma_init, min=0) # Psi is a fixed parameter (not used in optimisation) model.set_param_hint('psi', value=psi_hopf, vary=False) - # Mu must be negative + # Mu must be negative model.set_param_hint('mu', value=mu_init, max=0, vary=True) # Delta is a dummy parameter, satisfying d = w0 - wThresh (see paper for wThresh). It is allowed to vary, in place of w0. model.set_param_hint('delta', value = delta_init, min=0, vary=True) # w0 is a fixed parameter dependent on delta (w0 = delta + wThresh) model.set_param_hint('w0',expr='delta - (mu/(2*sqrt(psi)))*sqrt(4-3*psi + sqrt(psi**2-16*psi+16))',max=2.5,vary=False) - + # Assign initial parameter values and constraints - params = model.make_params() + params = model.make_params() # Fit model to the empircal spectrum result = model.fit(power_vals, params, w=freq_vals) # Compute AIC score @@ -496,14 +496,14 @@ def fit_null(pspec, init): ''' Fit the Null power spectrum model to pspec and compute AIC score. Uses the package LMFIT for optimisation. - + Args -------------- pspec: pd.Series Power spectrum data as a Series indexed by frequency init: list of floats Initial parameter guesses of the form [sigma_init] - + Returns ---------------- list: @@ -511,26 +511,26 @@ def fit_null(pspec, init): and result is a handle that contains further information on the fit. ''' - + # Put frequency values and power values as a list to use LMFIT freq_vals = pspec.index.tolist() power_vals = pspec.tolist() - + sigma_init = init[0] - + # Assign model object model = Model(psd_null) - - # Initial parameter value for Null fit + + # Initial parameter value for Null fit model.set_param_hint('sigma', value=sigma_init, vary=True, min=0, max=10*sigma_init) - + # Assign initial parameter values and constraints - params = model.make_params() + params = model.make_params() # Fit model to the empircal spectrum result = model.fit(power_vals, params, w=freq_vals) # Compute AIC score aic = result.aic - + # Export AIC score and model fit return [aic, result] @@ -541,34 +541,34 @@ def fit_null(pspec, init): def aic_weights(aic_scores): ''' Computes AIC weights, given AIC scores. - + Args ---------------- aic_scores: np.array An array of AIC scores - + Returns ----------------- np.array Array of the corresponding AIC weights - + ''' - - + + # Best AIC score aic_best = min(aic_scores) - + # Differences in score from best model aic_diff = aic_scores - aic_best - + # Likelihoods for each model llhd = np.exp(-(1/2)*aic_diff) - + # Normalise to get AIC weights return llhd/sum(llhd) - - - + + + #-----------Compute spectral metrics (EWS) from power spectrum------# @@ -581,7 +581,7 @@ def pspec_metrics(pspec, ''' Compute the metrics associated with pspec that can be used as EWS. - + Args ------------------- pspec: pd.Series @@ -591,61 +591,61 @@ def pspec_metrics(pspec, coherence factor ('cf'), AIC weights ('aic'). aic: AIC weights to compute sweep: bool - If 'True', sweep over a range of intialisation - parameters when optimising to compute AIC scores, at the expense of + If 'True', sweep over a range of intialisation + parameters when optimising to compute AIC scores, at the expense of longer computation. If 'False', intialisation parameter is taken as the 'best guess'. - + Return ------------------- dict: A dictionary of spectral EWS obtained from pspec - + ''' - - + + # Initialise a dictionary for EWS spec_ews = {} - + ## Compute Smax if 'smax' in ews: smax = max(pspec) # add to DataFrame spec_ews['Smax'] = smax - - - + + + ## Compute the coherence factor if 'cf' in ews: - + # frequency at which peak occurs w_peak = abs(pspec.idxmax()) - + # power of peak frequency power_peak = pspec.max() - + # compute the first frequency from -w_peak at which power 0: w_peak = 0 - + else: # double the difference between w_half and -w_peak to get the width of the peak w_width = 2*(w_half - (-w_peak)) - + # compute coherence factor (height/relative width) coher_factor = power_peak/(w_width/w_peak) if w_peak != 0 else 0 # add to dataframe spec_ews['Coherence factor'] = coher_factor - + ## Compute AIC weights of fitted analytical forms if 'aic' in ews: - + # Compute the empirical metrics that allow us to choose sensible initialisation parameters # Peak in power spectrum smax = pspec.max() @@ -653,13 +653,13 @@ def pspec_metrics(pspec, stot = pspec.sum()*(pspec.index[1]-pspec.index[0]) # Dominant frequency (take positive value) wdom = abs(pspec.idxmax()) - - ## Create array of initialisation parmaeters - + + ## Create array of initialisation parmaeters + # Sweep values (as proportion of baseline guess) if sweep = True sweep_vals = np.array([0.5,1,1.5]) if sweep else np.array([1]) - + # Baseline parameter initialisations (computed using empirical spectrum) # Sfold [sigma_init_fold, lambda_init] = sfold_init(smax,stot) @@ -669,17 +669,17 @@ def pspec_metrics(pspec, [sigma_init_hopf, mu_init, w0_init] = shopf_init(smax,stot,wdom) # Snull [sigma_init_null] = snull_init(stot) - - + + # Arrays of initial values init_fold_array = {'sigma': sweep_vals*sigma_init_fold, 'lambda': sweep_vals*lambda_init} - + # r parameter cannot go below -1 r_sweep_vals = [0.5*r_init,r_init,0.5*r_init+0.5] if sweep else [r_init] init_flip_array = {'sigma': sweep_vals*sigma_init_flip, - 'r': r_sweep_vals} - + 'r': r_sweep_vals} + init_hopf_array = {'sigma': sweep_vals*sigma_init_hopf, 'mu': sweep_vals*mu_init, 'w0': sweep_vals*w0_init} @@ -688,9 +688,9 @@ def pspec_metrics(pspec, ## Compute AIC values and fits - + ## Fold - + # Initialise list to store AIC and model fits fold_aic_fits = [] @@ -706,12 +706,12 @@ def pspec_metrics(pspec, # Put list into array array_temp = np.array(fold_aic_fits) # Pick out the best model - [aic_fold, model_fold] = array_temp[array_temp[:,0].argmin()] - - - + [aic_fold, model_fold] = array_temp[array_temp[:,0].argmin()] + + + ## Flip - + # Initialise list to store AIC and model fits flip_aic_fits = [] @@ -727,15 +727,15 @@ def pspec_metrics(pspec, # Put list into array array_temp = np.array(flip_aic_fits) # Pick out the best model - [aic_flip, model_flip] = array_temp[array_temp[:,0].argmin()] - - - - - - + [aic_flip, model_flip] = array_temp[array_temp[:,0].argmin()] + + + + + + ## Hopf - + # Initialise list to store AIC and model fits hopf_aic_fits = [] @@ -752,13 +752,13 @@ def pspec_metrics(pspec, # Put list into array array_temp = np.array(hopf_aic_fits) # Pick out the best model - [aic_hopf, model_hopf] = array_temp[array_temp[:,0].argmin()] - - - - + [aic_hopf, model_hopf] = array_temp[array_temp[:,0].argmin()] + + + + ## Null - + # Initialise list to store AIC and model fits null_aic_fits = [] @@ -773,9 +773,9 @@ def pspec_metrics(pspec, # Put list into array array_temp = np.array(null_aic_fits) # Pick out the best model - [aic_null, model_null] = array_temp[array_temp[:,0].argmin()] - - + [aic_null, model_null] = array_temp[array_temp[:,0].argmin()] + + # Compute chosen AIC weights from the AIC scores aic_scores = {} if 'Fold' in aic: @@ -786,10 +786,10 @@ def pspec_metrics(pspec, aic_scores['Hopf']=aic_hopf if 'Null' in aic: aic_scores['Null']=aic_null - + aicw = aic_weights(np.array([aic_scores[x] for x in aic])) - aic_dict = dict(zip(aic,aicw)) - + aic_dict = dict(zip(aic,aicw)) + # Add to Dataframe if 'Fold' in aic: spec_ews['AIC fold'] = aic_dict['Fold'] @@ -799,8 +799,8 @@ def pspec_metrics(pspec, spec_ews['AIC hopf'] = aic_dict['Hopf'] if 'Null' in aic: spec_ews['AIC null'] = aic_dict['Null'] - - + + # Add fitted parameter values to DataFrame spec_ews['Params fold'] = dict((k, model_fold.values[k]) for k in ('sigma','lam')) # don't include dummy params spec_ews['Params flip'] = dict((k, model_flip.values[k]) for k in ('sigma','r')) @@ -811,7 +811,7 @@ def pspec_metrics(pspec, # Return DataFrame of metrics return spec_ews - + @@ -821,33 +821,33 @@ def pspec_metrics(pspec, def compute_autocov(df_in): ''' - Computes the autocovariance (lag-1) matrix of n + Computes the autocovariance (lag-1) matrix of n time series provided in df_in. Using the definition phi_ij = < X_i(t+1) X_j(t) > for each element of the autocovariance matrix phi. - + Args ------------------- df_in: DataFrame with n columns indexed by time - + Return ------------------- np.array: autocovariance matrix - + ''' - + # Obtain column names of df_in col_names = df_in.columns # Number of variables n = len(col_names) - - + + # Define function to compute autocovariance of two columns def autocov_cols(a,b): ''' @@ -858,23 +858,23 @@ def autocov_cols(a,b): Output: float: autocovariance between the columns ''' - + # Shift the column of a by 1 a_shift = a.shift(1) - + # Put into a dataframe df_temp = pd.concat([a_shift,b], axis=1) - + # Compute covariance of columns a and b_shift cov = df_temp.cov().iloc[0,1] - + # Output return cov - - + + # Compute elements of autocovariance matrix list_elements = [] - + for i in range(n): for j in range(n): a = df_in[col_names[i]] @@ -883,7 +883,7 @@ def autocov_cols(a,b): autocov = autocov_cols(a,b) # Append to list of elements list_elements.append(autocov) - + # Create autocovariance matrix from list of elements ar_autocov = np.array(list_elements).reshape(n,n) @@ -894,74 +894,74 @@ def autocov_cols(a,b): ''' - Computes the autocovariance (lag-1) matrix of n + Computes the autocovariance (lag-1) matrix of n time series provided in df_in. Using the definition phi_ij = < X_i(t+1) X_j(t) > for each element of the autocovariance matrix phi. - + Args ------------------- df_in: DataFrame with n columns indexed by time - + Return ------------------- np.array: autocovariance matrix - + ''' #--------------------------------------- -## Function to do Jacobian and eval reconstruction +## Function to do Jacobian and eval reconstruction def eval_recon(df_in): ''' Constructs estimate of Jacobian matrix from stationary time-series data and outputs the eigenvalues, eigenvectors and jacobian. - + Args ------------------- df_in: DataFrame with two columns indexed by time - + Return ------------------- dict - Consists of 'Eigenvalues': np.array of eigenvalues. - 'Eigenvectors': np.array of eigenvectors. 'Jacobian': pd.DataFrame of + Consists of 'Eigenvalues': np.array of eigenvalues. + 'Eigenvectors': np.array of eigenvectors. 'Jacobian': pd.DataFrame of Jacobian entries. - + ''' - + # Get the time-separation between data points dt = df_in.index[1] -df_in.index[0] - + # Compute autocovaraince matrix from columns ar_autocov = compute_autocov(df_in) - + # Compute the covariance matrix (built in function) ar_cov = df_in.cov() - + # Estimate of discrete Jacobian (formula in Williamson (2015)) # Requires computation of an inverse matrix jac = np.matmul(ar_autocov, np.linalg.inv(ar_cov)) # Write the Jacobian as a df for output (so we have col lables) df_jac = pd.DataFrame(jac, columns = df_in.columns, index=df_in.columns) - + # Compute eigenvalues and eigenvectors evals, evecs = np.linalg.eig(jac) - + # Dictionary of data output - dic_out = {'Eigenvalues':evals, + dic_out = {'Eigenvalues':evals, 'Eigenvectors':evecs, 'Jacobian':df_jac} - + return dic_out @@ -970,9 +970,84 @@ def eval_recon(df_in): - - - - + + + + + + +def dfa(series, order=1, min_scale=4, max_scale=None, n_scales=20): + ''' + Compute the DFA scaling exponent (alpha) of a 1D array. + + Detrended Fluctuation Analysis characterises how RMS fluctuations + F(s) scale with window size s: F(s) ~ s^alpha. + + Interpretation: + alpha ~ 0.5 -> uncorrelated (white noise) + alpha > 0.5 -> persistent long-range correlations + alpha ~ 1.0 -> 1/f (pink) noise + alpha ~ 1.5 -> Brownian motion + + Args + ------------------- + series: array of floats + 1D input data. + order: int + Polynomial detrending order (1 = linear). Default is 1. + min_scale: int + Minimum segment size in samples. Default is 4. + max_scale: int or None + Maximum segment size. Clipped to len(series)//2. Default is None. + n_scales: int + Number of logarithmically spaced scales. Default is 20. + + Return + ------------------- + float: + DFA scaling exponent alpha. Returns NaN if insufficient data. + + ''' + x = np.asarray(series, dtype=np.float64) + if not np.isfinite(x).all(): + return float('nan') + n = len(x) + if n < 2 * min_scale: + return float('nan') + y = np.cumsum(x - x.mean()) + effective_max = n // 2 + if max_scale is None: + max_scale = effective_max + else: + max_scale = min(max_scale, effective_max) + max_scale = max(max_scale, min_scale + 1) + raw_scales = np.round( + np.logspace(np.log10(min_scale), np.log10(max_scale), n_scales) + ).astype(int) + scales = np.unique(raw_scales) + scales = scales[scales >= min_scale] + fluct = np.empty(len(scales)) + valid = np.zeros(len(scales), dtype=bool) + for idx, s in enumerate(scales): + n_segs = n // s + if n_segs < 2: + continue + seg_len = n_segs * s + segs = y[:seg_len].reshape(n_segs, s) + t = np.arange(s, dtype=np.float64) + rms_sum = 0.0 + for seg in segs: + coeffs = np.polyfit(t, seg, order) + trend = np.polyval(coeffs, t) + rms_sum += np.mean((seg - trend) ** 2) + fluct[idx] = np.sqrt(rms_sum / n_segs) + valid[idx] = True + valid_idx = valid & (fluct > 0) + if valid_idx.sum() < 3: + return float('nan') + log_s = np.log10(scales[valid_idx].astype(float)) + log_f = np.log10(fluct[valid_idx]) + alpha = float(np.polyfit(log_s, log_f, 1)[0]) + return alpha diff --git a/tests/test_dfa.py b/tests/test_dfa.py new file mode 100644 index 0000000..d0a0504 --- /dev/null +++ b/tests/test_dfa.py @@ -0,0 +1,60 @@ +"""Tests for DFA (Detrended Fluctuation Analysis).""" + +import numpy as np +import pandas as pd +import ewstools +from ewstools import helpers + + +def test_dfa_white_noise(): + """White noise has no long-range correlations: alpha ~ 0.5.""" + rng = np.random.default_rng(42) + x = rng.standard_normal(2000) + alpha = helpers.dfa(x) + assert not np.isnan(alpha) + assert 0.35 <= alpha <= 0.65, f"White noise alpha out of range: {alpha:.4f}" + + +def test_dfa_brownian_motion(): + """Brownian motion (cumulative sum of white noise): alpha ~ 1.5.""" + rng = np.random.default_rng(42) + x = np.cumsum(rng.standard_normal(2000)) + alpha = helpers.dfa(x) + assert not np.isnan(alpha) + assert 1.3 <= alpha <= 1.7, f"Brownian alpha out of range: {alpha:.4f}" + + +def test_dfa_short_series(): + """DFA returns NaN for series too short to analyse.""" + x = np.random.default_rng(42).standard_normal(5) + assert np.isnan(helpers.dfa(x, min_scale=4)) + + +def test_dfa_deterministic(): + """DFA is deterministic for identical input.""" + x = np.random.default_rng(99).standard_normal(1000) + assert helpers.dfa(x) == helpers.dfa(x) + + +def test_TimeSeries_dfa(): + """DFA integrates correctly through the TimeSeries interface.""" + rng = np.random.default_rng(42) + tVals = np.arange(0, 10, 0.02) + data = pd.Series(5 + rng.standard_normal(len(tVals)), index=tVals) + ts = ewstools.TimeSeries(data, transition=8) + ts.compute_dfa(rolling_window=0.25) + assert isinstance(ts.ews, pd.DataFrame) + assert "dfa" in ts.ews.columns + ts.compute_ktau() + assert isinstance(ts.ktau, dict) + assert "dfa" in ts.ktau.keys() + + +def test_TimeSeries_dfa_no_transition(): + """DFA works on a TimeSeries without a specified transition point.""" + rng = np.random.default_rng(42) + tVals = np.arange(0, 10, 0.02) + data = pd.Series(5 + rng.standard_normal(len(tVals)), index=tVals) + ts = ewstools.TimeSeries(data) + ts.compute_dfa(rolling_window=0.25) + assert "dfa" in ts.ews.columns