diff --git a/R/CDMInterface.R b/R/CDMInterface.R index d2b70de8..d67d9bd7 100644 --- a/R/CDMInterface.R +++ b/R/CDMInterface.R @@ -60,17 +60,10 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda for (tableName in cohortTableName) { cdm$tp_temp_tbl <- cdm[[tableName]] %>% - dplyr::group_by(.data$subject_id) %>% - dplyr::mutate( - subject_id_origin = .data$subject_id - ) %>% dplyr::filter(.data$cohort_definition_id %in% cohortIds) %>% - dplyr::filter(!!CDMConnector::datediff("cohort_start_date", "cohort_end_date", interval = "day") >= minEraDuration) %>% - dplyr::group_by(.data$subject_id) %>% - dplyr::ungroup() %>% dplyr::inner_join( cdm$person, - by = dplyr::join_by(subject_id_origin == person_id) + by = dplyr::join_by(subject_id == person_id) ) %>% dplyr::inner_join( cdm$concept, @@ -79,9 +72,6 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda date_of_birth = as.Date(paste0(as.character(.data$year_of_birth), "-01-01"))) %>% dplyr::mutate( age = !!CDMConnector::datediff("date_of_birth", "cohort_start_date", interval = "year")) %>% - dplyr::mutate( - subject_id_origin = as.character(.data$subject_id_origin) - ) %>% dplyr::rename(sex = "concept_name") %>% dplyr::mutate( temp_date = as.Date("1970-01-01") @@ -93,7 +83,6 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda dplyr::select( "cohort_definition_id", "subject_id", - "subject_id_origin", "cohort_start_date", "cohort_end_date", "age", @@ -113,33 +102,50 @@ fetchCohortTable <- function(cdm, cohorts, cohortTableName, andromeda, andromeda cdm <- CDMConnector::dropSourceTable(cdm = cdm, name = "tp_temp_tbl") + # Filter by minEraDuration first (dates are integers = days since epoch) andromeda[[andromedaTableName]] <- andromeda[[andromedaTableName]] %>% - dplyr::mutate(r = dplyr::row_number()) %>% - dplyr::group_by(.data$subject_id_origin) %>% - dplyr::mutate( - subject_id = as.integer(min(.data$r, na.rm = TRUE)) - ) %>% - dplyr::select(-"r") + dplyr::mutate(cohort_definition_id = as.numeric(.data$cohort_definition_id)) %>% + dplyr::filter( + (.data$cohort_end_date - .data$cohort_start_date) >= minEraDuration + ) + + # Attrition: after minEraDuration filter + n <- andromeda[[andromedaTableName]] %>% + dplyr::group_by(.data$subject_id) %>% + dplyr::summarise(n = dplyr::n()) %>% + dplyr::pull() + + appendAttrition( + toAdd = data.frame( + number_records = sum(n), + number_subjects = length(n), + reason_id = 2, + reason = sprintf("Removing records < minEraDuration (%s)", minEraDuration), + time_stamp = as.numeric(Sys.time()) + ), + andromeda = andromeda + ) + # Filter to persons with at least one target cohort record targetId <- as.numeric(targetCohortIds) - + andromeda[[andromedaTableName]] <- andromeda[[andromedaTableName]] %>% - dplyr::mutate(cohort_definition_id = as.numeric(.data$cohort_definition_id)) %>% dplyr::group_by(.data$subject_id) %>% dplyr::filter(any(.data$cohort_definition_id %in% targetId, na.rm = TRUE)) %>% dplyr::ungroup() - + + # Attrition: after target cohort filter n <- andromeda[[andromedaTableName]] %>% dplyr::group_by(.data$subject_id) %>% dplyr::summarise(n = dplyr::n()) %>% dplyr::pull() - + appendAttrition( toAdd = data.frame( number_records = sum(n), number_subjects = length(n), - reason_id = 2, - reason = sprintf("Removing records < minEraDuration (%s)", minEraDuration), + reason_id = 3, + reason = "After filtering to persons with >=1 target cohort record", time_stamp = as.numeric(Sys.time()) ), andromeda = andromeda diff --git a/R/TreatmentPatterns-package.R b/R/TreatmentPatterns-package.R index 429a0709..427ebf5d 100644 --- a/R/TreatmentPatterns-package.R +++ b/R/TreatmentPatterns-package.R @@ -49,7 +49,6 @@ utils::globalVariables( "duration_max", "duration_median", "event_name", - "subject_id_origin", "person_id", "concept_id", "year_of_birth", diff --git a/R/computePathways.R b/R/computePathways.R index da96cf4b..52604e50 100644 --- a/R/computePathways.R +++ b/R/computePathways.R @@ -207,7 +207,7 @@ computePathways <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 9, + reason_id = 11, reason = sprintf("treatment construction done") ), andromeda = andromeda diff --git a/R/constructPathways.R b/R/constructPathways.R index 2bd6b12b..009142fe 100755 --- a/R/constructPathways.R +++ b/R/constructPathways.R @@ -123,7 +123,7 @@ constructPathways <- function(settings, andromeda) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 10, + reason_id = 12, reason = sprintf("Max path length (%s)", settings$maxPathLength) ), andromeda = andromeda @@ -252,17 +252,37 @@ createTreatmentHistory <- function( Andromeda::createIndex(andromeda$eventCohorts, c("personId", "startDate", "endDate")) Andromeda::createIndex(andromeda$targetCohorts, c("personId", "indexDate", "endDate")) - andromeda[[sprintf("cohortTable_%s", targetCohortId)]] <- dplyr::full_join( + # Join events to targets with date constraints in the join to avoid cartesian products + andromeda[[sprintf("cohortTable_%s", targetCohortId)]] <- dplyr::inner_join( x = andromeda$eventCohorts, y = andromeda$targetCohorts, by = dplyr::join_by( personId == personId, - subject_id_origin == subject_id_origin, - y$indexDate <= x$startDate, - x$startDate <= y$endDate, + startDate >= indexDate, + startDate <= endDate ), suffix = c("Event", "Target") ) + # Attrition: subjects with events within window + nTargetSubjects <- andromeda$targetCohorts %>% + dplyr::distinct(.data$personId) %>% + dplyr::summarise(n = dplyr::n()) %>% + dplyr::pull() + + attrCounts <- fetchAttritionCounts(andromeda, sprintf("cohortTable_%s", targetCohortId)) + nSubjectsWithEvents <- attrCounts$nSubjects + nSubjectsWithoutEvents <- nTargetSubjects - nSubjectsWithEvents + + appendAttrition( + toAdd = data.frame( + number_records = attrCounts$nRecords, + number_subjects = attrCounts$nSubjects, + reason_id = 4, + reason = sprintf("After joining events within window (%s subjects without events)", nSubjectsWithoutEvents) + ), + andromeda = andromeda + ) + andromeda$treatmentHistory <- andromeda[[sprintf("cohortTable_%s", targetCohortId)]] %>% dplyr::select( "personId", @@ -279,25 +299,7 @@ createTreatmentHistory <- function( dplyr::mutate( durationEra = .data$eventEndDate - .data$eventStartDate, eventCohortId = as.character(as.integer(.data$eventCohortId)) - ) %>% - dplyr::filter( - !is.na(.data$indexYear), - !is.na(.data$eventCohortId) ) - - attrCounts <- fetchAttritionCounts(andromeda, "treatmentHistory") - appendAttrition( - toAdd = data.frame( - number_records = attrCounts$nRecords, - number_subjects = attrCounts$nSubjects, - reason_id = 3, - reason = sprintf( - "Removing events outside window (%s: %s | %s: %s)", - startAnchor, windowStart, endAnchor, windowEnd - ) - ), - andromeda = andromeda - ) return(invisible(NULL)) } @@ -363,7 +365,7 @@ doSplitEventCohorts <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 4, + reason_id = 6, reason = sprintf("splitEventCohorts") ), andromeda = andromeda @@ -439,7 +441,7 @@ doEraCollapseNew <- function(andromeda, eraCollapseSize) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 5, + reason_id = 7, reason = sprintf("Iteration %s: Collapsing eras, eraCollapse (%s)", counter, eraCollapseSize) ), andromeda = andromeda @@ -453,7 +455,7 @@ doEraCollapseNew <- function(andromeda, eraCollapseSize) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 5, + reason_id = 7, reason = sprintf("No eras needed Collapsing, eraCollapse (%s)", eraCollapseSize) ), andromeda = andromeda @@ -525,7 +527,7 @@ doEraCollapse <- function(andromeda, eraCollapseSize) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 5, + reason_id = 7, reason = sprintf("Collapsing eras, eraCollapse (%s)", eraCollapseSize) ), andromeda = andromeda @@ -806,7 +808,7 @@ doCombinationWindow <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 6, + reason_id = 8, reason = sprintf("Iteration %s: minPostCombinationDuration (%s), combinatinoWindow (%s)", iterations, minPostCombinationDuration, combinationWindow) ), andromeda = andromeda @@ -829,7 +831,7 @@ doCombinationWindow <- function( toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 7, + reason_id = 9, reason = sprintf("After Combination") ), andromeda = andromeda @@ -996,7 +998,7 @@ doFilterTreatments <- function(andromeda, filterTreatments) { toAdd = data.frame( number_records = attrCounts$nRecords, number_subjects = attrCounts$nSubjects, - reason_id = 8, + reason_id = 10, reason = sprintf("filterTreatments (%s)", filterTreatments) ), andromeda = andromeda diff --git a/R/exportPatientLevel.R b/R/exportPatientLevel.R index d3d721f0..e0fd4a6c 100644 --- a/R/exportPatientLevel.R +++ b/R/exportPatientLevel.R @@ -1,10 +1,6 @@ addOriginId <- function(table) { - andromeda <- table$src$con - - andromeda$cohortTable %>% - dplyr::select("personId", "subject_id_origin") %>% - dplyr::inner_join(table, by = dplyr::join_by(personId == personId)) %>% - dplyr::distinct() + # personId already contains the original subject_id (no remapping) + table } writeTreatmentHistory <- function(andromeda, outputPath) { diff --git a/tests/testthat/test-exportPatientLevel.R b/tests/testthat/test-exportPatientLevel.R index 6875785f..0c0b456f 100644 --- a/tests/testthat/test-exportPatientLevel.R +++ b/tests/testthat/test-exportPatientLevel.R @@ -39,14 +39,14 @@ test_that("exportPatientLevel", { attrition <- read.csv(file.path(tempdir(), "attrition.csv")) cdm_source_info <- read.csv(file.path(tempdir(), "cdm_source_info.csv")) - expect_equal(ncol(treatment_history), 13) + expect_equal(ncol(treatment_history), 12) expect_equal(nrow(treatment_history), 553) expect_equal(ncol(metadata), 5) expect_equal(nrow(metadata), 1) expect_equal(ncol(attrition), 5) - expect_equal(nrow(attrition), 11) + expect_equal(nrow(attrition), 12) expect_equal(ncol(cdm_source_info), 10) expect_equal(nrow(cdm_source_info), 1)