Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
76b1dd1
tests for CoW behavior in pandas
alexfurmenkov Apr 10, 2026
12b2dc8
tested true CoW via shallow copy
alexfurmenkov Apr 13, 2026
7a4479d
added shallow copying for cached datasets
alexfurmenkov Apr 14, 2026
3fe3bb6
dask copy workaround
alexfurmenkov Apr 15, 2026
6676707
fix CoW tests and wrapper
alexfurmenkov Apr 27, 2026
ebaa027
Merge branch 'main' into 691-remove-deepcopy
alexfurmenkov Apr 28, 2026
6940e4b
added tests for cache methods. changed cache access to get() and get_…
alexfurmenkov Apr 28, 2026
d06d875
readme notice about CoW usage
alexfurmenkov Apr 28, 2026
c33ef02
Merge branch 'main' into 691-remove-deepcopy
RamilCDISC Apr 28, 2026
d14e859
fix filter_cache access to cache
alexfurmenkov Apr 29, 2026
5d4072e
Merge remote-tracking branch 'origin/691-remove-deepcopy' into 691-re…
alexfurmenkov Apr 29, 2026
6ff9399
Merge branch 'main' into 691-remove-deepcopy
alexfurmenkov Apr 29, 2026
1ce0d16
Merge branch 'main' into 691-remove-deepcopy
SFJohnson24 May 18, 2026
1b18087
Merge branch 'main' into 691-remove-deepcopy
SFJohnson24 May 27, 2026
92f1e27
edits
SFJohnson24 May 27, 2026
52c4124
Merge branch 'main' into 691-remove-deepcopy
SFJohnson24 May 27, 2026
fdafb93
Merge branch 'main' into 691-remove-deepcopy
SFJohnson24 May 27, 2026
44fe54e
merge main
SFJohnson24 May 28, 2026
ef5daf2
Merge branch '691-remove-deepcopy' of https://github.com/cdisc-org/cd…
SFJohnson24 May 28, 2026
237e5e2
merge main
SFJohnson24 May 28, 2026
b0cda48
merge main
SFJohnson24 May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cdisc_rules_engine/rules_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, Union
from dateutil.parser._parser import ParserError
import traceback
import pandas as pd

from business_rules import export_rule_data
from business_rules.engine import run
Expand Down Expand Up @@ -33,6 +34,7 @@
DataServiceInterface,
)
from cdisc_rules_engine.models.actions import COREActions
from cdisc_rules_engine.models.dataset import DaskDataset
from cdisc_rules_engine.models.dataset.dataset_interface import DatasetInterface
from cdisc_rules_engine.models.dataset_variable import DatasetVariable
from cdisc_rules_engine.models.failed_validation_entity import FailedValidationEntity
Expand All @@ -59,6 +61,8 @@
from cdisc_rules_engine.models.sdtm_dataset_metadata import SDTMDatasetMetadata
from cdisc_rules_engine.enums.sensitivity import Sensitivity

pd.options.mode.copy_on_write = True

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This turns on pandas CoW process-wide, so anyone integrating the rules engine into their workflow will have it enabled. We should document this in the repository README.



class RulesEngine:
def __init__(
Expand Down Expand Up @@ -375,9 +379,9 @@ def execute_rule(
rule["conditions"], dataset.columns.to_list()
)
rule_copy["conditions"].set_conditions(updated_conditions)
# Adding copy for now to avoid updating cached dataset
dataset = deepcopy(dataset)
# preprocess dataset
if isinstance(dataset, DaskDataset):
dataset = deepcopy(dataset)
dataset_preprocessor = DatasetPreprocessor(
dataset, dataset_metadata, self.data_service, self.cache
)
Expand Down
34 changes: 22 additions & 12 deletions cdisc_rules_engine/services/cache/in_memory_cache_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from cdisc_rules_engine.interfaces import (
CacheServiceInterface,
)
from cdisc_rules_engine.models.dataset import DatasetInterface
from cdisc_rules_engine.models.dataset import DatasetInterface, PandasDataset
from cachetools import LRUCache
import psutil
from multiprocessing import Lock
Expand Down Expand Up @@ -62,11 +62,16 @@ def add(self, cache_key, data):
)

def add_dataset(self, cache_key, data):
if get_data_size(data) > self.max_dataset_cache_size:
return
with self.dataset_cache_lock:
self.dataset_cache[cache_key] = data

def get_dataset(self, cache_key):
return self.dataset_cache.get(cache_key, None)
cached = self.dataset_cache.get(cache_key)
if type(cached) is PandasDataset:
return PandasDataset(cached.data.copy(deep=False))
return cached

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the same cached wrapper is returned. Pandas CoW only protects separate pandas objects that share the same underlying data. Here, the change to cached.data mutates the wrapper, but in the end the same object is returned.


def add_batch(
self,
Expand All @@ -82,27 +87,32 @@ def add_batch(
self.add(prefix + cache_key, item)

def get(self, cache_key):
return self.cache.get(cache_key, None)
cached = self.cache.get(cache_key)
if type(cached) is PandasDataset:
return PandasDataset(cached.data.copy(deep=False))
return cached

def get_all(self, cache_keys: List[str]):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other getters, such as get_all and get_all_by_prefix, still return the cached objects directly, so callers can mutate the cached dataset. We need to cover these too, as was done for get().

return [self.cache.get(key) for key in cache_keys]
return [self.get(key) for key in cache_keys]

def get_all_by_prefix(self, prefix):
items = []
for key in self.cache:
if key.startswith(prefix):
items.append(self.cache[key])
return items
with self.cache_lock:
keys = [key for key in self.cache.keys() if key.startswith(prefix)]
return [self.get(key) for key in keys]

def dataset_keys(self):
return self.dataset_cache.keys()

def filter_cache(self, prefix: str) -> dict:
return {k: self.cache[k] for k in self.cache.keys() if k.startswith(prefix)}
with self.cache_lock:
keys = [k for k in self.cache.keys() if k.startswith(prefix)]
return {k: self.get(k) for k in keys}

def get_by_regex(self, regex: str) -> dict:
regex = regex.replace("*", ".*")
return {k: self.cache[k] for k in self.cache.keys() if re.search(regex, k)}
with self.cache_lock:
keys = [k for k in self.cache.keys() if re.search(regex, k)]
return {k: self.get(k) for k in keys}

def exists(self, cache_key):
return cache_key in self.cache
Expand All @@ -119,7 +129,7 @@ def clear_all(self, prefix: str = None):
for key in keys_to_remove:
self.clear(key)
else:
self.cache = LRUCache(maxsize=self.max_size, getsizeof=asizeof.asizeof)
self.cache = LRUCache(maxsize=self.max_size, getsizeof=cust_asizeof)

def add_all(self, data: dict):
for key, val in data.items():
Expand Down
3 changes: 3 additions & 0 deletions docs/cli-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

Run conformance validation against a CDISC standard.

`validate` runs with pandas Copy-on-Write (CoW) enabled globally when using the rules engine.
**Note**: In pandas 2.x, CoW is an opt-in feature; in pandas 3.x, it is enabled by default.

```bash
python core.py validate --help
```
Expand Down
224 changes: 224 additions & 0 deletions tests/unit/test_services/test_cache/test_immutable_cache.py

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only checks the local CacheService created in this test, not the production InMemoryCacheService or PandasDataset, so the changes will not be tested properly.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a little more testing that includes operations that affect the length of the dataset, for example adding or dropping rows? The engine performs these operations on the dataset, so confirming that the cache is unaffected would be helpful.

Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import numpy as np
import pandas as pd
import pytest

from cdisc_rules_engine.models.dataset.pandas_dataset import PandasDataset
from cdisc_rules_engine.services.cache.in_memory_cache_service import (
InMemoryCacheService,
)


@pytest.fixture(autouse=True)
def reset_singleton():
    """Clear ``InMemoryCacheService._instance`` before and after every test.

    The fixture is autouse, so each test in this module starts and ends with
    the class-level ``_instance`` attribute set to ``None``.
    """

    def _forget_instance():
        InMemoryCacheService._instance = None

    _forget_instance()
    yield
    _forget_instance()


@pytest.fixture
def cache():
    """Provide an ``InMemoryCacheService`` constructed with default arguments."""
    service = InMemoryCacheService()
    return service


@pytest.fixture
def sample_dataset():
    """Provide a three-row ``PandasDataset`` with integer columns ``A`` and ``B``."""
    frame = pd.DataFrame({"A": [1, 2, 3], "B": [10, 20, 30]})
    return PandasDataset(frame)


class TestGet:
    """Checks that objects returned by ``cache.get`` are isolated from the cache."""

    @pytest.fixture(autouse=True)
    def _copy_on_write(self):
        """Run every test in this class with pandas Copy-on-Write enabled.

        ``pd.option_context`` restores the previous option value on exit, so
        the process-wide pandas setting does not leak into other tests and
        the outcome does not depend on test execution order.
        """
        with pd.option_context("mode.copy_on_write", True):
            yield

    def test_returns_new_wrapper_not_cached_object(self, cache, sample_dataset):
        """``get`` returns a different wrapper and a different DataFrame object."""
        cache.add("x", sample_dataset)
        result = cache.get("x")
        assert result is not cache.cache["x"]
        assert result.data is not cache.cache["x"].data

    def test_cow_does_not_modify_cache_on_write(self, cache, sample_dataset):
        """Writing a cell on the retrieved frame leaves the cached frame intact."""
        cache.add("x", sample_dataset)
        retrieved = cache.get("x")
        retrieved.data.loc[0, "A"] = 999
        assert cache.cache["x"].data.loc[0, "A"] == 1

    def test_shares_memory_before_write(self, cache, sample_dataset):
        """Before any write, retrieved and cached columns share one buffer."""
        cache.add("x", sample_dataset)
        retrieved = cache.get("x")
        assert np.shares_memory(retrieved.data["A"], cache.cache["x"].data["A"])

    def test_add_rows_does_not_affect_cache(self, cache, sample_dataset):
        """Appending a row to the retrieved dataset does not grow the cached one."""
        cache.add("x", sample_dataset)
        retrieved = cache.get("x")
        retrieved.data = pd.concat(
            [retrieved.data, pd.DataFrame({"A": [999], "B": [999]})],
            ignore_index=True,
        )
        assert len(cache.cache["x"].data) == 3
        assert len(retrieved.data) == 4

    def test_drop_rows_does_not_affect_cache(self, cache, sample_dataset):
        """Dropping a row from the retrieved dataset does not shrink the cached one."""
        cache.add("x", sample_dataset)
        retrieved = cache.get("x")
        retrieved.data = retrieved.data.drop(index=0).reset_index(drop=True)
        assert len(cache.cache["x"].data) == 3
        assert len(retrieved.data) == 2

    def test_filter_rows_does_not_affect_cache(self, cache, sample_dataset):
        """Filtering the retrieved dataset leaves the cached rows unchanged."""
        cache.add("x", sample_dataset)
        retrieved = cache.get("x")
        keep = retrieved.data["A"] > 1
        retrieved.data = retrieved.data[keep].reset_index(drop=True)
        assert len(cache.cache["x"].data) == 3
        assert cache.cache["x"].data["A"].tolist() == [1, 2, 3]

    def test_multiple_gets_are_independent(self, cache, sample_dataset):
        """Two ``get`` calls yield datasets that do not affect each other."""
        cache.add("x", sample_dataset)
        first = cache.get("x")
        second = cache.get("x")
        first.data = first.data.drop(index=0).reset_index(drop=True)
        assert len(second.data) == 3
        assert len(cache.cache["x"].data) == 3

    def test_non_dataset_returns_as_is(self, cache):
        """Values that are not datasets come back equal to what was stored."""
        cache.add("key", {"some": "dict"})
        assert cache.get("key") == {"some": "dict"}

    def test_object_dtype_nested_mutation_affects_cache(self, cache):
        """CoW can't protect in nested mutations"""
        df = pd.DataFrame({"A": [[1], [2], [3]]})
        cache.add("x", PandasDataset(df))
        retrieved = cache.get("x")
        # The list stored in the cell is mutated in place, so the cached frame,
        # which holds the same list object, observes the change.
        retrieved.data.loc[0, "A"].append(999)
        assert cache.cache["x"].data.loc[0, "A"] == [1, 999]


class TestGetDataset:
    """Checks that objects returned by ``cache.get_dataset`` are isolated."""

    @pytest.fixture(autouse=True)
    def _copy_on_write(self):
        """Run every test in this class with pandas Copy-on-Write enabled.

        ``pd.option_context`` restores the previous option value on exit, so
        the process-wide pandas setting does not leak into other tests.
        """
        with pd.option_context("mode.copy_on_write", True):
            yield

    def test_returns_new_wrapper_not_cached_object(self, cache, sample_dataset):
        """``get_dataset`` returns a new wrapper around a new DataFrame object."""
        cache.add_dataset("x", sample_dataset)
        result = cache.get_dataset("x")
        assert result is not cache.dataset_cache["x"]
        assert result.data is not cache.dataset_cache["x"].data

    def test_cow_does_not_modify_cache_on_write(self, cache, sample_dataset):
        """Writing a cell on the retrieved frame leaves the cached frame intact."""
        cache.add_dataset("x", sample_dataset)
        retrieved = cache.get_dataset("x")
        retrieved.data.loc[0, "A"] = 999
        assert cache.dataset_cache["x"].data.loc[0, "A"] == 1

    def test_add_rows_does_not_affect_cache(self, cache, sample_dataset):
        """Appending a row to the retrieved dataset does not grow the cached one."""
        cache.add_dataset("x", sample_dataset)
        retrieved = cache.get_dataset("x")
        retrieved.data = pd.concat(
            [retrieved.data, pd.DataFrame({"A": [999], "B": [999]})],
            ignore_index=True,
        )
        assert len(cache.dataset_cache["x"].data) == 3
        assert len(retrieved.data) == 4

    def test_drop_rows_does_not_affect_cache(self, cache, sample_dataset):
        """Dropping a row from the retrieved dataset does not shrink the cached one."""
        cache.add_dataset("x", sample_dataset)
        retrieved = cache.get_dataset("x")
        retrieved.data = retrieved.data.drop(index=0).reset_index(drop=True)
        assert len(cache.dataset_cache["x"].data) == 3
        assert len(retrieved.data) == 2


class TestGetAll:
    """Checks that ``cache.get_all`` returns isolated objects for each key."""

    @pytest.fixture(autouse=True)
    def _copy_on_write(self):
        """Run every test in this class with pandas Copy-on-Write enabled.

        ``pd.option_context`` restores the previous option value on exit, so
        the process-wide pandas setting does not leak into other tests.
        """
        with pd.option_context("mode.copy_on_write", True):
            yield

    def test_returns_new_wrappers(self, cache, sample_dataset):
        """Each result is a new wrapper and frame, distinct from its cache entry."""
        keys = ["x", "y"]
        for key in keys:
            cache.add(key, sample_dataset)
        results = cache.get_all(keys)
        assert len(results) == len(keys)
        # Compare every result with the entry stored under its own key rather
        # than checking all of them against a single key.
        for key, result in zip(keys, results):
            assert result is not cache.cache[key]
            assert result.data is not cache.cache[key].data

    def test_results_are_independent(self, cache, sample_dataset):
        """Reassigning one result's data does not change the other or the cache."""
        cache.add("x", sample_dataset)
        cache.add("y", sample_dataset)
        first, second = cache.get_all(["x", "y"])
        first.data = first.data.drop(index=0).reset_index(drop=True)
        assert len(second.data) == 3
        assert len(cache.cache["x"].data) == 3

    def test_cow_does_not_modify_cache_on_write(self, cache, sample_dataset):
        """Writing a cell on a returned frame leaves the cached frame intact."""
        cache.add("x", sample_dataset)
        results = cache.get_all(["x"])
        results[0].data.loc[0, "A"] = 999
        assert cache.cache["x"].data.loc[0, "A"] == 1

    def test_missing_key_returns_none(self, cache):
        """A key that was never added yields ``None`` in the result list."""
        assert cache.get_all(["missing"]) == [None]


class TestGetAllByPrefix:
    """Checks prefix filtering and isolation for ``cache.get_all_by_prefix``."""

    @pytest.fixture(autouse=True)
    def _copy_on_write(self):
        """Run every test in this class with pandas Copy-on-Write enabled.

        ``pd.option_context`` restores the previous option value on exit, so
        the process-wide pandas setting does not leak into other tests.
        """
        with pd.option_context("mode.copy_on_write", True):
            yield

    def test_returns_only_matching_keys(self, cache, sample_dataset):
        """Only entries whose key starts with the prefix are returned."""
        cache.add("ds/ae", sample_dataset)
        cache.add("ds/lb", sample_dataset)
        cache.add("other/ae", sample_dataset)
        results = cache.get_all_by_prefix("ds/")
        assert len(results) == 2

    def test_returns_new_wrappers(self, cache, sample_dataset):
        """The returned item is a new wrapper around a new DataFrame object."""
        cache.add("ds/ae", sample_dataset)
        results = cache.get_all_by_prefix("ds/")
        assert results[0] is not cache.cache["ds/ae"]
        assert results[0].data is not cache.cache["ds/ae"].data

    def test_cow_does_not_modify_cache_on_write(self, cache, sample_dataset):
        """Writing a cell on a returned frame leaves the cached frame intact."""
        cache.add("ds/ae", sample_dataset)
        results = cache.get_all_by_prefix("ds/")
        results[0].data.loc[0, "A"] = 999
        assert cache.cache["ds/ae"].data.loc[0, "A"] == 1

    def test_drop_rows_does_not_affect_cache(self, cache, sample_dataset):
        """Dropping a row from a returned dataset does not shrink the cached one."""
        cache.add("ds/ae", sample_dataset)
        results = cache.get_all_by_prefix("ds/")
        results[0].data = results[0].data.drop(index=0).reset_index(drop=True)
        assert len(cache.cache["ds/ae"].data) == 3

    def test_no_match_returns_empty(self, cache, sample_dataset):
        """A prefix that matches no key yields an empty list."""
        cache.add("ds/ae", sample_dataset)
        assert cache.get_all_by_prefix("other/") == []


class TestGetByRegex:
    """Checks key matching and isolation for ``cache.get_by_regex``."""

    @pytest.fixture(autouse=True)
    def _copy_on_write(self):
        """Run every test in this class with pandas Copy-on-Write enabled.

        ``pd.option_context`` restores the previous option value on exit, so
        the process-wide pandas setting does not leak into other tests.
        """
        with pd.option_context("mode.copy_on_write", True):
            yield

    def test_returns_matching_keys(self, cache, sample_dataset):
        """The result is keyed by exactly the cache keys the pattern matches."""
        cache.add("ae_data", sample_dataset)
        cache.add("lb_data", sample_dataset)
        cache.add("ae_meta", sample_dataset)
        result = cache.get_by_regex("ae_*")
        assert set(result.keys()) == {"ae_data", "ae_meta"}

    def test_returns_new_wrappers(self, cache, sample_dataset):
        """The returned value is a new wrapper around a new DataFrame object."""
        cache.add("ae_data", sample_dataset)
        result = cache.get_by_regex("ae_*")
        assert result["ae_data"] is not cache.cache["ae_data"]
        assert result["ae_data"].data is not cache.cache["ae_data"].data

    def test_cow_does_not_modify_cache_on_write(self, cache, sample_dataset):
        """Writing a cell on a returned frame leaves the cached frame intact."""
        cache.add("ae_data", sample_dataset)
        result = cache.get_by_regex("ae_*")
        result["ae_data"].data.loc[0, "A"] = 999
        assert cache.cache["ae_data"].data.loc[0, "A"] == 1

    def test_drop_rows_does_not_affect_cache(self, cache, sample_dataset):
        """Dropping a row from a returned dataset does not shrink the cached one."""
        cache.add("ae_data", sample_dataset)
        result = cache.get_by_regex("ae_*")
        matched = result["ae_data"]
        matched.data = matched.data.drop(index=0).reset_index(drop=True)
        assert len(cache.cache["ae_data"].data) == 3

    def test_no_match_returns_empty_dict(self, cache, sample_dataset):
        """A pattern that matches no key yields an empty dict."""
        cache.add("ae_data", sample_dataset)
        assert cache.get_by_regex("lb_*") == {}
Loading