From 832126925a8412c6534b326b8b835b8777cc3bbb Mon Sep 17 00:00:00 2001 From: ablack3 Date: Tue, 31 Mar 2026 01:08:47 +0000 Subject: [PATCH 1/2] update attrition recording. preserve person id. remove inequality full join in favor of inner join followed by filter. --- R/CDMInterface.R | 55 ++++++++++++++++++++--------------- R/TreatmentPatterns-package.R | 1 - R/computePathways.R | 4 +-- R/constructPathways.R | 50 +++++++++++++++++++------------ R/exportPatientLevel.R | 8 ++--- 5 files changed, 68 insertions(+), 50 deletions(-) diff --git a/R/CDMInterface.R b/R/CDMInterface.R index d2b70de8..92f299c3 100644 --- a/R/CDMInterface.R +++ b/R/CDMInterface.R @@ -60,17 +60,13 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda for (tableName in cohortTableName) { cdm$tp_temp_tbl <- cdm[[tableName]] %>% - dplyr::group_by(.data$subject_id) %>% + dplyr::filter(.data$cohort_definition_id %in% cohortIds) %>% dplyr::mutate( - subject_id_origin = .data$subject_id + subject_id = as.character(.data$subject_id) ) %>% - dplyr::filter(.data$cohort_definition_id %in% cohortIds) %>% - dplyr::filter(!!CDMConnector::datediff("cohort_start_date", "cohort_end_date", interval = "day") >= minEraDuration) %>% - dplyr::group_by(.data$subject_id) %>% - dplyr::ungroup() %>% dplyr::inner_join( - cdm$person, - by = dplyr::join_by(subject_id_origin == person_id) + cdm$person %>% dplyr::mutate(person_id = as.character(.data$person_id)), + by = dplyr::join_by(subject_id == person_id) ) %>% dplyr::inner_join( cdm$concept, @@ -79,9 +75,6 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda date_of_birth = as.Date(paste0(as.character(.data$year_of_birth), "-01-01"))) %>% dplyr::mutate( age = !!CDMConnector::datediff("date_of_birth", "cohort_start_date", interval = "year")) %>% - dplyr::mutate( - subject_id_origin = as.character(.data$subject_id_origin) - ) %>% dplyr::rename(sex = "concept_name") %>% dplyr::mutate( temp_date = as.Date("1970-01-01") @@ -93,7 +86,6 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda dplyr::select( "cohort_definition_id", "subject_id", - "subject_id_origin", "cohort_start_date", "cohort_end_date", "age", @@ -113,32 +105,49 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda cdm <- CDMConnector::dropSourceTable(cdm = cdm, name = "tp_temp_tbl") - andromeda[[andromedaTableName]] <- andromeda[[andromedaTableName]] %>% - dplyr::mutate(r = dplyr::row_number()) %>% - dplyr::group_by(.data$subject_id_origin) %>% - dplyr::mutate( - subject_id = as.integer(min(.data$r, na.rm = TRUE)) - ) %>% - dplyr::select(-"r") - + # Filter to persons with at least one target cohort record targetId <- as.numeric(targetCohortIds) - + andromeda[[andromedaTableName]] <- andromeda[[andromedaTableName]] %>% dplyr::mutate(cohort_definition_id = as.numeric(.data$cohort_definition_id)) %>% dplyr::group_by(.data$subject_id) %>% dplyr::filter(any(.data$cohort_definition_id %in% targetId, na.rm = TRUE)) %>% dplyr::ungroup() - + + # Attrition: after target cohort filter n <- andromeda[[andromedaTableName]] %>% dplyr::group_by(.data$subject_id) %>% dplyr::summarise(n = dplyr::n()) %>% dplyr::pull() - + appendAttrition( toAdd = data.frame( number_records = sum(n), number_subjects = length(n), reason_id = 2, + reason = "Persons with no record in target cohort", + time_stamp = as.numeric(Sys.time()) + ), + andromeda = andromeda + ) + + # Filter by minEraDuration (dates are integers = days since epoch) + andromeda[[andromedaTableName]] <- andromeda[[andromedaTableName]] %>% + dplyr::filter( + (.data$cohort_end_date - .data$cohort_start_date) >= minEraDuration + ) + + # Attrition: after minEraDuration filter + n <- andromeda[[andromedaTableName]] %>% + dplyr::group_by(.data$subject_id) %>% + dplyr::summarise(n = dplyr::n()) %>% + dplyr::pull() + + appendAttrition( + toAdd = data.frame( + number_records = sum(n), + number_subjects = length(n), + reason_id = 3, reason = sprintf("Removing records < minEraDuration (%s)", minEraDuration), time_stamp = as.numeric(Sys.time()) ), diff --git a/R/TreatmentPatterns-package.R b/R/TreatmentPatterns-package.R index 429a0709..427ebf5d 100644 --- a/R/TreatmentPatterns-package.R +++ b/R/TreatmentPatterns-package.R @@ -49,7 +49,6 @@ utils::globalVariables( "duration_max", "duration_median", "event_name", - "subject_id_origin", "person_id", "concept_id", "year_of_birth", diff --git a/R/computePathways.R b/R/computePathways.R index da96cf4b..35635731 100644 --- a/R/computePathways.R +++ b/R/computePathways.R @@ -207,7 +207,7 @@ computePathways <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 9, + reason_id = 11, reason = sprintf("treatment construction done") ), andromeda = andromeda @@ -449,7 +449,7 @@ checkCohortTable <- function(andromeda) { assertions <- checkmate::makeAssertCollection() checkmate::assertIntegerish(cohortTableHead$cohort_definition_id, add = assertions) - checkmate::assertIntegerish(cohortTableHead$subject_id, add = assertions) + checkmate::assertCharacter(cohortTableHead$subject_id, add = assertions) checkmate::assertIntegerish(cohortTableHead$cohort_start_date, add = assertions) checkmate::assertIntegerish(cohortTableHead$cohort_end_date, add = assertions) checkmate::reportAssertions(assertions) diff --git a/R/constructPathways.R b/R/constructPathways.R index 2bd6b12b..987e95c8 100755 --- a/R/constructPathways.R +++ b/R/constructPathways.R @@ -123,7 +123,7 @@ constructPathways <- function(settings, andromeda) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 10, + reason_id = 12, reason = sprintf("Max path length (%s)", settings$maxPathLength) ), andromeda = andromeda @@ -252,17 +252,34 @@ createTreatmentHistory <- function( Andromeda::createIndex(andromeda$eventCohorts, c("personId", "startDate", "endDate")) Andromeda::createIndex(andromeda$targetCohorts, c("personId", "indexDate", "endDate")) - andromeda[[sprintf("cohortTable_%s", targetCohortId)]] <- dplyr::full_join( + # Step 1: Equi-join on person to match events to targets + andromeda$treatmentHistoryJoined <- dplyr::inner_join( x = andromeda$eventCohorts, y = andromeda$targetCohorts, by = dplyr::join_by( - personId == personId, - subject_id_origin == subject_id_origin, - y$indexDate <= x$startDate, - x$startDate <= y$endDate, + personId == personId ), suffix = c("Event", "Target") ) + # Attrition: persons with no event records (may appear as 'None' paths if nonePaths = TRUE) + attrCounts <- fetchAttritionCounts(andromeda, "treatmentHistoryJoined") + appendAttrition( + toAdd = data.frame( + number_records = attrCounts$nRecords, + number_subjects = attrCounts$nSubjects, + reason_id = 4, + reason = "Subjects with no event records (may appear as 'None' paths)" + ), + andromeda = andromeda + ) + + # Step 2: Filter to events within the time window + andromeda[[sprintf("cohortTable_%s", targetCohortId)]] <- andromeda$treatmentHistoryJoined %>% + dplyr::filter( + .data$indexDate <= .data$startDateEvent, + .data$startDateEvent <= .data$endDateTarget + ) + andromeda$treatmentHistory <- andromeda[[sprintf("cohortTable_%s", targetCohortId)]] %>% dplyr::select( "personId", @@ -279,18 +296,15 @@ createTreatmentHistory <- function( dplyr::mutate( durationEra = .data$eventEndDate - .data$eventStartDate, eventCohortId = as.character(as.integer(.data$eventCohortId)) - ) %>% - dplyr::filter( - !is.na(.data$indexYear), - !is.na(.data$eventCohortId) ) + # Attrition: events outside window attrCounts <- fetchAttritionCounts(andromeda, "treatmentHistory") appendAttrition( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 3, + reason_id = 5, reason = sprintf( "Removing events outside window (%s: %s | %s: %s)", startAnchor, windowStart, endAnchor, windowEnd @@ -363,7 +377,7 @@ doSplitEventCohorts <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 4, + reason_id = 6, reason = sprintf("splitEventCohorts") ), andromeda = andromeda @@ -439,7 +453,7 @@ doEraCollapseNew <- function(andromeda, eraCollapseSize) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 5, + reason_id = 7, reason = sprintf("Iteration %s: Collapsing eras, eraCollapse (%s)", counter, eraCollapseSize) ), andromeda = andromeda @@ -453,7 +467,7 @@ doEraCollapseNew <- function(andromeda, eraCollapseSize) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 5, + reason_id = 7, reason = sprintf("No eras needed Collapsing, eraCollapse (%s)", eraCollapseSize) ), andromeda = andromeda @@ -525,7 +539,7 @@ doEraCollapse <- function(andromeda, eraCollapseSize) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 5, + reason_id = 7, reason = sprintf("Collapsing eras, eraCollapse (%s)", eraCollapseSize) ), andromeda = andromeda @@ -806,7 +820,7 @@ doCombinationWindow <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 6, + reason_id = 8, reason = sprintf("Iteration %s: minPostCombinationDuration (%s), combinatinoWindow (%s)", iterations, minPostCombinationDuration, combinationWindow) ), andromeda = andromeda @@ -829,7 +843,7 @@ doCombinationWindow <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 7, + reason_id = 9, reason = sprintf("After Combination") ), andromeda = andromeda @@ -996,7 +1010,7 @@ doFilterTreatments <- function(andromeda, filterTreatments) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 8, + reason_id = 10, reason = sprintf("filterTreatments (%s)", filterTreatments) ), andromeda = andromeda diff --git a/R/exportPatientLevel.R b/R/exportPatientLevel.R index d3d721f0..85c0cbaf 100644 --- a/R/exportPatientLevel.R +++ b/R/exportPatientLevel.R @@ -1,10 +1,6 @@ addOriginId <- function(table) { - andromeda <- table$src$con - - andromeda$cohortTable %>% - dplyr::select("personId", "subject_id_origin") %>% - dplyr::inner_join(table, by = dplyr::join_by(personId == personId)) %>% - dplyr::distinct() + # personId already contains the original subject_id (as character) + table } writeTreatmentHistory <- function(andromeda, outputPath) { From 2c48424c53a8e11038c782353c941c9710da1272 Mon Sep 17 00:00:00 2001 From: ablack3 Date: Wed, 1 Apr 2026 02:32:11 +0000 Subject: [PATCH 2/2] address pr comments --- R/CDMInterface.R | 33 ++++++++---------- R/computePathways.R | 2 +- R/constructPathways.R | 44 +++++++++--------------- R/exportPatientLevel.R | 2 +- tests/testthat/test-exportPatientLevel.R | 4 +-- 5 files changed, 35 insertions(+), 50 deletions(-) diff --git a/R/CDMInterface.R b/R/CDMInterface.R index 92f299c3..d67d9bd7 100644 --- a/R/CDMInterface.R +++ b/R/CDMInterface.R @@ -61,11 +61,8 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda for (tableName in cohortTableName) { cdm$tp_temp_tbl <- cdm[[tableName]] %>% dplyr::filter(.data$cohort_definition_id %in% cohortIds) %>% - dplyr::mutate( - subject_id = as.character(.data$subject_id) - ) %>% dplyr::inner_join( - cdm$person %>% dplyr::mutate(person_id = as.character(.data$person_id)), + cdm$person, by = dplyr::join_by(subject_id == person_id) ) %>% dplyr::inner_join( @@ -105,16 +102,14 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda cdm <- CDMConnector::dropSourceTable(cdm = cdm, name = "tp_temp_tbl") - # Filter to persons with at least one target cohort record - targetId <- as.numeric(targetCohortIds) - + # Filter by minEraDuration first (dates are integers = days since epoch) andromeda[[andromedaTableName]] <- andromeda[[andromedaTableName]] %>% dplyr::mutate(cohort_definition_id = as.numeric(.data$cohort_definition_id)) %>% - dplyr::group_by(.data$subject_id) %>% - dplyr::filter(any(.data$cohort_definition_id %in% targetId, na.rm = TRUE)) %>% - dplyr::ungroup() + dplyr::filter( + (.data$cohort_end_date - .data$cohort_start_date) >= minEraDuration + ) - # Attrition: after target cohort filter + # Attrition: after minEraDuration filter n <- andromeda[[andromedaTableName]] %>% dplyr::group_by(.data$subject_id) %>% dplyr::summarise(n = dplyr::n()) %>% @@ -125,19 +120,21 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda number_records = sum(n), number_subjects = length(n), reason_id = 2, - reason = "Persons with no record in target cohort", + reason = sprintf("Removing records < minEraDuration (%s)", minEraDuration), time_stamp = as.numeric(Sys.time()) ), andromeda = andromeda ) - # Filter by minEraDuration (dates are integers = days since epoch) + # Filter to persons with at least one target cohort record + targetId <- as.numeric(targetCohortIds) + andromeda[[andromedaTableName]] <- andromeda[[andromedaTableName]] %>% - dplyr::filter( - (.data$cohort_end_date - .data$cohort_start_date) >= minEraDuration - ) + dplyr::group_by(.data$subject_id) %>% + dplyr::filter(any(.data$cohort_definition_id %in% targetId, na.rm = TRUE)) %>% + dplyr::ungroup() - # Attrition: after minEraDuration filter + # Attrition: after target cohort filter n <- andromeda[[andromedaTableName]] %>% dplyr::group_by(.data$subject_id) %>% dplyr::summarise(n = dplyr::n()) %>% @@ -148,7 +145,7 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda number_records = sum(n), number_subjects = length(n), reason_id = 3, - reason = sprintf("Removing records < minEraDuration (%s)", minEraDuration), + reason = "After filtering to persons with >=1 target cohort record", time_stamp = as.numeric(Sys.time()) ), andromeda = andromeda diff --git a/R/computePathways.R b/R/computePathways.R index 35635731..52604e50 100644 --- a/R/computePathways.R +++ b/R/computePathways.R @@ -449,7 +449,7 @@ checkCohortTable <- function(andromeda) { assertions <- checkmate::makeAssertCollection() checkmate::assertIntegerish(cohortTableHead$cohort_definition_id, add = assertions) - checkmate::assertCharacter(cohortTableHead$subject_id, add = assertions) + checkmate::assertIntegerish(cohortTableHead$subject_id, add = assertions) checkmate::assertIntegerish(cohortTableHead$cohort_start_date, add = assertions) checkmate::assertIntegerish(cohortTableHead$cohort_end_date, add = assertions) checkmate::reportAssertions(assertions) diff --git a/R/constructPathways.R b/R/constructPathways.R index 987e95c8..009142fe 100755 --- a/R/constructPathways.R +++ b/R/constructPathways.R @@ -252,34 +252,37 @@ createTreatmentHistory <- function( Andromeda::createIndex(andromeda$eventCohorts, c("personId", "startDate", "endDate")) Andromeda::createIndex(andromeda$targetCohorts, c("personId", "indexDate", "endDate")) - # Step 1: Equi-join on person to match events to targets - andromeda$treatmentHistoryJoined <- dplyr::inner_join( + # Join events to targets with date constraints in the join to avoid cartesian products + andromeda[[sprintf("cohortTable_%s", targetCohortId)]] <- dplyr::inner_join( x = andromeda$eventCohorts, y = andromeda$targetCohorts, by = dplyr::join_by( - personId == personId + personId == personId, + startDate >= indexDate, + startDate <= endDate ), suffix = c("Event", "Target") ) - # Attrition: persons with no event records (may appear as 'None' paths if nonePaths = TRUE) - attrCounts <- fetchAttritionCounts(andromeda, "treatmentHistoryJoined") + # Attrition: subjects with events within window + nTargetSubjects <- andromeda$targetCohorts %>% + dplyr::distinct(.data$personId) %>% + dplyr::summarise(n = dplyr::n()) %>% + dplyr::pull() + + attrCounts <- fetchAttritionCounts(andromeda, sprintf("cohortTable_%s", targetCohortId)) + nSubjectsWithEvents <- attrCounts$nSubjects + nSubjectsWithoutEvents <- nTargetSubjects - nSubjectsWithEvents + appendAttrition( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, reason_id = 4, - reason = "Subjects with no event records (may appear as 'None' paths)" + reason = sprintf("After joining events within window (%s subjects without events)", nSubjectsWithoutEvents) ), andromeda = andromeda ) - # Step 2: Filter to events within the time window - andromeda[[sprintf("cohortTable_%s", targetCohortId)]] <- andromeda$treatmentHistoryJoined %>% - dplyr::filter( - .data$indexDate <= .data$startDateEvent, - .data$startDateEvent <= .data$endDateTarget - ) - andromeda$treatmentHistory <- andromeda[[sprintf("cohortTable_%s", targetCohortId)]] %>% dplyr::select( "personId", @@ -297,21 +300,6 @@ createTreatmentHistory <- function( durationEra = .data$eventEndDate - .data$eventStartDate, eventCohortId = as.character(as.integer(.data$eventCohortId)) ) - - # Attrition: events outside window - attrCounts <- fetchAttritionCounts(andromeda, "treatmentHistory") - appendAttrition( - toAdd = data.frame( - number_records = attrCounts$nRecords, - number_subjects = attrCounts$nSubjects, - reason_id = 5, - reason = sprintf( - "Removing events outside window (%s: %s | %s: %s)", - startAnchor, windowStart, endAnchor, windowEnd - ) - ), - andromeda = andromeda - ) return(invisible(NULL)) } diff --git a/R/exportPatientLevel.R b/R/exportPatientLevel.R index 85c0cbaf..e0fd4a6c 100644 --- a/R/exportPatientLevel.R +++ b/R/exportPatientLevel.R @@ -1,5 +1,5 @@ addOriginId <- function(table) { - # personId already contains the original subject_id (as character) + # personId already contains the original subject_id (no remapping) table } diff --git a/tests/testthat/test-exportPatientLevel.R b/tests/testthat/test-exportPatientLevel.R index 6875785f..0c0b456f 100644 --- a/tests/testthat/test-exportPatientLevel.R +++ b/tests/testthat/test-exportPatientLevel.R @@ -39,14 +39,14 @@ test_that("exportPatientLevel", { attrition <- read.csv(file.path(tempdir(), "attrition.csv")) cdm_source_info <- read.csv(file.path(tempdir(), "cdm_source_info.csv")) - expect_equal(ncol(treatment_history), 13) + expect_equal(ncol(treatment_history), 12) expect_equal(nrow(treatment_history), 553) expect_equal(ncol(metadata), 5) expect_equal(nrow(metadata), 1) expect_equal(ncol(attrition), 5) - expect_equal(nrow(attrition), 11) + expect_equal(nrow(attrition), 12) expect_equal(ncol(cdm_source_info), 10) expect_equal(nrow(cdm_source_info), 1)