diff --git a/.Rbuildignore b/.Rbuildignore index a152430..def897d 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -1,3 +1,5 @@ +^renv$ +^renv\.lock$ pom.xml extras docs diff --git a/DESCRIPTION b/DESCRIPTION index 4c1e6a3..9ceb6d4 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,8 +1,8 @@ Package: FeatureExtraction Type: Package Title: Generating Features for a Cohort -Version: 3.12.0 -Date: 2025-10-28 +Version: 3.13.0 +Date: 2026-02-28 Authors@R: c( person("Martijn", "Schuemie", , "schuemie@ohdsi.org", role = c("aut")), person("Marc", "Suchard", role = c("aut")), @@ -46,6 +46,6 @@ VignetteBuilder: knitr URL: https://github.com/OHDSI/FeatureExtraction BugReports: https://github.com/OHDSI/FeatureExtraction/issues NeedsCompilation: no -RoxygenNote: 7.3.2 +RoxygenNote: 7.3.3 Encoding: UTF-8 Language: en-US diff --git a/NEWS.md b/NEWS.md index 6ddbdc4..9bdbe65 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,15 @@ +FeatureExtraction 3.13.0 +======================= + +New Features: + +- Added ability to store aggregate results from `getDbCovariateData` in the database and added +ability to control all target tables with new `target*Table` parameters (#152, #321) + +Bugfixes: + +- Fixed tests and made sure storage of covariates with `getDbDefaultCovariateData` works and is consistent + FeatureExtraction 3.12.0 ======================= diff --git a/R/GetCovariates.R b/R/GetCovariates.R index cf2793c..f2bf40f 100644 --- a/R/GetCovariates.R +++ b/R/GetCovariates.R @@ -71,6 +71,29 @@ #' privileges where temp tables can be created. #' @param covariateCohortDatabaseSchema The database schema where the cohorts used to define the covariates can be found. #' @param covariateCohortTable The table where the cohorts used to define the covariates can be found. +#' +#' @param exportToTable Whether to export to a table rather than Andromeda object +#' @param dropTableIfExists If targetDatabaseSchema, drop any existing tables. Otherwise, results are merged +#' into existing table data. 
Overrides createTable. +#' @param createTable Run sql to create table? Code does not check if table exists. +#' @param targetDatabaseSchema (Optional) The name of the database schema where the resulting covariates +#' should be stored as a table. If not provided, results will be fetched to R. +#' @param targetCovariateTable (Optional) The name of the table where the resulting covariates will +#' be stored. If not provided, results will be fetched to R. The table can be +#' a permanent table in the \code{targetDatabaseSchema} or a temp table. If +#' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' @param targetCovariateContinuousTable (Optional) The name of the table where the resulting continuous covariates will +#' be stored. If not provided, results will be fetched to R. The table can be +#' a permanent table in the \code{targetDatabaseSchema} or a temp table. If +#' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' @param targetCovariateRefTable (Optional) The name of the table where the covariate reference will be stored. If +#' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' +#' @param targetAnalysisRefTable (Optional) The name of the table where the analysis reference will be stored. If +#' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' @param targetTimeRefTable (Optional) The name of the table for the time reference. If +#' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' +#' #' @return #' Returns an object of type \code{covariateData}, containing information on the covariates. 
@@ -113,6 +136,15 @@ getDbCovariateData <- function(connectionDetails = NULL, cohortIds = c(-1), rowIdField = "subject_id", covariateSettings, + exportToTable = FALSE, + createTable = exportToTable, + dropTableIfExists = exportToTable, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateContinuousTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTimeRefTable = NULL, aggregated = FALSE, minCharacterizationMean = 0, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), @@ -155,6 +187,118 @@ getDbCovariateData <- function(connectionDetails = NULL, } else { cohortDatabaseSchemaTable <- paste(cohortDatabaseSchema, cohortTable, sep = ".") } + + # check for temporal features in any of the settings + if(inherits(covariateSettings, 'covariateSettings')){ + anyTemporal <- covariateSettings$temporal | covariateSettings$temporalSequence + } else{ + anyTemporal <- sum(unlist(lapply( + X = covariateSettings, + FUN = function(x){ + sum(c(x$temporal,x$temporalSequence)) == 1 + }))) > 0 + } + + # Create export tables + # figure out tables + if (exportToTable) { + if(is.null(targetDatabaseSchema)){ + # turn off create table since the tables are temp + tempOutputTables <- TRUE + # covariate tables + if (substr(targetCovariateTable, 1, 1) == "#") { + targetCovariateTable <- targetCovariateTable + } else { + targetCovariateTable <- paste0("#", targetCovariateTable) + } + # cov cont table + if (substr(targetCovariateContinuousTable, 1, 1) == "#") { + targetCovariateContinuousTable <- targetCovariateContinuousTable + } else { + targetCovariateContinuousTable <- paste0("#", targetCovariateContinuousTable) + } + # cov ref table + if (substr(targetCovariateRefTable, 1, 1) == "#") { + targetCovariateRefTable <- targetCovariateRefTable + } else { + targetCovariateRefTable <- paste0("#", targetCovariateRefTable) + } + # analysis ref table + if (substr(targetAnalysisRefTable, 1, 1) == "#") { + targetAnalysisRefTable <- 
targetAnalysisRefTable + } else { + targetAnalysisRefTable <- paste0("#", targetAnalysisRefTable) + } + # time ref table + if (substr(targetTimeRefTable, 1, 1) == "#") { + targetTimeRefTable <- targetTimeRefTable + } else { + targetTimeRefTable <- paste0("#", targetTimeRefTable) + } + + } else { + tempOutputTables <- FALSE + targetCovariateTable <- paste(targetDatabaseSchema, targetCovariateTable, sep = ".") + targetCovariateContinuousTable <- paste(targetDatabaseSchema, targetCovariateContinuousTable, sep = ".") + targetCovariateRefTable <- paste(targetDatabaseSchema, targetCovariateRefTable, sep = ".") + targetAnalysisRefTable <- paste(targetDatabaseSchema, targetAnalysisRefTable, sep = ".") + targetTimeRefTable <- paste(targetDatabaseSchema, targetTimeRefTable, sep = ".") + } + + # drop table if required + if(dropTableIfExists){ + message('Dropping export tables') + sql <- SqlRender::loadRenderTranslateSql( + sqlFilename = 'DropExportTables.sql', + packageName = 'FeatureExtraction', + dbms = attr(connection, "dbms"), + tempEmulationSchema = tempEmulationSchema, + temp_tables = tempOutputTables, + covariate_table = targetCovariateTable, + covariate_continuous_table = targetCovariateContinuousTable, + covariate_ref_table = targetCovariateRefTable, + analysis_ref_table = targetAnalysisRefTable, + time_ref_table = targetTimeRefTable + ) + + DatabaseConnector::executeSql( + connection = connection, + sql = sql + ) + } + + if(dropTableIfExists & !createTable){ + stop('Seem to be exporting to tables but create table is FALSE and dropTable is TRUE') + } + + # create the cohort tables if required + if(createTable){ + message('Creating export tables') + sql <- SqlRender::loadRenderTranslateSql( + sqlFilename = 'CreateExportTables.sql', + packageName = 'FeatureExtraction', + dbms = attr(connection, "dbms"), + tempEmulationSchema = tempEmulationSchema, + + aggregated = aggregated, + temporal = anyTemporal, + row_id_field = 'row_id', + + covariate_table = 
targetCovariateTable, + covariate_continuous_table = targetCovariateContinuousTable, + covariate_ref_table = targetCovariateRefTable, + analysis_ref_table = targetAnalysisRefTable, + time_ref_table = targetTimeRefTable + ) + + DatabaseConnector::executeSql( + connection = connection, + sql = sql + ) + } + + } + sql <- "SELECT cohort_definition_id, COUNT_BIG(*) AS population_size FROM @cohort_database_schema_table {@cohort_ids != -1} ? {WHERE cohort_definition_id IN (@cohort_ids)} GROUP BY cohort_definition_id;" sql <- SqlRender::render( sql = sql, @@ -192,6 +336,7 @@ getDbCovariateData <- function(connectionDetails = NULL, covariateCohortTable ) } + for (i in 1:length(covariateSettings)) { fun <- attr(covariateSettings[[i]], "fun") args <- list( @@ -203,6 +348,11 @@ getDbCovariateData <- function(connectionDetails = NULL, cdmVersion = cdmVersion, rowIdField = rowIdField, covariateSettings = covariateSettings[[i]], + targetCovariateTable = targetCovariateTable, + targetCovariateContinuousTable = targetCovariateContinuousTable, + targetCovariateRefTable = targetCovariateRefTable, + targetAnalysisRefTable = targetAnalysisRefTable, + targetTimeRefTable = targetTimeRefTable, aggregated = aggregated, minCharacterizationMean = minCharacterizationMean ) @@ -224,9 +374,15 @@ getDbCovariateData <- function(connectionDetails = NULL, } else if (hasData(tempCovariateData$covariatesContinuous)) { covariateData$covariatesContinuous <- tempCovariateData$covariatesContinuous } - - Andromeda::appendToTable(covariateData$covariateRef, tempCovariateData$covariateRef) - Andromeda::appendToTable(covariateData$analysisRef, tempCovariateData$analysisRef) + + if(hasData(tempCovariateData$covariateRef)){ + Andromeda::appendToTable(covariateData$covariateRef, tempCovariateData$covariateRef) + } + if(hasData(tempCovariateData$analysisRef)){ + Andromeda::appendToTable(covariateData$analysisRef, tempCovariateData$analysisRef) + } + + if(!exportToTable){ for (name in 
names(attr(tempCovariateData, "metaData"))) { if (is.null(attr(covariateData, "metaData")[[name]])) { attr(covariateData, "metaData")[[name]] <- attr(tempCovariateData, "metaData")[[name]] @@ -239,11 +395,15 @@ getDbCovariateData <- function(connectionDetails = NULL, ) } } + } # if not exporting } } } - attr(covariateData, "metaData")$populationSize <- populationSize - attr(covariateData, "metaData")$cohortIds <- cohortIds + + if(!is.null(covariateData)){ + attr(covariateData, "metaData")$populationSize <- populationSize + attr(covariateData, "metaData")$cohortIds <- cohortIds + } } - return(covariateData) + return(invisible(covariateData)) } diff --git a/R/GetCovariatesFromOtherCohorts.R b/R/GetCovariatesFromOtherCohorts.R index 947efc9..f295d77 100644 --- a/R/GetCovariatesFromOtherCohorts.R +++ b/R/GetCovariatesFromOtherCohorts.R @@ -22,6 +22,18 @@ #' @param covariateSettings An object of type \code{covariateSettings} as created using the #' \code{\link{createCohortBasedCovariateSettings}} or #' \code{\link{createCohortBasedTemporalCovariateSettings}} functions. +#' @param targetDatabaseSchema (Optional) The name of the database schema where the resulting covariates +#' should be stored. If not provided, results will be fetched to R. +#' @param targetCovariateTable (Optional) The name of the table where the resulting covariates will +#' be stored. If not provided, results will be fetched to R. The table can be +#' a permanent table in the \code{targetDatabaseSchema} or a temp table. If +#' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' +#' @param targetCovariateContinuousTable (Optional) The name of the table where the resulting continuous covariates should be stored. +#' @param targetCovariateRefTable (Optional) The name of the table where the covariate reference will be stored. +#' +#' @param targetAnalysisRefTable (Optional) The name of the table where the analysis reference will be stored. 
+#' @param targetTimeRefTable (Optional) The name of the table for the time reference #' @param minCharacterizationMean The minimum mean value for binary characterization output. Values below this will be cut off from output. This #' will help reduce the file size of the characterization output, but will remove information #' on covariates that have very low values. The default is 0. @@ -37,6 +49,12 @@ getDbCohortBasedCovariatesData <- function(connection, cdmVersion = "5", rowIdField = "subject_id", covariateSettings, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateContinuousTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTimeRefTable = NULL, aggregated = FALSE, minCharacterizationMean = 0, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) { @@ -153,6 +171,13 @@ getDbCohortBasedCovariatesData <- function(connection, cdmVersion = cdmVersion, rowIdField = rowIdField, covariateSettings = detailledSettings, + + targetCovariateTable = targetCovariateTable, + targetCovariateContinuousTable = targetCovariateContinuousTable, + targetCovariateRefTable = targetCovariateRefTable, + targetAnalysisRefTable = targetAnalysisRefTable, + targetTimeRefTable = targetTimeRefTable, + aggregated = aggregated, minCharacterizationMean = minCharacterizationMean ) diff --git a/R/GetDefaultCovariates.R b/R/GetDefaultCovariates.R index 86a09c8..6243635 100644 --- a/R/GetDefaultCovariates.R +++ b/R/GetDefaultCovariates.R @@ -24,17 +24,19 @@ #' @param covariateSettings Either an object of type \code{covariateSettings} as created using one #' of the createCovariate functions, or a list of such objects. #' @param targetDatabaseSchema (Optional) The name of the database schema where the resulting covariates -#' should be stored. +#' should be stored. If not provided, results will be fetched to R. #' @param targetCovariateTable (Optional) The name of the table where the resulting covariates will #' be stored. 
If not provided, results will be fetched to R. The table can be #' a permanent table in the \code{targetDatabaseSchema} or a temp table. If #' it is a temp table, do not specify \code{targetDatabaseSchema}. +#' @param targetCovariateContinuousTable (Optional) The name of the table where the resulting continuous covariates should be stored. #' @param targetCovariateRefTable (Optional) The name of the table where the covariate reference will be stored. +#' #' @param targetAnalysisRefTable (Optional) The name of the table where the analysis reference will be stored. +#' @param targetTimeRefTable (Optional) The name of the table for the time reference #' @param minCharacterizationMean The minimum mean value for binary characterization output. Values below this will be cut off from output. This #' will help reduce the file size of the characterization output, but will remove information #' on covariates that have very low values. The default is 0. -#' #' @template GetCovarParams #' #' @examples @@ -52,9 +54,7 @@ #' connection = connection, #' cdmDatabaseSchema = "main", #' cohortTable = "cohort", -#' covariateSettings = createDefaultCovariateSettings(), -#' targetDatabaseSchema = "main", -#' targetCovariateTable = "ut_cov" +#' covariateSettings = createDefaultCovariateSettings() #' ) #' } #' @export @@ -67,10 +67,12 @@ getDbDefaultCovariateData <- function(connection, cdmVersion = "5", rowIdField = "subject_id", covariateSettings, - targetDatabaseSchema, - targetCovariateTable, - targetCovariateRefTable, - targetAnalysisRefTable, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateContinuousTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTimeRefTable = NULL, aggregated = FALSE, minCharacterizationMean = 0, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) { @@ -80,9 +82,7 @@ getDbDefaultCovariateData <- function(connection, if (cdmVersion == "4") { stop("Common Data Model version 4 is not 
supported") } - if (!missing(targetCovariateTable) && !is.null(targetCovariateTable) && aggregated) { - stop("Writing aggregated results to database is currently not supported") - } + if (!missing(cohortId)) { warning("cohortId argument has been deprecated, please use cohortIds") cohortIds <- cohortId @@ -98,6 +98,19 @@ getDbDefaultCovariateData <- function(connection, minCharacterizationMean <- utils::type.convert(minCharacterizationMean, as.is = TRUE) checkmate::assertNumeric(x = minCharacterizationMean, lower = 0, upper = 1, add = errorMessages) checkmate::reportAssertions(collection = errorMessages) + + + targetTables <- list( + covariates = targetCovariateTable, + covariatesContinuous = targetCovariateContinuousTable, + covariateRef = targetCovariateRefTable, + analysisRef = targetAnalysisRefTable, + timeRef = targetTimeRefTable + ) + # Is the target schema missing or are all the specified tables temp + allTempTables <- all(substr(targetTables,1,1) == "#") + extractToAndromeda <- is.null(targetCovariateTable) + settings <- .toJson(covariateSettings) rJava::J("org.ohdsi.featureExtraction.FeatureExtraction")$init(system.file("", package = "FeatureExtraction")) @@ -129,34 +142,79 @@ getDbDefaultCovariateData <- function(connection, profile <- (!is.null(getOption("dbProfile")) && getOption("dbProfile") == TRUE) DatabaseConnector::executeSql(connection, sql, profile = profile) - if (missing(targetCovariateTable) || is.null(targetCovariateTable)) { - ParallelLogger::logInfo("Fetching data from server") - start <- Sys.time() - # Binary or non-aggregated features - covariateData <- Andromeda::andromeda() - if (!is.null(todo$sqlQueryFeatures)) { - sql <- SqlRender::translate( - sql = todo$sqlQueryFeatures, - targetDialect = attr(connection, "dbms"), - tempEmulationSchema = tempEmulationSchema - ) - + # Now we extract the results into Andromeda tables or as tables + ParallelLogger::logInfo("Fetching data from server") + start <- Sys.time() + covariateData <- 
Andromeda::andromeda() + + # Binary or non-aggregated features + if (!is.null(todo$sqlQueryFeatures)) { + + # etracting covariate table + if(extractToAndromeda){ + sql <- SqlRender::translate(sql = todo$sqlQueryFeatures, + targetDialect = attr(connection, "dbms"), + tempEmulationSchema = tempEmulationSchema) DatabaseConnector::querySqlToAndromeda( connection = connection, sql = sql, andromeda = covariateData, andromedaTableName = "covariates", snakeCaseToCamelCase = TRUE + ) + } else{ + + # for testing to see column order + #print(todo$sqlQueryFeatures) + + sql <- " + INSERT INTO @target_covariate_table( + + {@temporal | @temporal_sequence} ? {time_id,} + + {@aggregated}?{ + cohort_definition_id, + covariate_id, + sum_value, + average_value + }:{ + covariate_id, + row_id, + covariate_value + } + + ) @sub_query; " + + sql <- SqlRender::render( + sql = sql, + target_covariate_table = targetTables$covariates, + sub_query = gsub(";", "", todo$sqlQueryFeatures), + temporal = covariateSettings$temporal, + temporal_sequence = covariateSettings$temporalSequence, + aggregated = aggregated ) - } - - # Continuous aggregated features - if (!is.null(todo$sqlQueryContinuousFeatures)) { + sql <- SqlRender::translate( - sql = todo$sqlQueryContinuousFeatures, - targetDialect = attr(connection, "dbms"), + sql = sql, + targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema ) + DatabaseConnector::executeSql( + connection = connection, + sql = sql + ) + + } + + } + + # Continuous aggregated features + if (!is.null(todo$sqlQueryContinuousFeatures)) { + + if(extractToAndromeda){ + sql <- SqlRender::translate(sql = todo$sqlQueryContinuousFeatures, + targetDialect = attr(connection, "dbms"), + tempEmulationSchema = tempEmulationSchema) DatabaseConnector::querySqlToAndromeda( connection = connection, sql = sql, @@ -164,113 +222,209 @@ getDbDefaultCovariateData <- function(connection, andromedaTableName = "covariatesContinuous", snakeCaseToCamelCase = 
TRUE ) - } - - # Covariate reference - sql <- SqlRender::translate( - sql = todo$sqlQueryFeatureRef, - targetDialect = attr(connection, "dbms"), - tempEmulationSchema = tempEmulationSchema - ) - - DatabaseConnector::querySqlToAndromeda( - connection = connection, - sql = sql, - andromeda = covariateData, - andromedaTableName = "covariateRef", - snakeCaseToCamelCase = TRUE - ) - collisions <- covariateData$covariateRef %>% - filter(collisions > 0) %>% - collect() - if (nrow(collisions) > 0) { - warning(sprintf( - "Collisions in covariate IDs detected for post-coordinated concepts with covariate IDs %s", - paste(collisions$covariateId, paste = ", ") - )) - } - - # Analysis reference - sql <- SqlRender::translate( - sql = todo$sqlQueryAnalysisRef, - targetDialect = attr(connection, "dbms"), - tempEmulationSchema = tempEmulationSchema - ) - DatabaseConnector::querySqlToAndromeda( - connection = connection, - sql = sql, - andromeda = covariateData, - andromedaTableName = "analysisRef", - snakeCaseToCamelCase = TRUE - ) - - # Time reference - if (!is.null(todo$sqlQueryTimeRef)) { + } else{ + sql <- " + INSERT INTO @target_covariate_continuous_table( + {@aggregated}?{ + + cohort_definition_id, + covariate_id, + {@temporal | @temporal_sequence} ? {time_id,} + count_value, + min_value, + max_value, + average_value, + standard_deviation, + median_value, + p10_value, + p25_value, + p75_value, + p90_value + + }:{ + + covariate_id, + {@temporal | @temporal_sequence} ? 
{time_id,} + row_id, + covariate_value + + } + + ) @sub_query;" + + sql <- SqlRender::render( + sql = sql, + target_covariate_continuous_table = targetTables$covariatesContinuous, + sub_query = gsub(";", "", todo$sqlQueryContinuousFeatures), + temporal = covariateSettings$temporal, + temporal_sequence = covariateSettings$temporalSequence, + aggregated = aggregated + ) + sql <- SqlRender::translate( - sql = todo$sqlQueryTimeRef, - targetDialect = attr(connection, "dbms"), + sql = sql, + targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema ) + DatabaseConnector::executeSql( + connection = connection, + sql = sql + ) + } + + } + + # Covariate reference + if (!is.null(todo$sqlQueryFeatureRef)) { + + if(extractToAndromeda){ + sql <- SqlRender::translate(sql = todo$sqlQueryFeatureRef, + targetDialect = attr(connection, "dbms"), + tempEmulationSchema = tempEmulationSchema) DatabaseConnector::querySqlToAndromeda( connection = connection, sql = sql, andromeda = covariateData, - andromedaTableName = "timeRef", + andromedaTableName = "covariateRef", snakeCaseToCamelCase = TRUE ) - } - - - delta <- Sys.time() - start - ParallelLogger::logInfo("Fetching data took ", signif(delta, 3), " ", attr(delta, "units")) - } else { - # Don't fetch to R , but create on server instead - ParallelLogger::logInfo("Writing data to table") - start <- Sys.time() - convertQuery <- function(sql, databaseSchema, table) { - if (missing(databaseSchema) || is.null(databaseSchema)) { - tableName <- table - } else { - tableName <- paste(databaseSchema, table, sep = ".") - } - return(sub("FROM", paste("INTO", tableName, "FROM"), sql)) - } - # Covariates - if (!is.null(todo$sqlQueryFeatures)) { - sql <- convertQuery(todo$sqlQueryFeatures, targetDatabaseSchema, targetCovariateTable) + collisions <- covariateData$covariateRef %>% + dplyr::filter(collisions > 0) %>% + dplyr::collect() + + if (nrow(collisions) > 0) { + warning(sprintf( + "Collisions in covariate IDs 
detected for post-coordinated concepts with covariate IDs %s", + paste(collisions$covariateId, paste = ", ") + )) + } + } else{ + sql <- " + INSERT INTO @target_covariate_ref_table( + covariate_id, + covariate_name, + analysis_id, + concept_id, + value_as_concept_id, + collisions + ) @sub_query ;" + + sql <- SqlRender::render( + sql = sql, + target_covariate_ref_table = targetTables$covariateRef, + sub_query = gsub(";", "", todo$sqlQueryFeatureRef) + ) + sql <- SqlRender::translate( sql = sql, - targetDialect = attr(connection, "dbms"), + targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + DatabaseConnector::executeSql( + connection = connection, + sql = sql + ) } + + } + - # Covariate reference - if (!missing(targetCovariateRefTable) && !is.null(targetCovariateRefTable)) { - sql <- convertQuery(todo$sqlQueryFeatureRef, targetDatabaseSchema, targetCovariateRefTable) + # Analysis reference + if (!is.null(todo$sqlQueryAnalysisRef)) { + + if(extractToAndromeda){ + sql <- SqlRender::translate(sql = todo$sqlQueryAnalysisRef, + targetDialect = attr(connection, "dbms"), + tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::querySqlToAndromeda( + connection = connection, + sql = sql, + andromeda = covariateData, + andromedaTableName = "analysisRef", + snakeCaseToCamelCase = TRUE + ) + } else{ + sql <- " + INSERT INTO @target_analysis_ref_table( + analysis_id, + analysis_name, + domain_id, + {!@temporal} ? 
{ + start_day, + end_day, + } + is_binary, + missing_means_zero + ) @sub_query ;" + + sql <- SqlRender::render( + sql = sql, + target_analysis_ref_table = targetTables$analysisRef, + sub_query = gsub(";", "", todo$sqlQueryAnalysisRef), + temporal = covariateSettings$temporal | covariateSettings$temporalSequence + ) + sql <- SqlRender::translate( sql = sql, - targetDialect = attr(connection, "dbms"), + targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + DatabaseConnector::executeSql( + connection = connection, + sql = sql + ) } + + } + - # Analysis reference - if (!missing(targetAnalysisRefTable) && !is.null(targetAnalysisRefTable)) { - sql <- convertQuery(todo$sqlQueryAnalysisRef, targetDatabaseSchema, targetAnalysisRefTable) + # Time reference + if (!is.null(todo$sqlQueryTimeRef)) { + + if(extractToAndromeda){ + sql <- SqlRender::translate(sql = todo$sqlQueryTimeRef, + targetDialect = attr(connection, "dbms"), + tempEmulationSchema = tempEmulationSchema) + DatabaseConnector::querySqlToAndromeda( + connection = connection, + sql = sql, + andromeda = covariateData, + andromedaTableName = "timeRef", + snakeCaseToCamelCase = TRUE + ) + } else{ + # TODO - what columns are in time ref table?! 
+ sql <- " + INSERT INTO @target_time_ref_table( + time_part, + time_interval, + sequence_start_day, + sequence_end_day + ) @sub_query;" + + sql <- SqlRender::render( + sql = sql, + target_time_ref_table = targetTables$timeRef, + sub_query = gsub(";", "", todo$sqlQueryTimeRef) + ) + sql <- SqlRender::translate( sql = sql, - targetDialect = attr(connection, "dbms"), + sql = sql, + targetDialect = DatabaseConnector::dbms(connection), tempEmulationSchema = tempEmulationSchema ) - DatabaseConnector::executeSql(connection, sql, progressBar = FALSE, reportOverallTime = FALSE) + DatabaseConnector::executeSql( + connection = connection, + sql = sql + ) } - delta <- Sys.time() - start - ParallelLogger::logInfo("Writing data took", signif(delta, 3), " ", attr(delta, "units")) + } + + delta <- Sys.time() - start + ParallelLogger::logInfo("Fetching data took ", signif(delta, 3), " ", attr(delta, "units")) + # Drop temp tables sql <- SqlRender::translate( sql = todo$sqlCleanup, @@ -291,7 +445,7 @@ getDbDefaultCovariateData <- function(connection, } } - if (missing(targetCovariateTable) || is.null(targetCovariateTable)) { + if (extractToAndromeda) { attr(covariateData, "metaData") <- list() if (is.null(covariateData$covariates) && is.null(covariateData$covariatesContinuous)) { warning("No data found, probably because no covariates were specified.") @@ -304,5 +458,7 @@ getDbDefaultCovariateData <- function(connection, class(covariateData) <- "CovariateData" attr(class(covariateData), "package") <- "FeatureExtraction" return(covariateData) + } else{ + return(invisible(NULL)) } } diff --git a/R/UnitTestHelperFunctions.R b/R/UnitTestHelperFunctions.R index 205b598..d2bce2b 100644 --- a/R/UnitTestHelperFunctions.R +++ b/R/UnitTestHelperFunctions.R @@ -59,6 +59,7 @@ #' @param minCharacterizationMean The minimum mean value for binary characterization output. Values below this will be cut off from output. 
This #' will help reduce the file size of the characterization output, but will remove information #' on covariates that have very low values. The default is 0. +#' @param ... Additional arguments, not used. #' @return #' Returns an object of type \code{covariateData}, containing information on the covariates. #' @@ -71,7 +72,8 @@ rowIdField = "subject_id", covariateSettings, aggregated = FALSE, - minCharacterizationMean = 0) { + minCharacterizationMean = 0, + ...) { writeLines("Constructing length of observation covariates") if (covariateSettings$useLengthOfObs == FALSE) { return(NULL) diff --git a/inst/sql/sql_server/CreateExportTables.sql b/inst/sql/sql_server/CreateExportTables.sql new file mode 100644 index 0000000..7065214 --- /dev/null +++ b/inst/sql/sql_server/CreateExportTables.sql @@ -0,0 +1,62 @@ +CREATE TABLE @covariate_table ( + covariate_id BIGINT, + {@temporal}?{time_id BIGINT,} + {@aggregated}?{ + cohort_definition_id BIGINT, + sum_value BIGINT, + average_value FLOAT + }:{ + @row_id_field BIGINT, + covariate_value INT + } + ); + +CREATE TABLE @covariate_continuous_table ( + covariate_id BIGINT, + {@temporal}?{time_id BIGINT,} + + {@aggregated}?{ + cohort_definition_id BIGINT, + count_value BIGINT, + min_value FLOAT, + max_value FLOAT, + average_value FLOAT, + standard_deviation FLOAT, + median_value FLOAT, + p10_value FLOAT, + p25_value FLOAT, + p75_value FLOAT, + p90_value FLOAT + }:{ + @row_id_field BIGINT, + covariate_value FLOAT + } + + ); + +CREATE TABLE @covariate_ref_table ( + covariate_id BIGINT, + covariate_name VARCHAR(512), + analysis_id INT, + concept_id INT, + value_as_concept_id INT, + collisions INT + ); + +CREATE TABLE @analysis_ref_table ( + analysis_id BIGINT, + analysis_name VARCHAR(512), + domain_id VARCHAR(20), + start_day INT, + end_day INT, + is_binary VARCHAR(1), + missing_means_zero VARCHAR(1) + ); + + +CREATE TABLE @time_ref_table ( + time_part VARCHAR(20), + time_interval BIGINT, + sequence_start_day BIGINT, + 
sequence_end_day BIGINT + ); \ No newline at end of file diff --git a/inst/sql/sql_server/DemographicsTime.sql b/inst/sql/sql_server/DemographicsTime.sql index c509b32..38d6784 100644 --- a/inst/sql/sql_server/DemographicsTime.sql +++ b/inst/sql/sql_server/DemographicsTime.sql @@ -36,20 +36,20 @@ FROM ( cohort.@row_id_field AS row_id, } {@sub_type == 'priorObservation'} ? { - DATEDIFF(DAY, observation_period_start_date, cohort_start_date) AS days + DATEDIFF(DAY, op.observation_period_start_date, cohort_start_date) AS days } {@sub_type == 'postObservation'} ? { - DATEDIFF(DAY, cohort_start_date, observation_period_end_date) AS days + DATEDIFF(DAY, cohort_start_date, op.observation_period_end_date) AS days } {@sub_type == 'inCohort'} ? { DATEDIFF(DAY, cohort_start_date, cohort_end_date) AS days } FROM @cohort_table cohort {@sub_type != 'inCohort'} ? { - INNER JOIN @cdm_database_schema.observation_period - ON cohort.subject_id = observation_period.person_id - AND observation_period_start_date <= cohort_start_date - AND observation_period_end_date >= cohort_start_date + INNER JOIN @cdm_database_schema.observation_period op + ON op.person_id = cohort.subject_id + AND op.observation_period_start_date <= cohort_start_date + AND op.observation_period_end_date >= cohort_start_date } {@cohort_definition_id != -1} ? 
{ WHERE cohort.cohort_definition_id IN (@cohort_definition_id)} ) raw_data; diff --git a/inst/sql/sql_server/DropExportTables.sql b/inst/sql/sql_server/DropExportTables.sql new file mode 100644 index 0000000..9b91034 --- /dev/null +++ b/inst/sql/sql_server/DropExportTables.sql @@ -0,0 +1,16 @@ +{@temp_tables}?{ +IF OBJECT_ID('tempdb..@covariate_table', 'U') IS NOT NULL DROP TABLE @covariate_table; +IF OBJECT_ID('tempdb..@covariate_continuous_table', 'U') IS NOT NULL DROP TABLE @covariate_continuous_table; +IF OBJECT_ID('tempdb..@covariate_ref_table', 'U') IS NOT NULL DROP TABLE @covariate_ref_table; +IF OBJECT_ID('tempdb..@analysis_ref_table', 'U') IS NOT NULL DROP TABLE @analysis_ref_table; +IF OBJECT_ID('tempdb..@time_ref_table', 'U') IS NOT NULL DROP TABLE @time_ref_table; +}:{ +DROP TABLE IF EXISTS @covariate_table; +DROP TABLE IF EXISTS @covariate_continuous_table; +DROP TABLE IF EXISTS @covariate_ref_table; +DROP TABLE IF EXISTS @analysis_ref_table; +DROP TABLE IF EXISTS @time_ref_table; +} + + + diff --git a/man/dot-getDbLooCovariateData.Rd b/man/dot-getDbLooCovariateData.Rd index ab94c1a..7524281 100644 --- a/man/dot-getDbLooCovariateData.Rd +++ b/man/dot-getDbLooCovariateData.Rd @@ -14,7 +14,8 @@ rowIdField = "subject_id", covariateSettings, aggregated = FALSE, - minCharacterizationMean = 0 + minCharacterizationMean = 0, + ... ) } \arguments{ @@ -54,6 +55,8 @@ cohort entry?} \item{minCharacterizationMean}{The minimum mean value for binary characterization output. Values below this will be cut off from output. This will help reduce the file size of the characterization output, but will remove information on covariates that have very low values. The default is 0.} + +\item{...}{Additional arguments, not used.} } \value{ Returns an object of type \code{covariateData}, containing information on the covariates. 
diff --git a/man/getDbCohortBasedCovariatesData.Rd b/man/getDbCohortBasedCovariatesData.Rd index 933bcd0..510934b 100644 --- a/man/getDbCohortBasedCovariatesData.Rd +++ b/man/getDbCohortBasedCovariatesData.Rd @@ -14,6 +14,12 @@ getDbCohortBasedCovariatesData( cdmVersion = "5", rowIdField = "subject_id", covariateSettings, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateContinuousTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTimeRefTable = NULL, aggregated = FALSE, minCharacterizationMean = 0, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema") @@ -54,6 +60,22 @@ is more than one period per person.} \code{\link{createCohortBasedCovariateSettings}} or \code{\link{createCohortBasedTemporalCovariateSettings}} functions.} +\item{targetDatabaseSchema}{(Optional) The name of the database schema where the resulting covariates +should be stored. If not provided, results will be fetched to R.} + +\item{targetCovariateTable}{(Optional) The name of the table where the resulting covariates will +be stored. If not provided, results will be fetched to R. The table can be +a permanent table in the \code{targetDatabaseSchema} or a temp table. 
If +it is a temp table, do not specify \code{targetDatabaseSchema}.} + +\item{targetCovariateContinuousTable}{(Optional) The name of the table where the resulting continuous covariates should be stored.} + +\item{targetCovariateRefTable}{(Optional) The name of the table where the covariate reference will be stored.} + +\item{targetAnalysisRefTable}{(Optional) The name of the table where the analysis reference will be stored.} + +\item{targetTimeRefTable}{(Optional) The name of the table for the time reference.} + \item{aggregated}{Should aggregate statistics be computed instead of covariates per cohort entry?} diff --git a/man/getDbCovariateData.Rd b/man/getDbCovariateData.Rd index 92aa19b..b2b2fb5 100644 --- a/man/getDbCovariateData.Rd +++ b/man/getDbCovariateData.Rd @@ -17,6 +17,15 @@ getDbCovariateData( cohortIds = c(-1), rowIdField = "subject_id", covariateSettings, + exportToTable = FALSE, + createTable = exportToTable, + dropTableIfExists = exportToTable, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateContinuousTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTimeRefTable = NULL, aggregated = FALSE, minCharacterizationMean = 0, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), @@ -69,6 +78,35 @@ there is more than one period per person.} \item{covariateSettings}{Either an object of type \code{covariateSettings} as created using one of the createCovariate functions, or a list of such objects.} +\item{exportToTable}{Whether to export to a table rather than Andromeda object} + +\item{createTable}{Run sql to create table? Code does not check if table exists.} + +\item{dropTableIfExists}{If targetDatabaseSchema, drop any existing tables. Otherwise, results are merged +into existing table data. Overrides createTable.} + +\item{targetDatabaseSchema}{(Optional) The name of the database schema where the resulting covariates +should be stored as a table. 
If not provided, results will be fetched to R.} + +\item{targetCovariateTable}{(Optional) The name of the table where the resulting covariates will +be stored. If not provided, results will be fetched to R. The table can be +a permanent table in the \code{targetDatabaseSchema} or a temp table. If +it is a temp table, do not specify \code{targetDatabaseSchema}.} + +\item{targetCovariateContinuousTable}{(Optional) The name of the table where the resulting continuous covariates will +be stored. If not provided, results will be fetched to R. The table can be +a permanent table in the \code{targetDatabaseSchema} or a temp table. If +it is a temp table, do not specify \code{targetDatabaseSchema}.} + +\item{targetCovariateRefTable}{(Optional) The name of the table where the covariate reference will be stored. If +it is a temp table, do not specify \code{targetDatabaseSchema}.} + +\item{targetAnalysisRefTable}{(Optional) The name of the table where the analysis reference will be stored. If +it is a temp table, do not specify \code{targetDatabaseSchema}.} + +\item{targetTimeRefTable}{(Optional) The name of the table for the time reference. If +it is a temp table, do not specify \code{targetDatabaseSchema}.} + \item{aggregated}{Should aggregate statistics be computed instead of covariates per cohort entry? If aggregated is set to FALSE, the results returned will be based on each subject_id and cohort_start_date in your cohort table. 
If your cohort diff --git a/man/getDbDefaultCovariateData.Rd b/man/getDbDefaultCovariateData.Rd index 5ac6df8..99dda8c 100644 --- a/man/getDbDefaultCovariateData.Rd +++ b/man/getDbDefaultCovariateData.Rd @@ -14,10 +14,12 @@ getDbDefaultCovariateData( cdmVersion = "5", rowIdField = "subject_id", covariateSettings, - targetDatabaseSchema, - targetCovariateTable, - targetCovariateRefTable, - targetAnalysisRefTable, + targetDatabaseSchema = NULL, + targetCovariateTable = NULL, + targetCovariateContinuousTable = NULL, + targetCovariateRefTable = NULL, + targetAnalysisRefTable = NULL, + targetTimeRefTable = NULL, aggregated = FALSE, minCharacterizationMean = 0, tempEmulationSchema = getOption("sqlRenderTempEmulationSchema") @@ -58,17 +60,21 @@ is more than one period per person.} of the createCovariate functions, or a list of such objects.} \item{targetDatabaseSchema}{(Optional) The name of the database schema where the resulting covariates -should be stored.} +should be stored. If not provided, results will be fetched to R.} \item{targetCovariateTable}{(Optional) The name of the table where the resulting covariates will be stored. If not provided, results will be fetched to R. The table can be a permanent table in the \code{targetDatabaseSchema} or a temp table. 
If it is a temp table, do not specify \code{targetDatabaseSchema}.} +\item{targetCovariateContinuousTable}{(Optional) The name of the table where the resulting continuous covariates should be stored.} + \item{targetCovariateRefTable}{(Optional) The name of the table where the covariate reference will be stored.} \item{targetAnalysisRefTable}{(Optional) The name of the table where the analysis reference will be stored.} +\item{targetTimeRefTable}{(Optional) The name of the table for the time reference.} + \item{aggregated}{Should aggregate statistics be computed instead of covariates per cohort entry? @@ -118,9 +124,7 @@ results <- getDbDefaultCovariateData( connection = connection, cdmDatabaseSchema = "main", cohortTable = "cohort", - covariateSettings = createDefaultCovariateSettings(), - targetDatabaseSchema = "main", - targetCovariateTable = "ut_cov" + covariateSettings = createDefaultCovariateSettings() ) } } diff --git a/tests/testthat/setup.R b/tests/testthat/setup.R index dd59bcf..2007a29 100644 --- a/tests/testthat/setup.R +++ b/tests/testthat/setup.R @@ -2,6 +2,19 @@ library(testthat) library(FeatureExtraction) library(dplyr) +# AGS: This rJava code block was used to add the Java dependencies to the classpath for testing. It is needed to run the tests on all platforms, but it is not needed when running individual test files in RStudio, which is why it is not included in the helper functions file. If we want to run individual test files, we can use the loadRenderTranslateUnitTestSql function defined below, which also adds the Java dependencies to the classpath if they are not already there. 
+# library(rJava) +# .jinit() +# jar_dirs <- c( +# system.file("java", package = "DatabaseConnector"), +# system.file("java", package = "SqlRender"), +# system.file("java", package = "FeatureExtraction") +# ) +# jar_files <- unlist( +# lapply(jar_dirs, list.files, pattern = "\\.jar$", full.names = TRUE) +# ) +# .jaddClassPath(jar_files) + dbms <- getOption("dbms", default = "sqlite") message("************* Testing on ", dbms, " *************\n") @@ -98,7 +111,7 @@ checkRemoteFileAvailable <- function(remoteFile) { } # Then stop if status > 400 if (httr::http_error(resp)) { - message_for_status(resp) + httr::message_for_status(resp) return(NULL) } return("success") diff --git a/tests/testthat/test-GetDefaultCovariates.R b/tests/testthat/test-GetDefaultCovariates.R index 25bede6..4411827 100644 --- a/tests/testthat/test-GetDefaultCovariates.R +++ b/tests/testthat/test-GetDefaultCovariates.R @@ -42,21 +42,3 @@ test_that("Test exit conditions", { )) }) -# AGS - This test fails and is likely due to a bug when using SqlLite -# test_that("Test target table", { -# connection <- DatabaseConnector::connect(connectionDetails) -# Eunomia::createCohorts(connectionDetails) -# -# results <- getDbDefaultCovariateData(connection = connection, -# cdmDatabaseSchema = "main", -# cohortTable = "cohort", -# covariateSettings = createDefaultCovariateSettings(), -# targetDatabaseSchema = "main", -# targetCovariateTable = "ut_cov", -# targetCovariateRefTable = "ut_cov_ref", -# targetAnalysisRefTable = "ut_cov_analysis_ref") -# -# on.exit(DatabaseConnector::disconnect(connection)) -# }) -# -# unlink(connectionDetails$server())