diff --git a/src/.vitepress/routes/sidebar/user.ts b/src/.vitepress/routes/sidebar/user.ts index dc9cefe0..3c2b1772 100644 --- a/src/.vitepress/routes/sidebar/user.ts +++ b/src/.vitepress/routes/sidebar/user.ts @@ -12,18 +12,43 @@ export const userRoutes = [ collapsed: true, items: [ { - text: 'Examples', + text: 'Analysis Coding', collapsed: true, items: [ - {text: 'Analysis Coding', link: '/analysis-coding'}, - {text: 'Aggregation with fedstats', link: '/survival-regression.md'}, - {text: 'Federated GLM', link: '/federated-logistic-regression.md'}, + {text: 'Introduction', link: '/analysis-coding'}, + { + text: 'Examples', + collapsed: true, + items: [ + {text: 'Aggregation with fedstats', link: '/coding_examples/survival-regression'}, + {text: 'Basic VCF QC', link: '/coding_examples/vcf-qc'}, + {text: 'CLI Tools FastQC', link: '/coding_examples/cli-fastqc'}, + {text: 'Deep Learning image classification', link: '/coding_examples/deep-learning-image-classifier'}, + {text: 'Differential Privacy', link: '/coding_examples/differential-privacy-mvp'}, + {text: 'Fedstats GLM', link: '/coding_examples/fedstats-logistic-regression'}, + {text: 'Federated Logistic Regression', link: '/coding_examples/federated-logistic-regression'}, + {text: 'GeMTeX text scores', link: '/coding_examples/gemtex-text-score-example'}, + {text: 'PPRL', link: '/coding_examples/record_linkage'}, + ] + }, ] }, - {text: 'PPRL', link: '/record_linkage'}, - {text: 'Basic VCF QC', link: '/vcf-qc'}, - {text: 'CLI Tools FastQC', link: '/cli-fastqc'}, + { + text: 'Local Testing', + collapsed: true, + items: [ + {text: 'Introduction', link: '/local-testing'}, + { + text: 'Examples', + collapsed: true, + items: [ + {text: 'Logistic Regression', link: '/testing_examples/local-testing-logistic-regression-example'}, + {text: 'Differential Privacy', link: '/testing_examples/local-testing-dp-example'}, + ] + }, + ] + }, {text: 'FHIR Queries', link: '/fhir-query'}, // {text: 'Homomorphic Encryption', link: 
'/homomorphic-encryption'}, ] diff --git a/src/guide/user/analysis-coding.md b/src/guide/user/analysis-coding.md index 4019d86d..9587364c 100644 --- a/src/guide/user/analysis-coding.md +++ b/src/guide/user/analysis-coding.md @@ -62,13 +62,12 @@ class MyAggregator(StarAggregator): total_patient_count = sum(analysis_results) return total_patient_count - def has_converged(self, result, last_result, num_iterations): + def has_converged(self, result, last_result): """ Determines if the aggregation process has converged. :param result: The current aggregated result. :param last_result: The aggregated result from the previous iteration. - :param num_iterations: The number of iterations completed so far. :return: True if the aggregation has converged; False to continue iterations. """ # TODO (optional): if the parameter 'simple_analysis' in 'StarModel' is set to False, @@ -132,7 +131,27 @@ if __name__ == "__main__": - Input-Parameters given by ``StarModel``: - ``result``: Output of the current iteration's ``aggregation_method()``. - ``last_result``: Output of the previous iteration's ``aggregation_method()``. - - ``num_iterations``: Number of iterations executed. This number is incremented **after** executing the ``has_converged()``-check, i.e. equates to 1 in the second iteration of the analysis. - ``main()``-function: Instantiates the ``StarModel`` class automatically executing the analysis on the node (either as an aggregator or analyzer node). This script serves as a basic "Hello World" example for performing federated analysis using FHIR data. + +### Utilizing Local Differential Privacy in ``StarModel`` +::: warning Info +In its current state, Local Differential Privacy is only supported for analyzes that return results with a single numeric value. +::: +There currently exists an alternate version of ``StarModel`` implementing a simplified local differential privacy (LocalDP) to enhance privacy during analysis: ``StarLocalDPModel``. 
+In order to utilize said version, simply replace the ``StarModel`` import and instantiation in the above example with ``StarLocalDPModel``. +During instantiation, one has to specify the parameters ``sensitivity`` and ``epsilon``, in addition to ``StarModel``'s normal parameters. +```python +from flame.star import StarLocalDPModel + +StarLocalDPModel( + ... + epsilon=1.0, # Privacy budget for differential privacy + sensitivity=1.0, # Sensitivity parameter for differential privacy + ... + ) +``` +Executing an analysis with ``StarLocalDPModel`` will add Laplace noise to the final results sent by the aggregator node to the Hub. +For this, the given ``sensitivity`` is divided by ``epsilon`` to calculate the scale of the Laplace noise distribution. +For more information [see 'opendp' docs](https://docs.opendp.org/en/stable/api/python/opendp.measurements.html#opendp.measurements.make_laplace). diff --git a/src/guide/user/cli-fastqc.md b/src/guide/user/coding_examples/cli-fastqc.md similarity index 95% rename from src/guide/user/cli-fastqc.md rename to src/guide/user/coding_examples/cli-fastqc.md index 93c94261..3cd05834 100644 --- a/src/guide/user/cli-fastqc.md +++ b/src/guide/user/coding_examples/cli-fastqc.md @@ -1,7 +1,7 @@ # Using CLI Tools for Federated FASTQ QC ::: warning Assumed Knowledge -This guide assumes you're already familiar with the concepts shown in the **VCF QC** tutorial (federated execution model, analyzer vs. aggregator roles, project / datastore setup, approvals). If not, read that first: see [VCF QC Guide](/guide/user/vcf-qc) plus the background docs on [Coding an Analysis](/guide/user/analysis-coding) and the [Core SDK](/guide/user/sdk-core-doc). +This guide assumes you're already familiar with the concepts shown in the **VCF QC** tutorial (federated execution model, analyzer vs. aggregator roles, project / datastore setup, approvals). 
If not, read that first: see [VCF QC Guide](/guide/user/coding_examples/vcf-qc) plus the background docs on [Coding an Analysis](/guide/user/analysis-coding) and the [Core SDK](/guide/user/sdk-core-doc). ::: ::: info Summary @@ -115,7 +115,7 @@ Example real output: | Node fails with no files | Check extensions & datastore mapping; maybe restrict keys incorrectly. | ## See Also -* [VCF QC Guide](/guide/user/vcf-qc) +* [VCF QC Guide](/guide/user/coding_examples/vcf-qc) * [Core SDK Reference](/guide/user/sdk-core-doc) * [Coding an Analysis](/guide/user/analysis-coding) * [Admin: Analysis Execution](/guide/admin/analysis-execution) diff --git a/src/guide/user/deep-learning-image-clasifier.md b/src/guide/user/coding_examples/deep-learning-image-classifier.md similarity index 96% rename from src/guide/user/deep-learning-image-clasifier.md rename to src/guide/user/coding_examples/deep-learning-image-classifier.md index 61d83535..94ae9ab0 100644 --- a/src/guide/user/deep-learning-image-clasifier.md +++ b/src/guide/user/coding_examples/deep-learning-image-classifier.md @@ -1,7 +1,7 @@ # Applying platform SDK and CLI to run a Deep Learning application -The more detailed guide to the deep learning showcase can be read [here](./Guide-showcase-deep-learning-image-classifier.pdf) +The more detailed guide to the deep learning showcase can be read [here](../Guide-showcase-deep-learning-image-classifier.pdf) ::: warning Assumed Knowledge This guide assumes you're already familiar with the basic concepts of federated learning. If not, read the background docs on [Coding an Analysis](/guide/user/analysis-coding) and the [Core SDK](/guide/user/sdk-core-doc). @@ -25,9 +25,9 @@ By the end of this tutorial you will learn how to use Star patterns, and how to ::: tip The reason why we use Python as the language of choice is that there is no better alternative for this kind of application due to its suitable ecosystem +::: - -## What does the analysis code? 
+## What does the analysis code do? Brief overview: * Analyzer runs network training for few specified number of epochs, then returns a dictionary with updated weights, loss value * The aggregator subclass computes federated average of the returned model weights, loss and its metrics received from analyzer node, and checks convergence criterion each round diff --git a/src/guide/user/coding_examples/differential-privacy-mvp.md b/src/guide/user/coding_examples/differential-privacy-mvp.md new file mode 100644 index 00000000..374dba53 --- /dev/null +++ b/src/guide/user/coding_examples/differential-privacy-mvp.md @@ -0,0 +1,108 @@ +# Analysis Coding with Local Differential Privacy +::: warning Info +This section demonstrates the use of Local Differential Privacy in distributed analysis. The example is designed to show how to enhance privacy protection while performing federated analysis across multiple nodes. +::: + +### Example Analysis using `StarLocalDPModel`: Counting Patients with Differential Privacy +This analysis example demonstrates how to count the total number of patients across multiple nodes with FHIR data, with differential privacy protections applied to the aggregated results. The patient counts from each node are summed and then noise is added to preserve privacy. + +```python +from flame.star import StarLocalDPModel, StarAnalyzer, StarAggregator + +# MyAnalyzer and MyAggregator classes remain unchanged from the introduction example + +def main(): + """ + Sets up and initiates the distributed analysis using the FLAME components. + + - Defines the custom analyzer and aggregator classes. + - Specifies the type of data and queries to execute. + - Configures analysis parameters like iteration behavior and output format. + - Applies differential privacy to protect the aggregated results. 
+ """ + StarLocalDPModel( + analyzer=MyAnalyzer, # Custom analyzer class (must inherit from StarAnalyzer) + aggregator=MyAggregator, # Custom aggregator class (must inherit from StarAggregator) + data_type='fhir', # Type of data source ('fhir' or 's3') + query='Patient?_summary=count', # Query or list of queries to retrieve data + simple_analysis=True, # True for single-iteration; False for multi-iterative analysis + output_type='str', # Output format for the final result ('str', 'bytes', or 'pickle') + epsilon=1.0, # Privacy budget for differential privacy + sensitivity=1.0, # Sensitivity parameter for differential privacy + analyzer_kwargs=None, # Additional keyword arguments for the custom analyzer constructor (i.e. MyAnalyzer) + aggregator_kwargs=None # Additional keyword arguments for the custom aggregator constructor (i.e. MyAggregator) + ) + + +if __name__ == "__main__": + main() + +``` + +### Explanation +- **`main()`-function**: Instantiates the `StarLocalDPModel` class automatically executing the analysis on the node (either as an aggregator or analyzer node). +StarLocalDPModel extends the standard StarModel by incorporating Local Differential Privacy mechanisms to enhance privacy during federated analysis. + +This script serves as an example for performing privacy-preserving federated analysis using FHIR data with Local Differential Privacy. + +### Understanding Local Differential Privacy in `StarLocalDPModel` +::: warning Info +In its current state, Local Differential Privacy is only supported for analyzes that return results with a single numeric value. +::: +`StarLocalDPModel` is an enhanced version of `StarModel` that implements Local Differential Privacy (LocalDP) to strengthen privacy guarantees during distributed analysis. The key difference is the addition of calibrated noise to the final aggregated results before they are sent to the Hub. 
+ +#### Key Parameters for Differential Privacy +When using `StarLocalDPModel`, two additional parameters must be specified during instantiation: +```python +StarLocalDPModel( + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='fhir', + query='Patient?_summary=count', + simple_analysis=True, + output_type='str', + epsilon=1.0, # Privacy budget for differential privacy + sensitivity=1.0, # Sensitivity parameter for differential privacy + analyzer_kwargs=None, + aggregator_kwargs=None +) +``` + +#### Privacy Parameters Explained +- **`epsilon`** (Privacy Budget): Controls the privacy-utility tradeoff. Lower values provide stronger privacy protection but add more noise to the results. Higher values provide more accurate results but weaker privacy guarantees. + - Typical values range from 0.1 (strong privacy) to 10.0 (weak privacy) + - In this example: `epsilon=1.0` provides a moderate level of privacy +- **`sensitivity`**: Represents the maximum amount that any single individual's data can change the analysis result. This is problem-specific and should be determined based on your analysis. + - For counting queries, sensitivity is typically 1.0 (one person can change the count by at most 1) + - In this example: `sensitivity=1.0` is appropriate for patient counting + +#### Output with Differential Privacy vs Without +- **Without Differential Privacy**: The final aggregated result is the exact sum of patient counts from all nodes. + - Example Output: `Total Patient Count: 118` +- **With Differential Privacy**: The final aggregated result includes added noise, making it an approximate count that protects individual privacy. + - Example Output: `Total Patient Count (with DP): 119.1` + +#### How Noise is Applied + +Executing an analysis with `StarLocalDPModel` will add Laplace noise to the final results sent by the aggregator node to the Hub. 
The scale of the noise is calculated as: + +``` +noise_scale = sensitivity / epsilon +``` + +The Laplace distribution is then used to sample noise that is added to the aggregated result, ensuring differential privacy while maintaining statistical utility. + +For more information, see the [OpenDP documentation on Laplace mechanism](https://docs.opendp.org/en/stable/api/python/opendp.measurements.html#opendp.measurements.make_laplace). + +#### Benefits of Local Differential Privacy +- **Privacy Protection**: Even if an adversary has access to the final aggregated results, they cannot determine whether any specific individual's data was included in the analysis. +- **Quantifiable Privacy**: The epsilon parameter provides a mathematically rigorous measure of privacy loss. +- **Regulatory Compliance**: Helps meet privacy requirements in healthcare and other sensitive domains. +- **Trust**: Participants can be assured that their individual data cannot be reverse-engineered from the published results. + +#### Considerations When Using Differential Privacy +- **Accuracy vs. Privacy Tradeoff**: Lower epsilon values provide stronger privacy but reduce result accuracy. +- **Result Interpretation**: The added noise means results are approximate. Consider running sensitivity analyses with different epsilon values. +- **Single Numeric Results**: Currently, the implementation only supports single numeric outputs. Complex multi-dimensional results are not yet supported. +- **Sensitivity Calculation**: Properly calculating sensitivity is crucial for meaningful privacy guarantees. Underestimating sensitivity can compromise privacy; overestimating it adds unnecessary noise. 
+ diff --git a/src/guide/user/coding_examples/federated-logistic-regression.md b/src/guide/user/coding_examples/federated-logistic-regression.md new file mode 100644 index 00000000..d8b2ca73 --- /dev/null +++ b/src/guide/user/coding_examples/federated-logistic-regression.md @@ -0,0 +1,488 @@ +# FLAME Pancreas Analysis Tutorial + +This guide provides a complete walkthrough of the pancreas analysis example, which demonstrates federated logistic regression using the STAR pattern. This tutorial explains how different components work together to train a machine learning model across distributed healthcare data without centralizing patient information. + +## 1. Overview + +This example implements **Federated Logistic Regression** for pancreas disease classification. In this scenario: +- Multiple hospitals (nodes) have local patient data with pancreas measurements +- Each hospital trains a logistic regression model on its local data +- A central aggregator combines the model updates using **Federated Averaging (FedAvg)** +- The process iterates until the global model converges +- **No patient data ever leaves the local hospitals** + + +### 1.1. Why Federated Learning for Healthcare? + +Healthcare data is: +- **Private**: Patient data must remain at the source institution +- **Distributed**: Different hospitals have different patient populations +- **Sensitive**: Regulatory requirements (HIPAA, GDPR) restrict data sharing + +Federated learning enables collaborative model training while respecting these constraints. + +### 1.2. 
Architecture + +``` +┌─────────────────┐ ┌─────────────────┐ +│ Hospital 1 │ │ Hospital 2 │ +│ │ │ │ +│ PancreasData │ │ PancreasData │ +│ ↓ │ │ ↓ │ +│ PancreasAnalyzer│ │ PancreasAnalyzer│ +│ ↓ │ │ ↓ │ +│ Local Model │ │ Local Model │ +│ Coefficients │ │ Coefficients │ +└────────┬────────┘ └────────┬────────┘ + │ │ + └────────────┬──────────────┘ + ↓ + ┌────────────────────────┐ + │ Aggregator Node │ + │ │ + │ FederatedLogistic │ + │ Regression │ + │ ↓ │ + │ Global Model │ + │ Parameters │ + └────────────────────────┘ + ↓ + (Iterate until convergence) +``` + +## 2. Code Walkthrough + +### 2.1. Imports and Setup + +```python +import pandas as pd +import numpy as np +from sklearn.linear_model import LogisticRegression +from io import BytesIO +from flame.star import StarAnalyzer, StarAggregator, StarModel +``` + +**What's happening:** +- **pandas**: Loads and processes CSV data +- **numpy**: Handles numerical operations (arrays, linear algebra) +- **LogisticRegression**: The machine learning model we're training +- **BytesIO**: Handles in-memory byte streams (data arrives as bytes) +- **flame.star**: FLAME framework components for federated learning + +### 2.2. The PancreasAnalyzer Class + +The analyzer runs at each hospital and performs local training. + +```python +class PancreasAnalyzer(StarAnalyzer): + """ + Local analyzer executed independently on each federated node. + Responsible for loading node-local data and computing model updates. + """ + + def __init__(self, flame): + super().__init__(flame) + + self.clf = LogisticRegression( + max_iter=1, # One optimization step per federated round + fit_intercept=False, # Intercept omitted for simplicity + warm_start=True # Enables parameter reuse across iterations + ) +``` + + +**Key Configuration Choices:** + +- **`max_iter=1`**: Each federated round does ONE gradient descent step + - Why? 
Because we want fine-grained synchronization between nodes + - Each hospital updates the model slightly, then syncs with others + +- **`fit_intercept=False`**: Simplifies the model (no bias term) + - Makes coefficient aggregation straightforward + - In production, you might want to include the intercept + +- **`warm_start=True`**: Critical for federated learning! + - Preserves model parameters between `.fit()` calls + - Each round starts from the aggregated global parameters + - Without this, the model would reset each time + + + +#### 2.2.1. The Analysis Method + +```python +def analysis_method(self, data, aggregator_results): + # Load local CSV data from byte stream + pancreas_df = pd.read_csv(BytesIO(data[0]['pancreasData.csv'])) + + # Split features and labels (last column assumed to be target) + data, labels = pancreas_df.iloc[:, :-1], pancreas_df.iloc[:, -1] + + # Initialize model coefficients with global parameters + self.clf.coef_ = aggregator_results + + # Perform one local fitting step + self.clf.fit(data, labels) + + # During the first iteration, no global parameters exist yet + if self.num_iterations == 0: + aggregator_results = self.clf.coef_.copy() + + # Return updated coefficients to the aggregator + return self.clf.coef_ +``` + +**Step-by-step breakdown:** + +1. **Data Loading**: + ```python + pancreas_df = pd.read_csv(BytesIO(data[0]['pancreasData.csv'])) + ``` + - `data[0]` is a dictionary: `{'pancreasData.csv': }` + - `BytesIO()` creates an in-memory file from bytes + - `pd.read_csv()` parses the CSV into a DataFrame + +2. **Feature-Label Split**: + ```python + data, labels = pancreas_df.iloc[:, :-1], pancreas_df.iloc[:, -1] + ``` + - All columns except the last are features (patient measurements) + - Last column is the label (disease classification: 0 or 1) + +3. 
**Initialize with Global Parameters**: + ```python + self.clf.coef_ = aggregator_results + ``` + - Sets the model's starting point to the global parameters + - This ensures all nodes start from the same synchronized state + - On first iteration, `aggregator_results` is `None` + +4. **Local Training**: + ```python + self.clf.fit(data, labels) + ``` + - Performs ONE step of gradient descent (remember `max_iter=1`) + - Updates coefficients based on local data + - The model improves slightly based on this hospital's patients + +5. **First Iteration Handling**: + ```python + if self.num_iterations == 0: + aggregator_results = self.clf.coef_.copy() + ``` + - On the first round, there's no global model yet + - Each hospital initializes its own coefficients + - These local initializations will be averaged to create the first global model + +6. **Return Local Update**: + ```python + return self.clf.coef_ + ``` + - Sends the updated coefficients to the aggregator + - These are numpy arrays with shape `(1, num_features)` + +### 2.3. The FederatedLogisticRegression Aggregator + +The aggregator combines updates from all hospitals. + +```python +class FederatedLogisticRegression(StarAggregator): + """ + Aggregator responsible for combining model updates + and checking convergence across federated rounds. + """ + + def __init__(self, flame): + super().__init__(flame) + self.max_iter = 10 # Maximum number of federated iterations +``` + +**Configuration:** +- `max_iter=10`: Safety limit to prevent infinite training +- Convergence might happen earlier (see `has_converged()`) + + +#### 2.3.1. 
The Aggregation Method + +```python +def aggregation_method(self, analysis_results): + # Stack coefficient arrays from all nodes + coefs = np.stack(analysis_results, axis=0) + + # Compute mean across nodes (Federated Averaging) + global_params_ = coefs.mean(axis=0) + + return global_params_ +``` + +**How Federated Averaging Works:** + +Imagine two hospitals: +- Hospital 1 has coefficients: `[0.5, 0.8, 0.2]` +- Hospital 2 has coefficients: `[0.3, 0.6, 0.4]` + +The aggregation computes: +``` +global_model = ([0.5, 0.8, 0.2] + [0.3, 0.6, 0.4]) / 2 + = [0.4, 0.7, 0.3] +``` + +This global model: +- Represents knowledge from both hospitals +- Doesn't favor any single institution +- Becomes the starting point for the next training round + +**Why This Works:** +- Linear models (like logistic regression) can be safely averaged +- The average of local optima approximates the global optimum +- More sophisticated aggregation schemes exist (weighted averaging, momentum, etc.) + +#### 2.3.2. The Convergence Check + +```python +def has_converged(self, result, last_result): + # exclude first iteration from convergence check, because last result is None + if last_result is None: + return False + # L2 norm of parameter difference + if np.linalg.norm(result - last_result, ord=2).item() <= 1e-8: + self.flame.flame_log( + "Delta error is smaller than the tolerance threshold", + log_type="info" + ) + return True + + # Stop if maximum number of iterations is reached + elif self.num_iterations > (self.max_iter - 1): + self.flame.flame_log( + f"Maximum number of {self.max_iter} iterations reached. " + "Returning current results.", + log_type="info" + ) + return True + + return False +``` + +**Convergence Criteria Explained:** + +1. 
**Parameter Stability**: + ```python + np.linalg.norm(result - last_result, ord=2) <= 1e-8 + ``` + - Computes the L2 (Euclidean) distance between consecutive models + - If the model parameters barely change, training has converged + - `1e-8` is a very small threshold (parameters differ by < 0.00000001) + +2. **Maximum Iterations**: + ```python + self.num_iterations > (self.max_iter - 1) + ``` + - Prevents infinite training loops + - After 10 rounds, stop regardless of convergence + - Protects against poorly-configured models + +**Why Two Criteria?** +- Best case: Model converges early (saves computation) +- Worst case: Model reaches max iterations (prevents hanging) + +### 2.4. StarModel Instantiation - Putting It All Together + +```python +def main(): + # Run federated training + StarModel( + PancreasAnalyzer, # Analyzer class + FederatedLogisticRegression, # Aggregator class + 's3', # Data source type + simple_analysis=False, # Multi-round analysis + output_type='pickle', # Output format + ) +``` + +**StarModel Configuration:** + +| Parameter | Value | Purpose | +|-----------|-------|---------| +| `PancreasAnalyzer` | Class | Local training logic | +| `FederatedLogisticRegression` | Class | Aggregation logic | +| `'s3'` | Data type | Treats data as S3-like objects | +| `simple_analysis=False` | Iterative | Enables multi-round training | +| `output_type='pickle'` | Format | Serializes the final model | + + +## 3. Training Flow Example + +Let's trace through two complete iterations: + +### 3.1. Iteration 0 (First Round) + +1. **Hospital 1 Analyzer**: + - Loads local pancreas data + - `aggregator_results` is `None` (first iteration) + - Initializes LogisticRegression and trains for 1 step + - Returns coefficients: `coef_1 = [0.12, 0.45, 0.33, ...]` + +2. **Hospital 2 Analyzer**: + - Loads local pancreas data + - `aggregator_results` is `None` + - Initializes LogisticRegression and trains for 1 step + - Returns coefficients: `coef_2 = [0.18, 0.52, 0.28, ...]` + +3. 
**Aggregator**: + - Receives `[coef_1, coef_2]` + - Computes average: `global_coef = (coef_1 + coef_2) / 2` + - Checks convergence: `last_result` is `None`, so continues + - Returns `global_coef` to all analyzers + +### 3.2. Iteration 1 (Second Round) + +1. **Hospital 1 Analyzer**: + - Loads local data again + - `aggregator_results = global_coef` (from iteration 0) + - Sets `self.clf.coef_ = global_coef` (warm start) + - Trains for 1 step (refines the global model on local data) + - Returns updated coefficients: `coef_1_new` + +2. **Hospital 2 Analyzer**: + - Loads local data again + - `aggregator_results = global_coef` + - Sets `self.clf.coef_ = global_coef` + - Trains for 1 step + - Returns updated coefficients: `coef_2_new` + +3. **Aggregator**: + - Receives `[coef_1_new, coef_2_new]` + - Computes new average: `global_coef_new` + - Checks convergence: + - Computes `||global_coef_new - global_coef||` + - If small enough, training stops + - Otherwise, continues to iteration 2 + +This process repeats until convergence or max iterations. + + + +## 4. Key Concepts Explained + +### 4.1. Warm Start vs. Cold Start + +Many machine learning model libraries reset their model's parameters on each `.fit()` call by default, a practice often called **Cold Start** (see Example). + +#### 4.1.1. Cold Start (warm_start=False): +``` +Round 1: Initialize → Train +Round 2: Initialize → Train (loses progress!) +Round 3: Initialize → Train +``` +*Here, each round starts from scratch, which is not useful for federated learning.* + +Most libraries, including `sklearn`, thereby provide a **Warm Start** option. This enables the manual application of model coefficients from previous iterations. + +#### 4.1.2. Warm Start (warm_start=True): +``` +Round 1: Initialize → Train +Round 2: Continue from Round 1 → Train +Round 3: Continue from Round 2 → Train +``` +*Each round builds on previous progress. Essential for federated learning.* + +### 4.2. Why max_iter=1? 
+ +**Multiple iterations per round (max_iter=100)**: +- Each hospital trains independently for 100 steps +- Individual models diverge from each other more rapidly +- Aggregation less effective, slower convergence + +**Single iteration per round (max_iter=1)**: +- Each hospital takes one small step +- Frequent synchronization keeps models aligned +- Better convergence properties + +### 4.3. Data Privacy Guarantee + +##### Notice what NEVER leaves each hospital: + +❌ Raw patient data, able to be traced back to individuals + +##### What DOES get shared: + +✅ Model coefficients, not patient data (ex. numbers like `[0.4, 0.7, 0.3]`; coefficients: mathematical vector parameters used to distinguish arbitrary categories) + +## 5. Running the Example + +To run this example you need a project set up in FLAME with nodes that have access to pancreas data CSV files. +To learn how to do that, see [Submitting a Project Proposal](/guide/user/project) and [Starting an Analysis](/guide/user/analysis). + + +## 6. Troubleshooting + +### 6.1. Issue: "ValueError: This LogisticRegression instance is not fitted yet" + +**Cause**: The model's `coef_` attribute wasn't initialized properly. + +**Solution**: Ensure the first iteration handles None: +```python +if self.num_iterations == 0: + aggregator_results = self.clf.coef_.copy() +``` + +### 6.2. Issue: Training never converges + +**Cause**: Convergence threshold too strict or learning not effective. + +**Solutions**: +1. Increase tolerance: `1e-8` → `1e-5` +2. Reduce max_iter in analyzer: Forces smaller updates +3. Check data quality: Ensure all nodes have meaningful data + +### 6.3. Issue: "Shape mismatch" errors + +**Cause**: Different nodes have different numbers of features. 
+ +**Solution**: Ensure all `pancreasData.csv` files have the same columns: +```python +# Validate data +pancreas_df = pd.read_csv(BytesIO(data[0]['pancreasData.csv'])) +expected_features = 8 # For example +assert pancreas_df.shape[1] == expected_features + 1 # +1 for label +``` + +### 6.4. Issue: Poor model performance + +**Possible causes:** +1. Insufficient iterations (increase `max_iter` in aggregator) +2. Unbalanced data (some nodes have very different distributions) +3. Model too simple (try more complex models) +4. Need feature engineering (normalize, add polynomial features) + +**Solutions:** +```python +# Normalize features +from sklearn.preprocessing import StandardScaler + +def analysis_method(self, data, aggregator_results): + pancreas_df = pd.read_csv(BytesIO(data[0]['pancreasData.csv'])) + X, y = pancreas_df.iloc[:, :-1], pancreas_df.iloc[:, -1] + + # Normalize + scaler = StandardScaler() + X = scaler.fit_transform(X) + + # Continue with training... +``` + + +## 7. Best Practices + +1. **Always use warm_start=True** for iterative federated learning +2. **Set max_iter=1** in the local model for fine-grained synchronization +3. **Handle the first iteration** where aggregator_results is None +4. **Include convergence checks** to prevent infinite loops +5. **Log important events** using `self.flame.flame_log()` +6. **Validate data shapes** to catch configuration errors early +7. **Test with 2-3 nodes first** before scaling up +8. **Save intermediate results** during development for debugging +9. **Utilize [FlameSDK's built-in testing environments](/guide/user/testing_examples/local-testing-logistic-regression-example)** to simulate and test your federated pipeline execution +10. 
**Integrate given class fields** (like ``self.num_iterations``) efficiently instead of creating new tracking variables with identical purpose + diff --git a/src/guide/user/federated-logistic-regression.md b/src/guide/user/coding_examples/fedstats-logistic-regression.md similarity index 96% rename from src/guide/user/federated-logistic-regression.md rename to src/guide/user/coding_examples/fedstats-logistic-regression.md index 0ca7afab..ab952a06 100644 --- a/src/guide/user/federated-logistic-regression.md +++ b/src/guide/user/coding_examples/fedstats-logistic-regression.md @@ -24,7 +24,7 @@ We use already implemented features from `fedstats` to iteratively update global      1.[*Nodes*] Set received estimates from aggregator as current.      2.[*Nodes*] Calculate, based on local data and current estimates all parts of the Fisher scoring algorithm and return them to aggregator.      3.[*Aggregator*] Set results from nodes. -     4.[A*ggregator*] Use the results to estimate a full score vector and Fisher information matrix and update coefficients of regression model. +     4.[*Aggregator*] Use the results to estimate a full score vector and Fisher information matrix and update coefficients of regression model.      5.[*Aggregator*] In the last round after convergence: return summary as final results. > [!NOTE] @@ -98,9 +98,9 @@ class FederatedLogisticRegression(StarAggregator): else: return self.glm.get_summary() - def has_converged(self, result, last_result, num_iterations): + def has_converged(self, result, last_result): if self._convergence_flag: - print(f"Converged after {num_iterations} iterations.") + print(f"Converged after {self.num_iterations} iterations.") return True convergence = self.glm.check_convergence(last_result[0], result[0], tol=1e-4) @@ -109,7 +109,7 @@ class FederatedLogisticRegression(StarAggregator): # the final result can be modified. Maybe there is a better solution in the future. 
self._convergence_flag = True return False # here, False is returned even though convergence is achieved to perform a final "redundant" round - elif num_iterations > 100: + elif self.num_iterations > 100: # TODO: Include option for max iteration and not hardcoded tol print( "Maximum number of 100 iterations reached. Returning current results." diff --git a/src/guide/user/coding_examples/gemtex-text-score-example.md b/src/guide/user/coding_examples/gemtex-text-score-example.md new file mode 100644 index 00000000..72d276f9 --- /dev/null +++ b/src/guide/user/coding_examples/gemtex-text-score-example.md @@ -0,0 +1,233 @@ +# Step-by-Step Guide for a minimal Text scoring example using FLAME +This script was developed to serve as a minimal example to parse text files (here: doctor's letters) and return arbitrary text statistics. +This script reads text data from S3, computes various readability metrics on each text, aggregates these metrics across nodes, and outputs the average readability scores. + + + +## Step 1: Imports and Setup + +At the top of `test_gemtex.py`, import necessary modules + +```python +from flame.star import StarModel, StarAnalyzer, StarAggregator +from readability import Readability +``` + +* `StarModel`, `StarAnalyzer`, `StarAggregator`: FLAME SDK components. +* `Readability`: Library for computing readability scores. This library is only part of the master image for GemTeX not a default for other master images. + +## Step 2: Implement the Analyzer + +The `MyAnalyzer` class inherits from `StarAnalyzer` and has to overwrite the abstractmethod `analysis_method`: + +```python +class MyAnalyzer(StarAnalyzer): + def __init__(self, flame): + super().__init__(flame) +``` + +* **`analysis_method(self, data, aggregator_results)`**: + + * Receives `data`: a list where each element is a dictionary mapping S3 object keys to raw bytes. depending on the project your data will be given to this method. + * Decodes each bytes value to UTF-8 text. 
+ * Creates a `Readability` object to compute seven metrics: + + * Flesch–Kincaid grade level + * Flesch reading ease + * Gunning fog index + * Coleman–Liau index + * Dale–Chall score + * Automated Readability Index (ARI) + * Linsear Write formula + * Collects these metrics for each text into `scores`. + * Calculates `avg_scores_node`: the average of each metric across all texts on that node. + * Returns a list of 7 average scores. + +````python +def analysis_method(self, data, aggregator_results): + scores = [] + for text in data[0].values(): + text = text.decode("utf-8") + r = Readability(text) + + a1 = r.flesch_kincaid().score + a2 = r.flesch().score + a3 = r.gunning_fog().score + a4 = r.coleman_liau().score + a5 = r.dale_chall().score + a6 = r.ari().score + a7 = r.linsear_write().score + + scores.append([a1, a2, a3, a4, a5, a6, a7]) + + avg_scores_node = [sum(group) / len(group) for group in zip(*scores)] + return avg_scores_node +```` + +## Step 3: Implement the Aggregator + +The `MyAggregator` class inherits from `StarAggregator` and has to overwrite the abstractmethod `aggregation_method`: + +```python +class MyAggregator(StarAggregator): + def __init__(self, flame): + super().__init__(flame) +```` + +* **`aggregation_method(self, analysis_results)`**: + + * Receives `analysis_results`: a list of lists, where each inner list is the output of one node's `analysis_method`. + * Computes `avg_scores_global`: the average of each metric across all nodes. + * Returns a list of 7 globally averaged scores. +* **`has_converged(self, result, last_result)`**: + + * Always returns `True`, so the model runs only one iteration. 
+ +````python +def aggregation_method(self, analysis_results): + avg_scores_global = [sum(group) / len(group) for group in zip(*analysis_results)] + return avg_scores_global + + +def has_converged(self, result, last_result): + return True +```` +## Step 4: Configure and Run `StarModel` + +The `main` function sets up and runs the model: + +```python +def main(): + StarModel( + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='s3', + simple_analysis=True, + output_type='str', + analyzer_kwargs=None, + aggregator_kwargs=None + ) + +if __name__ == "__main__": + main() + +```` +- **`data_type`:** `'s3'` tells FLAME to fetch data from S3. +- **`simple_analysis`:** `True` for a single iteration. +- **`output_type`:** `'str'` specifies the format of the returned result. + + +The script will: + +1. Fetch all text files from configured S3 sources. +2. Compute readability metrics on each node. +3. Aggregate results and print a global list of seven average scores. + +## Together the files look like this: + +```python + +from flame.star import StarModel, StarAnalyzer, StarAggregator +from readability import Readability + + +class MyAnalyzer(StarAnalyzer): + def __init__(self, flame): + """ + Initializes the custom Analyzer node. + + :param flame: Instance of FlameCoreSDK to interact with the FLAME components. + """ + super().__init__(flame) # Connects this analyzer to the FLAME components + + def analysis_method(self, data, aggregator_results): + """ + Performs analysis on the retrieved data from data sources. + + :param data: A list of dictionaries containing the data from each data source. + - Each dictionary corresponds to a data source. + - Keys are the queries executed, and values are the results (dict for FHIR, str for S3). + + + :param aggregator_results: Results from the aggregator in previous iterations. + - None in the first iteration. + - Contains the result from the aggregator's aggregation_method in subsequent iterations. 
+ :return: Any result of your analysis on one node (ex. patient count). + """ + scores = [] + + for text in data[0].values(): + text = text.decode("utf-8") + r = Readability(text) + + a1 = r.flesch_kincaid().score + a2 = r.flesch().score + a3 = r.gunning_fog().score + a4 = r.coleman_liau().score + a5 = r.dale_chall().score + a6 = r.ari().score + a7 = r.linsear_write().score + + scores.append([a1, a2, a3, a4, a5, a6, a7]) + + avg_scores_node = [sum(group) / len(group) for group in zip(*scores)] + + return avg_scores_node + + +class MyAggregator(StarAggregator): + def __init__(self, flame): + """ + Initializes the custom Aggregator node. + + :param flame: Instance of FlameCoreSDK to interact with the FLAME components. + """ + super().__init__(flame) # Connects this aggregator to the FLAME components + + def aggregation_method(self, analysis_results): + """ + Aggregates the results received from all analyzer nodes. + + :param analysis_results: A list of analysis results from each analyzer node. + :return: The aggregated result (e.g., total patient count across all analyzers). + """ + # averaging individual scores across all nodes + avg_scores_global = [sum(group) / len(group) for group in zip(*analysis_results)] + + return avg_scores_global + + def has_converged(self, result, last_result): + """ + Determines if the aggregation process has converged. + + :param result: The current aggregated result. + :param last_result: The aggregated result from the previous iteration. + :return: True if the aggregation has converged; False to continue iterations. + """ + return True # Return True to indicate convergence in this simple analysis + + +def main(): + """ + Sets up and initiates the distributed analysis using the FLAME components. + + - Defines the custom analyzer and aggregator classes. + - Specifies the type of data and queries to execute. + - Configures analysis parameters like iteration behavior and output format. 
+ """ + StarModel( + analyzer=MyAnalyzer, # Custom analyzer class (must inherit from StarAnalyzer) + aggregator=MyAggregator, # Custom aggregator class (must inherit from StarAggregator) + data_type='s3', # Type of data source ('fhir' or 's3') + # query='Patient?_summary=count', # Query or list of queries to retrieve data + simple_analysis=True, # True for single-iteration; False for multi-iterative analysis + output_type='str', # Output format for the final result ('str', 'bytes', or 'pickle') + analyzer_kwargs=None, # Additional keyword arguments for the custom analyzer constructor (i.e. MyAnalyzer) + aggregator_kwargs=None # Additional keyword arguments for the custom aggregator constructor (i.e. MyAggregator) + ) + + +if __name__ == "__main__": + main() + +```` diff --git a/src/guide/user/record_linkage.md b/src/guide/user/coding_examples/record_linkage.md similarity index 98% rename from src/guide/user/record_linkage.md rename to src/guide/user/coding_examples/record_linkage.md index d6b17d09..b9dbf7e7 100644 --- a/src/guide/user/record_linkage.md +++ b/src/guide/user/coding_examples/record_linkage.md @@ -227,8 +227,8 @@ class RLAggregator(StarAggregator): ... return node_results - def has_converged(self, result, last_result, num_iterations): - if num_iterations >= 2: # iterative federation + def has_converged(self, result, last_result): + if self.num_iterations >= 2: # iterative federation return True return False ``` @@ -261,4 +261,4 @@ The structure summarizes the number of matching records across nodes. 
For docume - [Coding an Analysis](/guide/user/analysis-coding) - [Admin: Bucket Setup](/guide/admin/bucket-setup-for-data-store) - [Admin: Analysis Execution](/guide/admin/analysis-execution) -- [Survival Regression Example](/guide/user/survival-regression) +- [Survival Regression Example](/guide/user/coding_examples/survival-regression) diff --git a/src/guide/user/survival-regression.md b/src/guide/user/coding_examples/survival-regression.md similarity index 98% rename from src/guide/user/survival-regression.md rename to src/guide/user/coding_examples/survival-regression.md index 6d1dc360..415e3161 100644 --- a/src/guide/user/survival-regression.md +++ b/src/guide/user/coding_examples/survival-regression.md @@ -81,7 +81,7 @@ class ResultsAggregator(StarAggregator): return pd.concat((res_full_data, res_aggregated)) - def has_converged(self, result, last_result, num_iterations): + def has_converged(self, result, last_result): return True # Return True as we only have one round diff --git a/src/guide/user/vcf-qc.md b/src/guide/user/coding_examples/vcf-qc.md similarity index 99% rename from src/guide/user/vcf-qc.md rename to src/guide/user/coding_examples/vcf-qc.md index c1e43d73..38d8c6d2 100644 --- a/src/guide/user/vcf-qc.md +++ b/src/guide/user/coding_examples/vcf-qc.md @@ -217,7 +217,7 @@ class VCFAggregator(StarAggregator): return json.dumps(result) - def has_converged(self, result, last_result, num_iterations): + def has_converged(self, result, last_result): return True # Single-round analysis (no iterative federation needed) ``` @@ -301,7 +301,7 @@ Fatal messages include things like `FATAL: Empty file`, `FATAL: Zero variants`, - [Coding an Analysis](/guide/user/analysis-coding) - [Admin: Bucket Setup](/guide/admin/bucket-setup-for-data-store) - [Admin: Analysis Execution](/guide/admin/analysis-execution) -- [Survival Regression Example](/guide/user/survival-regression) +- [Survival Regression Example](/guide/user/coding_examples/survival-regression) ---- diff 
--git a/src/guide/user/index.md b/src/guide/user/index.md index ef3ff9ba..2b476338 100644 --- a/src/guide/user/index.md +++ b/src/guide/user/index.md @@ -27,8 +27,20 @@ integrity of the original data. **`Concepts/Tutorials`** - [Coding an Analysis](/guide/user/analysis-coding) -- [Basic VCF Quality Control using Python](/guide/user/vcf-qc) -- [Using CLI Tools for Federated FASTQ QC](/guide/user/cli-fastqc) + - Example Analyses + - [Aggregation with fedstats](/guide/user/coding_examples/survival-regression) + - [Basic VCF Quality Control using Python](/guide/user/coding_examples/vcf-qc) + - [Using CLI Tools for Federated FASTQ QC](/guide/user/coding_examples/cli-fastqc) + - [Image classification using deep learning approaches](/guide/user/coding_examples/deep-learning-image-classifier) + - [Utilizing Differential Privacy for Privacy Enhancement](/guide/user/coding_examples/differential-privacy-mvp) + - [Training a Federated Generalized Linear Model (GLM) from the Fedstat library](/guide/user/coding_examples/fedstats-logistic-regression) + - [Federated Logistic Regression classifier for Pancreatic Cancer Data](/guide/user/coding_examples/federated-logistic-regression) + - [Analysing text-formatted clinical data with GeMTeX](/guide/user/coding_examples/gemtex-text-score-example) + - [Connecting patient records across clinical sites with Privacy-Preserving Record Linkage (PPRL)](/guide/user/coding_examples/record_linkage) +- [Local Analysis Testing](/guide/user/local-testing) + - Local Testing Examples + - [Federated Logistic Regression classifier for Pancreatic Cancer Data](/guide/user/testing_examples/local-testing-logistic-regression-example) + - [Utilizing Differential Privacy for Privacy Enhancement](/guide/user/testing_examples/local-testing-dp-example) - [FHIR Queries](/guide/user/fhir-query) **`Using the Hub`** diff --git a/src/guide/user/local-testing.md b/src/guide/user/local-testing.md new file mode 100644 index 00000000..3ab7c3d7 --- /dev/null +++ 
b/src/guide/user/local-testing.md @@ -0,0 +1,469 @@ +# Star Pattern Local Testing Guide + +This guide explains how to use the `StarModelTester` for local testing of federated learning algorithms using the STAR pattern before deploying to the Flame platform. + +## Overview + +The STAR (Secure Training And Aggregation for Research) pattern is a federated learning architecture where: +- **Analyzer nodes** process local data independently +- **Aggregator node** combines results from all analyzers +- The process can iterate until convergence criteria are met + +The `StarModelTester` allows you to simulate this distributed architecture locally with test data, making it easy to develop and debug your federated learning algorithms. + +## Quick Start + +### Basic Example + +```python +from flame.star import StarModelTester, StarAnalyzer, StarAggregator + +# Define your custom analyzer +class MyAnalyzer(StarAnalyzer): + def __init__(self, flame): + super().__init__(flame) + + def analysis_method(self, data, aggregator_results): + # Implement your analysis logic + analysis_result = sum(data) / len(data) + return analysis_result + +# Define your custom aggregator +class MyAggregator(StarAggregator): + def __init__(self, flame): + super().__init__(flame) + + def aggregation_method(self, analysis_results): + # Implement your aggregation logic + result = sum(analysis_results) / len(analysis_results) + return result + + def has_converged(self, result, last_result): + # Define convergence criteria + return self.num_iterations >= 5 + +# Test with local data +data_1 = [1, 2, 3, 4] +data_2 = [5, 6, 7, 8] +data_splits = [data_1, data_2] + +StarModelTester( + data_splits=data_splits, + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='s3', + simple_analysis=False +) +``` + +## Components + +### 1. StarAnalyzer + +The analyzer processes data at each node. You must implement the `analysis_method`. 
+ +```python +class StarAnalyzer: + def analysis_method(self, data, aggregator_results): + """ + Analyze local data, optionally incorporating previous aggregation results. + + Args: + data: Local data for this node + aggregator_results: Results from the previous aggregation iteration + (None for the first iteration) + + Returns: + Analysis result (any type) + """ + pass +``` + +**Key Points:** +- `data`: Your local data fragment +- `aggregator_results`: Previous round's aggregated result (None on first iteration) +- Return value will be sent to the aggregator +- Use `self.flame.flame_log()` for logging + +### 2. StarAggregator + +The aggregator combines results from all analyzer nodes. You must implement two methods: + +```python +class StarAggregator: + def aggregation_method(self, analysis_results): + """ + Aggregate results from all analyzer nodes. + + Args: + analysis_results: List of results from all analyzers + + Returns: + Aggregated result (any type) + """ + pass + + def has_converged(self, result, last_result): + """ + Determine if the iterative process should stop. + + Args: + result: Current aggregation result + last_result: Previous aggregation result (None for first iteration) + + Returns: + bool: True if converged, False otherwise + """ + pass +``` + +**Key Points:** +- `analysis_results`: List containing results from all analyzer nodes +- `has_converged()`: Controls iterative training (only used when `simple_analysis=False`) +- Access `self.num_iterations` to track iteration count +- Use `self.flame.flame_log()` for logging + +### 3. StarModelTester + +The testing harness that simulates the distributed environment locally. 
+ +```python +StarModelTester( + data_splits, # List of data fragments (one per analyzer node) + analyzer, # Your StarAnalyzer subclass (not an instance) + aggregator, # Your StarAggregator subclass (not an instance) + data_type, # 'fhir' or 's3' + query=None, # Optional: Query string or list for FHIR + simple_analysis=True, # False for iterative training + output_type='str', # 'str', 'bytes', or 'pickle' + analyzer_kwargs=None, # Optional: Additional kwargs for analyzer + aggregator_kwargs=None, # Optional: Additional kwargs for aggregator + epsilon=None, # Optional: For differential privacy + sensitivity=None, # Optional: For differential privacy + result_filepath=None # Optional: Save results to file +) +``` + +## Parameters Explained + +### Required Parameters + +- **`data_splits`** (list): List of data fragments, one for each analyzer node + ```python + data_splits = [node1_data, node2_data, node3_data] + ``` + +- **`analyzer`** (Type[StarAnalyzer]): Your custom analyzer class (not an instance) + ```python + analyzer=MyAnalyzer # NOT MyAnalyzer() + ``` + +- **`aggregator`** (Type[StarAggregator]): Your custom aggregator class (not an instance) + ```python + aggregator=MyAggregator # NOT MyAggregator() + ``` + +- **`data_type`** (Literal['fhir', 's3']): Type of data source + - `'s3'`: Direct data objects + - `'fhir'`: FHIR healthcare data format + +### Optional Parameters + +- **`query`** (str | list[str]): Query for FHIR data sources, does not change anything on your local data (for local testing only a placeholder) + ```python + query="Patient?_count=100" + ``` + +- **`simple_analysis`** (bool, default=True): + - `True`: Single iteration (analyze → aggregate → done) + - `False`: Iterative training until convergence + +- **`output_type`** (Literal['str', 'bytes', 'pickle'], default='str'): + - `'str'`: Convert result to string + - `'bytes'`: Raw bytes output + - `'pickle'`: Serialize with pickle + +- **`analyzer_kwargs`** (dict): Additional keyword arguments 
passed to analyzer constructor + ```python + analyzer_kwargs={'learning_rate': 0.01, 'epochs': 10} + ``` + +- **`aggregator_kwargs`** (dict): Additional keyword arguments passed to aggregator constructor + ```python + aggregator_kwargs={'threshold': 0.001} + ``` + +- **`epsilon`** (float): Privacy budget for differential privacy +- **`sensitivity`** (float): Sensitivity parameter for differential privacy +- **`result_filepath`** (str): Path to save final results + ```python + result_filepath='results/model_output.pkl' + ``` + +## Usage Patterns + +### Pattern 1: Simple Analysis (Single Iteration) + +Use when you only need one round of analysis and aggregation: + +```python +StarModelTester( + data_splits=[data1, data2, data3], + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='s3', + simple_analysis=True # Default +) +``` + +**Flow:** +1. Each analyzer processes its local data +2. Aggregator combines all results +3. Process completes + +### Pattern 2: Iterative Training (Federated Learning) + +Use for iterative algorithms like federated SGD: + +```python +class MyAggregator(StarAggregator): + def has_converged(self, result, last_result): + if last_result is None: + return False + + # Converge when change is small + delta = abs(result - last_result) + return delta < 0.001 or self.num_iterations >= 100 + +StarModelTester( + data_splits=[data1, data2, data3], + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='s3', + simple_analysis=False # Enable iterative mode +) +``` + +**Flow:** +1. Each analyzer processes local data +2. Aggregator combines results +3. Check convergence +4. If not converged, send results back to analyzers +5. 
Repeat from step 1 + +### Pattern 3: With Differential Privacy + +Add privacy guarantees to your federated learning: + +```python +StarModelTester( + data_splits=[data1, data2, data3], + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='s3', + simple_analysis=False, + epsilon=1.0, # Privacy budget + sensitivity=1.0 # Sensitivity of your computation +) +``` + +### Pattern 4: Saving Results + +Save final results to a file: + +```python +StarModelTester( + data_splits=[data1, data2], + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='s3', + output_type='pickle', + result_filepath='results/trained_model.pkl' +) +``` + +## Complete Working Example + +Here's a complete example implementing federated averaging for numeric data: + +```python +from typing import Any, Optional +from flame.star import StarModelTester, StarAnalyzer, StarAggregator + + +class MyAnalyzer(StarAnalyzer): + """Analyzer that computes local statistics and incorporates global feedback.""" + + def __init__(self, flame): + super().__init__(flame) + + def analysis_method(self, data, aggregator_results): + """Compute local average, adjusted by previous global average.""" + self.flame.flame_log( + f"\tAggregator results in MyAnalyzer: {aggregator_results}", + log_type='debug' + ) + + # Compute local average + local_avg = sum(data) / len(data) + + # Adjust with global feedback if available + if aggregator_results is None: + analysis_result = local_avg + else: + # Move towards global average + analysis_result = (local_avg + aggregator_results) / 2 + 0.5 + + self.flame.flame_log( + f"MyAnalysis result ({self.id}): {analysis_result}", + log_type='notice' + ) + return analysis_result + + +class MyAggregator(StarAggregator): + """Aggregator that computes global average and checks convergence.""" + + def __init__(self, flame): + super().__init__(flame) + + def aggregation_method(self, analysis_results: list[Any]) -> Any: + """Compute average of all analyzer results.""" + 
self.flame.flame_log( + f"\tAnalysis results in MyAggregator: {analysis_results}", + log_type='notice' + ) + + result = sum(analysis_results) / len(analysis_results) + + self.flame.flame_log( + f"MyAggregator result ({self.id}): {result}", + log_type='notice' + ) + return result + + def has_converged(self, result: Any, last_result: Optional[Any]) -> bool: + """Check if training should stop.""" + self.flame.flame_log( + f"\tLast result: {last_result}, Current result: {result}", + log_type="notice" + ) + self.flame.flame_log( + f"\tChecking convergence at iteration {self.num_iterations}", + log_type="notice" + ) + + # Stop after 5 iterations for this example + return self.num_iterations >= 5 + + +if __name__ == "__main__": + # Prepare test data (simulating data from different nodes) + data_1 = [1, 2, 3, 4] + data_2 = [5, 6, 7, 8] + data_splits = [data_1, data_2] + + # Run the test + StarModelTester( + data_splits=data_splits, + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='s3', + simple_analysis=False + ) +``` + +## Logging and Debugging + +Use the flame logging system for debugging: + +```python +# In your analyzer or aggregator +self.flame.flame_log("Debug message", log_type='debug') # Detailed info +self.flame.flame_log("Notice message", log_type='notice') # Important info +``` + +### Useful Properties + +Access these properties in your analyzer or aggregator: + +- `self.id`: Node identifier +- `self.num_iterations`: Current iteration count +- `self.latest_result`: Result from previous iteration +- `self.role`: 'default' (analyzer) or 'aggregator' + +## Best Practices + +1. **Start Simple**: Test with `simple_analysis=True` first to verify your logic +2. **Add Logging**: Use `flame_log()` to track data flow and debug issues +3. **Test Data Splits**: Ensure each data split is representative +4. **Convergence Criteria**: Set reasonable convergence conditions to avoid infinite loops +5. **Type Hints**: Use type hints for better code documentation +6. 
**Handle None**: Always check if `aggregator_results` or `last_result` is None + +## Common Pitfalls + +❌ **Don't instantiate classes:** +```python +StarModelTester(..., analyzer=MyAnalyzer(), ...) # Wrong! +``` + +✅ **Pass the class itself:** +```python +StarModelTester(..., analyzer=MyAnalyzer, ...) # Correct! +``` + +❌ **Don't forget to call super().__init__():** +```python +class MyAnalyzer(StarAnalyzer): + def __init__(self, flame): + # Missing super().__init__(flame) + pass +``` + +✅ **Always call parent constructor:** +```python +class MyAnalyzer(StarAnalyzer): + def __init__(self, flame): + super().__init__(flame) # Correct! +``` + +## Next Steps + +After testing locally with `StarModelTester`: + +1. Deploy your analyzer and aggregator to the Flame platform +2. Use `StarModel` instead of `StarModelTester` for production +3. Configure real data sources (FHIR servers, S3 buckets) +4. Set up proper node authentication and security + +## Additional Resources + +- Main STAR model implementation: `flame/star/star_model.py` +- Differential privacy variant: `flame/star/star_localdp/` +- More examples: `examples/` directory + +## Troubleshooting + +**Issue**: Infinite loop in iterative mode +- **Solution**: Check your `has_converged()` implementation. Add a maximum iteration limit: + ```python + return self.num_iterations >= MAX_ITERATIONS + ``` + +**Issue**: Type errors with aggregator_results +- **Solution**: Always handle the None case: + ```python + if aggregator_results is None: + # First iteration logic + else: + # Subsequent iteration logic + ``` + +**Issue**: Data not being passed correctly +- **Solution**: Verify `data_type` matches your data format ('fhir' or 's3') + +--- + +For more information, see the main repository README or contact the Flame development team. 
+ diff --git a/src/guide/user/testing_examples/local-testing-dp-example.md b/src/guide/user/testing_examples/local-testing-dp-example.md new file mode 100644 index 00000000..93c6479d --- /dev/null +++ b/src/guide/user/testing_examples/local-testing-dp-example.md @@ -0,0 +1,452 @@ +# Local Testing in FLAME: Star Pattern with Differential Privacy + +::: info +For an explanation of differential privacy concepts and the Star pattern, please refer to the [Star Pattern Testing Guide](/guide/user/local-testing) and [Differential Privacy Documentation](/guide/user/coding_examples/differential-privacy-mvp). +::: + +## 1. Initializing Local Testing with Differential Privacy + +This example demonstrates how to test federated learning with **Local Differential Privacy** locally before deployment. Unlike regular Star pattern testing, this adds calibrated noise to aggregated results to prevent reverse-engineering of individual node contributions. + +```python +from typing import Any, Optional +from flame.star import StarModelTester, StarAnalyzer, StarAggregator + + +class MyAnalyzer(StarAnalyzer): + def __init__(self, flame): + super().__init__(flame) + + def analysis_method(self, data, aggregator_results): + self.flame.flame_log(f"\tAggregator results in MyAnalyzer: {aggregator_results}", log_type='debug') + analysis_result = sum(data) / len(data) \ + if aggregator_results is None \ + else (sum(data) / len(data) + aggregator_results) + 1 / 2 + self.flame.flame_log(f"MyAnalysis result ({self.id}): {analysis_result}", log_type='notice') + return analysis_result + + +class MyAggregator(StarAggregator): + def __init__(self, flame): + super().__init__(flame) + + def aggregation_method(self, analysis_results: list[Any]) -> Any: + self.flame.flame_log(f"\tAnalysis results in MyAggregator: {analysis_results}", log_type='notice') + result = sum(analysis_results) / len(analysis_results) + self.flame.flame_log(f"MyAggregator result ({self.id}): {result}", log_type='notice') + return 
result + + def has_converged(self, result: Any, last_result: Optional[Any]) -> bool: + self.flame.flame_log(f"\tLast result: {last_result}, Current result: {result}", log_type="notice") + self.flame.flame_log(f"\tChecking convergence at iteration {self.num_iterations}", log_type="notice") + return self.num_iterations >= 5 # Limit to 5 iterations for testing + + +if __name__ == "__main__": + data_1 = [1, 2, 3, 4] + data_2 = [5, 6, 7, 8] + data_splits = [data_1, data_2] + + StarModelTester( + data_splits=data_splits, # List of node-local datasets + analyzer=MyAnalyzer, # Custom Analyzer class + aggregator=MyAggregator, # Custom Aggregator class + data_type='s3', # Data source type + simple_analysis=False, # Enable multi-iteration training + epsilon=1, # Privacy budget + sensitivity=1 # Sensitivity parameter (10**0 = 1) + ) +``` + +### 1.1. Data Preparation: + +```python +data_1 = [1, 2, 3, 4] +data_2 = [5, 6, 7, 8] +data_splits = [data_1, data_2] +``` + +This creates a simple data structure for testing: +- **`data_1`**: Simulates data from Node 1 (e.g., Hospital A) +- **`data_2`**: Simulates data from Node 2 (e.g., Hospital B) +- **`data_splits`**: List where each element represents one node's local data + +**Why this structure?** +- Mimics federated deployments where each node has isolated data +- Simple numeric lists make it easy to verify differential privacy noise +- Each node processes data independently before aggregation + +### 1.2. 
StarModelTester Configuration with Differential Privacy: + +| Parameter | Value | Purpose | +|-----------|-------|---------| +| `data_splits` | `[data_1, data_2]` | Two nodes with local data | +| `analyzer` | `MyAnalyzer` | Local analysis logic at each node | +| `aggregator` | `MyAggregator` | Global aggregation logic | +| `data_type` | `'s3'` | Treats data as direct objects | +| `simple_analysis` | `False` | Enables multi-round federated learning | +| `epsilon` | `1` | Privacy budget; moderate privacy protection | +| `sensitivity` | `1` | Max individual contribution (10**0 = 1) | + +**Key Difference from Regular Star Pattern:** +- **`epsilon`** and **`sensitivity`** parameters enable differential privacy +- Laplace noise is automatically added to aggregated results +- Noise scale = `sensitivity / epsilon = 1 / 1 = 1.0` + +## 2. Understanding the Analysis Logic + +### 2.1. MyAnalyzer Class + +```python +def analysis_method(self, data, aggregator_results): + analysis_result = sum(data) / len(data) \ + if aggregator_results is None \ + else (sum(data) / len(data) + aggregator_results) + 1 / 2 + return analysis_result +``` + +**Iteration 0 (First Round):** +- `aggregator_results` is `None` +- Node 1: `sum([1,2,3,4]) / 4 = 2.5` +- Node 2: `sum([5,6,7,8]) / 4 = 6.5` + +**Subsequent Iterations:** +- Uses feedback from previous aggregation +- Blends local average with global result +- Formula: `(local_avg + global_result) + 0.5` + +**Example for Node 1 in Iteration 1:** +```python +local_avg = 2.5 +global_result = 4.5 (from Iteration 0, with noise) +analysis_result = (2.5 + 4.5) + 0.5 = 7.5 +``` + +### 2.2. 
MyAggregator Class + +```python +def aggregation_method(self, analysis_results): + result = sum(analysis_results) / len(analysis_results) + return result +``` + +**Iteration 0 Example:** +- Receives: `[2.5, 6.5]` from both analyzers +- Computes: `(2.5 + 6.5) / 2 = 4.5` (true result) +- **DP Noise Added**: Result becomes `4.5 + noise` (e.g., `4.523891`) +- Returns noisy result to nodes + +**How Differential Privacy Works:** +1. Aggregator computes true result: `4.5` +2. System samples noise: `Laplace(scale=1.0)` → e.g., `+0.024` +3. Noisy result sent to Hub and back to analyzers: `4.524` + +### 2.3. Convergence Logic + +```python +def has_converged(self, result, last_result): + return self.num_iterations >= 5 +``` + +**Simple iteration-based stopping:** +- Stops after 5 iterations +- More sophisticated approaches could check: + - Change between iterations: `abs(result - last_result) < threshold` + - But must account for DP noise in threshold! + +## 3. Running the Example + +### 3.1. Prerequisites + +1. **Project structure**: + ``` + test/ + └── test_star_pattern_dp.py + ``` + +2. **Dependencies**: + - `flame` package with Star pattern support + - `opendp` library (for Laplace noise generation) + +### 3.2. Running the Test + +```bash +cd test/ +python test_star_pattern_dp.py +``` + +### 3.3. 
Expected Output + +``` +--- Starting Iteration 0 --- +Analyzer node_0 started + Data extracted: [1, 2, 3, 4] + Aggregator results in MyAnalyzer: None +MyAnalysis result (node_0): 2.5 +Analyzer node_1 started + Data extracted: [5, 6, 7, 8] + Aggregator results in MyAnalyzer: None +MyAnalysis result (node_1): 6.5 +Aggregator started + Analysis results in MyAggregator: [2.5, 6.5] +MyAggregator result (node_2): 4.5 + Last result: None, Current result: 4.5 + Checking convergence at iteration 0 +Aggregated results: 4.5 +--- Ending Iteration 0 --- + +--- Starting Iteration 1 --- +Analyzer node_0 started + Data extracted: [1, 2, 3, 4] + Aggregator results in MyAnalyzer: 4.5 +MyAnalysis result (node_0): 7.5 +Analyzer node_1 started + Data extracted: [5, 6, 7, 8] + Aggregator results in MyAnalyzer: 4.5 +MyAnalysis result (node_1): 11.5 +Aggregator started + Analysis results in MyAggregator: [7.5, 11.5] +MyAggregator result (node_2): 9.5 + Last result: 4.5, Current result: 9.5 + Checking convergence at iteration 1 +Aggregated results: 9.5 + Last result: 4.5, Current result: 9.5 + Checking convergence at iteration 1 +--- Ending Iteration 1 --- + +--- Starting Iteration 2 --- +Analyzer node_0 started + Data extracted: [1, 2, 3, 4] + Aggregator results in MyAnalyzer: 9.5 +MyAnalysis result (node_0): 12.5 +Analyzer node_1 started + Data extracted: [5, 6, 7, 8] + Aggregator results in MyAnalyzer: 9.5 +MyAnalysis result (node_1): 16.5 +Aggregator started + Analysis results in MyAggregator: [12.5, 16.5] +MyAggregator result (node_2): 14.5 + Last result: 9.5, Current result: 14.5 + Checking convergence at iteration 2 +Aggregated results: 14.5 + Last result: 9.5, Current result: 14.5 + Checking convergence at iteration 2 +--- Ending Iteration 2 --- + +--- Starting Iteration 3 --- +Analyzer node_0 started + Data extracted: [1, 2, 3, 4] + Aggregator results in MyAnalyzer: 14.5 +MyAnalysis result (node_0): 17.5 +Analyzer node_1 started + Data extracted: [5, 6, 7, 8] + Aggregator results 
in MyAnalyzer: 14.5 +MyAnalysis result (node_1): 21.5 +Aggregator started + Analysis results in MyAggregator: [17.5, 21.5] +MyAggregator result (node_2): 19.5 + Last result: 14.5, Current result: 19.5 + Checking convergence at iteration 3 +Aggregated results: 19.5 + Last result: 14.5, Current result: 19.5 + Checking convergence at iteration 3 +--- Ending Iteration 3 --- + +--- Starting Iteration 4 --- +Analyzer node_0 started + Data extracted: [1, 2, 3, 4] + Aggregator results in MyAnalyzer: 19.5 +MyAnalysis result (node_0): 22.5 +Analyzer node_1 started + Data extracted: [5, 6, 7, 8] + Aggregator results in MyAnalyzer: 19.5 +MyAnalysis result (node_1): 26.5 +Aggregator started + Analysis results in MyAggregator: [22.5, 26.5] +MyAggregator result (node_2): 24.5 + Last result: 19.5, Current result: 24.5 + Checking convergence at iteration 4 +Aggregated results: 24.5 + Last result: 19.5, Current result: 24.5 + Checking convergence at iteration 4 +--- Ending Iteration 4 --- + +--- Starting Iteration 5 --- +Analyzer node_0 started + Data extracted: [1, 2, 3, 4] + Aggregator results in MyAnalyzer: 24.5 +MyAnalysis result (node_0): 27.5 +Analyzer node_1 started + Data extracted: [5, 6, 7, 8] + Aggregator results in MyAnalyzer: 24.5 +MyAnalysis result (node_1): 31.5 +Aggregator started + Analysis results in MyAggregator: [27.5, 31.5] +MyAggregator result (node_2): 29.5 + Last result: 24.5, Current result: 29.5 + Checking convergence at iteration 5 +Aggregated results: 29.5 + Test mode: Would apply local DP with epsilon=1 and sensitivity=1 + Last result: 24.5, Current result: 30.186171397668105 + Checking convergence at iteration 5 +Final result: 30.186171397668105 +--- Ending Iteration 5 --- + +``` + +### 3.4. Analyzing the Output + +**Key Observations:** + +1. **Differential Privacy Noise in Action:** + - True result (Iteration 5): `29.5` + - Noisy result: `30.186171397668105` + - Noise added: `~0.686` from Laplace distribution + +2. 
**Noise added only at the final iteration:** + - Only the final aggregated result has DP noise + - Intermediate results are exact averages + +3. **Privacy Preserved:** + - Cannot determine exact individual node contributions from aggregated results + - Noise makes it infeasible to reverse-engineer original data + +## 4. Best Practices for DP Testing + +### 4.1. Start Without Privacy + +```python +# Step 1: Test basic logic without DP +StarModelTester(..., simple_analysis=False) # No epsilon/sensitivity + +# Step 2: Add weak DP to verify it works +StarModelTester(..., epsilon=10, sensitivity=1) + +# Step 3: Use realistic privacy levels +StarModelTester(..., epsilon=1, sensitivity=1) +``` + +### 4.2. Document Your Privacy Choices + +```python +""" +Privacy Parameters: +- epsilon = 1.0 (moderate privacy) +- sensitivity = 1.0 (assuming max individual contribution of 1) +- Total privacy budget: epsilon * num_iterations = 1.0 * 5 = 5.0 +- Rationale: Balances utility with privacy for testing purposes +""" +``` + +### 4.3. Validate Noise Impact (only during testing) + +Add logging to track noise (you must not do this in production): +```python +def aggregation_method(self, analysis_results): + true_result = sum(analysis_results) / len(analysis_results) + self.flame.flame_log( + f"True result before DP noise: {true_result}", + log_type='notice' + ) + return true_result + # DP noise added automatically after this method returns +``` + +### 4.4. Test Multiple Runs +Since DP adds random noise, run multiple times: +```bash +for i in {1..10}; do + echo "Run $i" + python test_star_pattern_dp.py +done +``` +Collect results to understand noise distribution and variability. + +## 5. 
Common Issues and Solutions + +### Issue 1: Results Too Noisy to Converge +**Problem:** +```python +epsilon=0.1 # Very strong privacy +# Noise is so large that convergence criteria never met +``` + +**Solution:** +```python +# Either increase epsilon (weaker privacy) +epsilon=1.0 + +# Or use iteration-based stopping +def has_converged(self, result, last_result): + return self.num_iterations >= 10 # Don't rely on value changes +``` + +### Issue 2: Privacy Budget Concerns +**Problem:** +Multiple iterations consume privacy budget additively. + +**Solution:** +```python +# Allocate budget per iteration +total_epsilon = 1.0 +num_iterations = 5 +epsilon_per_iteration = total_epsilon / num_iterations # 0.2 + +# Or stop early to preserve budget +def has_converged(self, result, last_result): + privacy_budget_used = self.num_iterations * self.epsilon + return privacy_budget_used >= 1.0 # Stop when budget exhausted +``` + +### Issue 3: Incorrect Sensitivity +**Problem:** +```python +sensitivity=1 # But data actually ranges 0-100 +# Privacy guarantee is compromised! +``` + +**Solution:** +```python +# Calculate proper sensitivity +data_range = max_value - min_value # 100 - 0 = 100 +num_nodes = len(data_splits) # 2 +sensitivity = data_range / num_nodes # 100 / 2 = 50 + +StarModelTester(..., sensitivity=50) +``` + +### Issue 4: Non-Numeric Results +**Problem:** +```python +def aggregation_method(self, analysis_results): + return {"mean": 4.5, "std": 1.2} # Dict not supported! +``` + +**Solution:** +```python +def aggregation_method(self, analysis_results): + return sum(analysis_results) / len(analysis_results) # Single number only +``` + +## 6. Next Steps + +### 6.1. Move to Production + +After local testing succeeds: + +1. 
**Replace `StarModelTester` with `StarLocalDPModel`**: + ```python + from flame.star import StarLocalDPModel + + StarLocalDPModel( + analyzer=MyAnalyzer, + aggregator=MyAggregator, + data_type='fhir', # Real FHIR server + query='Patient?_summary=count', + simple_analysis=False, + epsilon=1.0, + sensitivity=1.0 + ) + ``` +2. **Configure real data sources** (FHIR servers, S3 buckets) +3. **Ensure no logging of true results in production!** diff --git a/src/guide/user/testing_examples/local-testing-logistic-regression-example.md b/src/guide/user/testing_examples/local-testing-logistic-regression-example.md new file mode 100644 index 00000000..4910e023 --- /dev/null +++ b/src/guide/user/testing_examples/local-testing-logistic-regression-example.md @@ -0,0 +1,154 @@ +# Local Testing in FLAME: Pancreatic Cancer Analysis +::: info +For an explanation of this example's main code, please refer to the [federated logistic regression coding example](/guide/user/coding_examples/federated-logistic-regression). +::: + +## 1. Initializing Local Testing + +Other than the coding example, here we simulate node-local data by loading CSV files from local directories instead of S3 buckets. The data structure mimics S3 objects by encoding the CSV content as bytes. + +```python +def main(): + # Load node-local CSV files and encode as bytes + data_1 = [{ + 'pancreasData.csv': + open('node1/pancreasData.csv').read().encode('utf-8') + }] + + data_2 = [{ + 'pancreasData.csv': + open('node2/pancreasData.csv').read().encode('utf-8') + }] + + # Combine node data into federated data splits + data_splits = [data_1, data_2] + + # Run federated training + StarModelTester( + data_splits, # List of node-local datasets + PancreasAnalyzer, # Analyzer class + FederatedLogisticRegression, # Aggregator class + 's3', # Data source type + simple_analysis=False, # Multi-round analysis + output_type='pickle', # Output format + result_filepath="./pancreas.pkl"# Save final model + ) +``` + +### 1.1. 
Data Preparation: + +```python +data_1 = [{ + 'pancreasData.csv': open('node1/pancreasData.csv').read().encode('utf-8') +}] +``` + +This creates a specific data structure: +- **List**: `[...]` - Each node gets one list entry +- **Dictionary**: `{'pancreasData.csv': <bytes>}` - Maps filenames to content +- **Bytes**: `.encode('utf-8')` - Converts string to bytes (simulating S3 objects) + +**Why this structure?** +- Mimics real federated deployments where data comes from S3 buckets +- Each node might have multiple files (hence the dictionary) +- Bytes representation prepares for network transmission + +### 1.2. StarModelTester Configuration: + +| Parameter | Value | Purpose | +|-----------|-------|---------| +| `data_splits` | `[data_1, data_2]` | Two nodes (hospitals) | +| `PancreasAnalyzer` | Class | Local training logic | +| `FederatedLogisticRegression` | Class | Aggregation logic | +| `'s3'` | Data type | Treats data as S3-like objects | +| `simple_analysis=False` | Iterative | Enables multi-round training | +| `output_type='pickle'` | Format | Serializes the final model | +| `result_filepath` | Path | Saves model to disk | + +## 2. Running the Example + +### 2.1. Prerequisites (not mandatory, just for clarity in this example) + +1. **Project structure**: + ``` + test/ + ├── test_flame_pancreas_analysis.py + ├── node1/ + │   └── pancreasData.csv + └── node2/ + └── pancreasData.csv + ``` + +2. **Data format** (pancreasData.csv): + ```csv + feature1,feature2,feature3,...,label + 0.5,1.2,3.4,...,0 + 0.8,1.5,2.9,...,1 + ... + ``` + - Multiple feature columns (patient measurements) + - Last column is binary label (0 = healthy, 1 = disease) + +### 2.2. Running the Test + +```bash +cd test/ +python test_flame_pancreas_analysis.py +``` + +### 2.3. 
Expected Output + +``` +--- Starting Iteration 0 --- +Analyzer node_0 started + Data extracted: [{'pancreasData.csv': b'5S_rRNA,7SK,A1BG,A1BG-AS1,A1CF,A2M,A2M-AS1,A2ML1,A4GALT,A4GNT,AA06,AAAS,AACS +Analyzer node_1 started + Data extracted: [{'pancreasData.csv': b'5S_rRNA,7SK,A1BG,A1BG-AS1,A1CF,A2M,A2M-AS1,A2ML1,A4GALT,A4GNT,AA06,AAAS,AACS +Aggregator started +--- Ending Iteration 0 --- + +--- Starting Iteration 1 --- +Analyzer node_0 started + Data extracted: [{'pancreasData.csv': b'5S_rRNA,7SK,A1BG,A1BG-AS1,A1CF,A2M,A2M-AS1,A2ML1,A4GALT,A4GNT,AA06,AAAS,AACS +Analyzer node_1 started + Data extracted: [{'pancreasData.csv': b'5S_rRNA,7SK,A1BG,A1BG-AS1,A1CF,A2M,A2M-AS1,A2ML1,A4GALT,A4GNT,AA06,AAAS,AACS +Aggregator started +--- Ending Iteration 1 --- + +*** ... (similar output for iterations 2 through 9) ... *** + +--- Starting Iteration 10 --- +Analyzer node_0 started + Data extracted: [{'pancreasData.csv': b'5S_rRNA,7SK,A1BG,A1BG-AS1,A1CF,A2M,A2M-AS1,A2ML1,A4GALT,A4GNT,AA06,AAAS,AACS +Analyzer node_1 started + Data extracted: [{'pancreasData.csv': b'5S_rRNA,7SK,A1BG,A1BG-AS1,A1CF,A2M,A2M-AS1,A2ML1,A4GALT,A4GNT,AA06,AAAS,AACS +Aggregator started +Maximum number of 10 iterations reached. Returning current results. +Maximum number of 10 iterations reached. Returning current results. +Final result written to ./pancreas.pkl +--- Ending Iteration 10 --- + +``` + +## 3. 
Loading the Trained Model + +If you want to load and use the trained model later, you can do so with the following code: + +```python +import pickle + +from sklearn.linear_model import LogisticRegression + +# Load the trained model coefficients +with open('pancreas.pkl', 'rb') as f: + global_coefficients = pickle.load(f) + +print(f"Model coefficients: {global_coefficients}") +print(f"Shape: {global_coefficients.shape}") + +# Use the model for predictions +# (You'd need to reconstruct a LogisticRegression object) +clf = LogisticRegression(fit_intercept=False) +clf.coef_ = global_coefficients +# clf.classes_ = np.array([0, 1]) # Set class labels + +# predictions = clf.predict(new_patient_data) +```
""" # TODO (optional): if the parameter 'simple_analysis' in 'StarModel' is set to False, # this function defines the exit criteria in a multi-iterative analysis (otherwise ignored) #return True # Return True to indicate convergence in this simple analysis - if num_iterations >= 2: + if self.num_iterations >= 2: return True return False diff --git a/src/public/files/vcf_qc.py b/src/public/files/vcf_qc.py index 665d8db8..1e252a8c 100644 --- a/src/public/files/vcf_qc.py +++ b/src/public/files/vcf_qc.py @@ -183,7 +183,7 @@ def aggregation_method(self, analysis_results: List[Dict[str, Any]]) -> str: # return json.dumps(result) - def has_converged(self, result, last_result, num_iterations): # type: ignore[no-untyped-def] + def has_converged(self, result, last_result): # type: ignore[no-untyped-def] return True # Single pass QC