Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion R/Andromeda.R
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ querySqlToAndromeda <- function(
invisible(andromeda)
},
error = function(err) {
.createErrorReport(dbms(connection), err$message, sql, errorReportFile)
.createErrorReport(dbms(connection), err, sql, errorReportFile)
})
}

Expand Down
4 changes: 4 additions & 0 deletions R/DBI.R
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ setMethod(
signature("DatabaseConnectorDbiConnection", "character"),
# Execute a single SQL statement via the wrapped DBI connection and return
# the number of rows affected.
function(conn, statement, ...) {
rowsAffected <- DBI::dbExecute(conn@dbiConnection, statement)
# For BigQuery, throttle repeated DDL / INSERT statements against the same
# table: the delayIfNecessary* helpers sleep when the same table was touched
# within the last 5 seconds (presumably to stay under BigQuery rate limits —
# see the comment in setup.R about avoiding the BigQuery rate limit).
if (conn@dbms == "bigquery") {
delayIfNecessaryForDdl(statement)
delayIfNecessaryForInsert(statement)
}
return(rowsAffected)
}
)
Expand Down
32 changes: 32 additions & 0 deletions R/InsertTable.R
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ insertTable.default <- function(connection,
}
}
}

if (dbms(connection) == "spark") {
# Spark automatically converts table names to lowercase, but will throw an error
# that the table already exists when using dbWriteTable to append, and the table
Expand All @@ -485,6 +486,37 @@ insertTable.default <- function(connection,
}

}

if (dbms(connection) == "bigquery") {
  if (tempTable) {
    # BigQuery does not support temp tables, so emulate them: redirect the
    # table into the temp emulation schema under a translated ("#"-prefixed)
    # name produced by SqlRender.
    databaseSchema <- tempEmulationSchema
    tableName <- SqlRender::translate(sprintf("#%s", tableName), targetDialect = "bigquery", tempEmulationSchema = NULL)
    tempTable <- FALSE
  }
  if (dropTableIfExists) {
    # bigrquery::bq_table_upload is not dropping tables, so we need to do it manually
    sql <- "DROP TABLE IF EXISTS @databaseSchema.@tableName;"
    renderTranslateExecuteSql(
      connection = connection,
      sql = sql,
      databaseSchema = databaseSchema,
      tableName = tableName,
      tempEmulationSchema = tempEmulationSchema,
      progressBar = FALSE,
      reportOverallTime = FALSE
    )
    # Table is already gone; prevent the generic code path from dropping again.
    dropTableIfExists <- FALSE
  }
  # Convert datetime columns to UTC to avoid timezone issues on upload.
  for (i in seq_len(ncol(data))) {
    if (inherits(data[[i]], "POSIXct")) {
      # Force timezone to UTC before insertion
      attr(data[[i]], "tzone") <- "UTC"
    }
  }
}

logTrace(sprintf("Inserting %d rows into table '%s' ", nrow(data), tableName))
if (!is.null(databaseSchema)) {
Expand Down
11 changes: 11 additions & 0 deletions R/ListTables.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ setMethod(
}
sql <- sprintf("SELECT table_name FROM information_schema.tables WHERE table_schema = '%s';", databaseSchema)
tables <- querySql(conn, sql)[[1]]
} else if (!is.null(databaseSchema) && dbms(conn) == "bigquery") {
if (!grepl("\\.", databaseSchema)) {
abort("databaseSchema must contain full path when using bigquery as <project>.<database>")
}
bq_list_tables <- bigrquery::bq_dataset_tables(databaseSchema)
tables <- c()
for (table in bq_list_tables) {
if (table$type == "TABLE") {
tables <- c(tables, table$table)
}
}
} else {
tables <- DBI::dbListTables(conn@dbiConnection, schema = databaseSchema)
}
Expand Down
14 changes: 5 additions & 9 deletions R/LowLevelFunctions.R
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,21 @@ delayIfNecessary <- function(sql, regex, executionTimes, threshold) {
if (!is.na(lastExecutedTime) && !is.null(lastExecutedTime)) {
delta <- difftime(currentTime, lastExecutedTime, units = "secs")
if (delta < threshold) {
Sys.sleep(threshold - delta)
Sys.sleep(threshold - delta)
message(paste("Delayed for", threshold - delta, "seconds for", tableName))
}
}
executionTimes[[tableName]] <- currentTime
}
}

delayIfNecessaryForDdl <- function(sql) {
  # Throttle DDL statements. The regex matches CREATE TABLE [IF EXISTS] and
  # DROP TABLE [IF EXISTS], capturing in group 2 a table reference that may be
  # qualified up to two levels (e.g. project.dataset.table for BigQuery).
  # NOTE: the previous duplicate (stale) regex assignment was dead code,
  # immediately overwritten; only the current pattern is kept.
  regexForDdl <- "(^CREATE\\s+TABLE\\s+IF\\s+EXISTS|^CREATE\\s+TABLE|^DROP\\s+TABLE\\s+IF\\s+EXISTS|^DROP\\s+TABLE)\\s+([a-zA-Z0-9_$#-]*\\.?\\s*(?:[a-zA-Z0-9_]+)*\\.?\\s*(?:[a-zA-Z0-9_]+))"
  delayIfNecessary(sql, regexForDdl, ddlExecutionTimes, 5)
}

delayIfNecessaryForInsert <- function(sql) {
  # Throttle INSERT INTO statements. Group 2 captures a table reference that
  # may be qualified up to two levels (e.g. project.dataset.table), mirroring
  # the DDL pattern in delayIfNecessaryForDdl.
  # NOTE: the previous duplicate (stale) regex assignment was dead code,
  # immediately overwritten; only the current pattern is kept.
  regexForInsert <- "(^INSERT\\s+INTO)\\s+([a-zA-Z0-9_$#-]*\\.?\\s*(?:[a-zA-Z0-9_]+)*\\.?\\s*(?:[a-zA-Z0-9_]+))"
  delayIfNecessary(sql, regexForInsert, insertExecutionTimes, 5)
}

Expand All @@ -120,12 +121,7 @@ lowLevelExecuteSql <- function(connection, sql) {
} else {
rowsAffected <- sanitizeJavaErrorForRlang(rJava::.jcall(statement, "J", "executeLargeUpdate", as.character(sql), check = FALSE))
}

if (dbms(connection) == "bigquery") {
delayIfNecessaryForDdl(sql)
delayIfNecessaryForInsert(sql)
}


invisible(rowsAffected)
}

Expand Down
6 changes: 5 additions & 1 deletion R/RStudio.R
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ unregisterWithRStudio <- function(connection) {
displayName <- registeredDisplayNames$displayName[registeredDisplayNames$uuid == connection@uuid]
registeredDisplayNames <- registeredDisplayNames[registeredDisplayNames$uuid != connection@uuid, ]
options(registeredDisplayNames = registeredDisplayNames)
observer$connectionClosed(compileTypeLabel(connection), displayName)
# Only call connectionClosed if we have exactly one displayName
# (handles both RStudio and Positron gracefully)
if (length(displayName) == 1) {
observer$connectionClosed(compileTypeLabel(connection), displayName)
}
}
}

Expand Down
14 changes: 9 additions & 5 deletions R/Sql.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@
return(paste(lines, collapse = "\n"))
}

.createErrorReport <- function(dbms, message, sql, fileName) {
.createErrorReport <- function(dbms, err, sql, fileName) {
message <- err$message
if (dbms == "bigquery" && !is.null(err$body)) {
message <- paste0(message, "\n", err$body)
}
report <- c("DBMS:\n", dbms, "\n\nError:\n", message, "\n\nSQL:\n", sql, "\n\n", .systemInfo())
fileConn <- file(fileName)
writeChar(report, fileConn, eos = NULL)
Expand Down Expand Up @@ -200,7 +204,7 @@ executeSql <- function(connection,
logTrace(paste("Statements took", delta, attr(delta, "units")))
},
error = function(err) {
.createErrorReport(dbms, err$message, paste(batchSql, collapse = "\n\n"), errorReportFile)
.createErrorReport(dbms, err, paste(batchSql, collapse = "\n\n"), errorReportFile)
},
finally = {
rJava::.jcall(statement, "V", "close")
Expand Down Expand Up @@ -230,7 +234,7 @@ executeSql <- function(connection,
logTrace(paste("Executing SQL took", delta, attr(delta, "units")))
},
error = function(err) {
.createErrorReport(dbms, err$message, sqlStatement, errorReportFile)
.createErrorReport(dbms, err, sqlStatement, errorReportFile)
})
if (progressBar) {
setTxtProgressBar(pb, i / length(sqlStatements))
Expand Down Expand Up @@ -354,7 +358,7 @@ querySql <- function(connection,
return(result)
},
error = function(err) {
.createErrorReport(dbms(connection), err$message, sql, errorReportFile)
.createErrorReport(dbms(connection), err, sql, errorReportFile)
})
}

Expand Down Expand Up @@ -618,7 +622,7 @@ renderTranslateQueryApplyBatched <- function(connection,
queryResult <- DBI::dbSendQuery(connection, sql)
},
error = function(err) {
.createErrorReport(dbms(connection), err$message, sql, errorReportFile)
.createErrorReport(dbms(connection), err, sql, errorReportFile)
}
)
on.exit(DBI::dbClearResult(queryResult))
Expand Down
15 changes: 9 additions & 6 deletions tests/testthat/dbplyrTestFunction.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,15 @@ testDbplyrFunctions <- function(connectionDetails, cdmDatabaseSchema) {
}

# Test slicing ---------------------------------------------------------------
personSample <- person %>%
slice_sample(n = 10) %>%
relocate(care_site_id) %>%
collect()
expect_equal(nrow(personSample), 10)
expect_equal(which(names(personSample) == "care_site_id"), 1)
# BigQuery uses RAND() rather than RANDOM() for random sampling, which this
# slice_sample() translation does not handle, so the test is skipped — confirm.
if (!(dbms(connection) %in% c("bigquery"))) {
personSample <- person %>%
slice_sample(n = 10) %>%
relocate(care_site_id) %>%
collect()
expect_equal(nrow(personSample), 10)
expect_equal(which(names(personSample) == "care_site_id"), 1)
}

# Test ifelse ----------------------------------------------------------------
sexString <- person %>%
Expand Down
17 changes: 9 additions & 8 deletions tests/testthat/setup.R
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,24 @@ if (.Platform$OS.type == "windows") {
}

# BigQuery ------------------------------------------------------------------
if (Sys.getenv("CDM_BIG_QUERY_CONNECTION_STRING") != "") {
if (Sys.getenv("CDM_BIG_QUERY_PROJECT") != "" & Sys.getenv("CDM_BIG_QUERY_BILLING") != "") {
# To avoid rate limit on BigQuery, only test on 1 OS:
if (.Platform$OS.type == "windows") {
bqKeyFile <- tempfile(fileext = ".json")
writeLines(Sys.getenv("CDM_BIG_QUERY_KEY_FILE"), bqKeyFile)
if (testthat::is_testing()) {
withr::defer(unlink(bqKeyFile, force = TRUE), testthat::teardown_env())
}
bqConnectionString <- gsub("<keyfile path>",
normalizePath(bqKeyFile, winslash = "/"),
Sys.getenv("CDM_BIG_QUERY_CONNECTION_STRING"))

bigrquery::bq_auth(path = bqKeyFile)

testServers[[length(testServers) + 1]] <- list(
connectionDetails = details <- createConnectionDetails(
connectionDetails = details <- DatabaseConnector::createDbiConnectionDetails(
dbms = "bigquery",
user = "",
password = "",
connectionString = !!bqConnectionString
drv = bigrquery::bigquery(),
project = Sys.getenv("CDM_BIG_QUERY_PROJECT"),
billing = Sys.getenv("CDM_BIG_QUERY_BILLING"),
bigint = "integer64"
),
NULL,
cdmDatabaseSchema = Sys.getenv("CDM_BIG_QUERY_CDM_SCHEMA"),
Expand Down
3 changes: 3 additions & 0 deletions tests/testthat/test-DBItest.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Run only DBI tests with testthat::test_file("tests/testthat/test-DBItest.R")
# Skip this whole test file when the PostgreSQL test environment is not
# configured (DBI tests below require a live PostgreSQL instance).
if (Sys.getenv("CDM5_POSTGRESQL_USER") == "" || Sys.getenv("CDM5_POSTGRESQL_SERVER") == "") {
  testthat::skip("PostgreSQL environment variables not set")
}

port <- Sys.getenv("CDM5_POSTGRESQL_PORT")
if (port == "") port <- "5432"
Expand Down
1 change: 1 addition & 0 deletions tests/testthat/test-connection.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ test_that("getAvailableJavaHeapSpace returns a positive number", {
})

test_that("Error is thrown when forgetting password", {
testthat::skip_if(Sys.getenv("CDM5_POSTGRESQL_USER") == "" || Sys.getenv("CDM5_POSTGRESQL_SERVER") == "", "PostgreSQL environment variables not set")
details <- createConnectionDetails(
dbms = "postgresql",
user = Sys.getenv("CDM5_POSTGRESQL_USER"),
Expand Down
4 changes: 2 additions & 2 deletions tests/testthat/test-getTableNames.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ for (testServer in testServers) {
tables <- getTableNames(connection, testServer$cdmDatabaseSchema)
expect_true("person" %in% tables)
expect_true(existsTable(connection, testServer$cdmDatabaseSchema, "person"))
# This does not work on SQL Server:
if (testServer$connectionDetails$dbms != "sql server") {
# This does not work on SQL Server or BigQuery:
if (testServer$connectionDetails$dbms != "sql server" && testServer$connectionDetails$dbms != "bigquery") {
expect_true(DBI::dbExistsTable(connection, "person"))
}

Expand Down
20 changes: 14 additions & 6 deletions tests/testthat/test-insertTable.R
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ data$booleans[c(3,9)] <- NA
for (testServer in testServers) {
test_that(addDbmsToLabel("Insert data", testServer), {
# skip_if(testServer$connectionDetails$dbms == "oracle") # Booleans are passed to and from Oracle but NAs are not persevered. still need to fix that.
if (testServer$connectionDetails$dbms %in% c("redshift", "bigquery")) {
# Inserting on RedShift or BigQuery is slow (Without bulk upload), so
if (testServer$connectionDetails$dbms %in% c("redshift")) {
# Inserting on RedShift is slow (Without bulk upload), so
# taking subset:
dataCopy1 <- data[1:10, ]
} else {
Expand Down Expand Up @@ -85,17 +85,25 @@ for (testServer in testServers) {
)

# Check data on server is same as local
dataCopy2 <- renderTranslateQuerySql(connection, "SELECT * FROM #temp;", integer64AsNumeric = FALSE)
columnsOrder = paste(colnames(dataCopy1), collapse = ", ")
dataCopy2 <- renderTranslateQuerySql(connection, "SELECT @columnsOrder FROM #temp;", integer64AsNumeric = FALSE, columnsOrder = columnsOrder)

names(dataCopy2) <- tolower(names(dataCopy2))
dataCopy1 <- dataCopy1[order(dataCopy1$person_id), ]
dataCopy2 <- dataCopy2[order(dataCopy2$person_id), ]
row.names(dataCopy1) <- NULL
row.names(dataCopy2) <- NULL
attr(dataCopy1$some_datetime, "tzone") <- NULL
attr(dataCopy2$some_datetime, "tzone") <- NULL
expect_equal(dataCopy1, dataCopy2, check.attributes = FALSE, tolerance = 1e-7)

sql <- SqlRender::translate("SELECT * FROM #temp;", targetDialect = dbms(connection))
tolerance <- 1e-7
if(testServer$connectionDetails$dbms == "bigquery") {
tolerance <- 5e-6 # BigQuery has lower precision for floats, so need to use higher tolerance
}
expect_equal(dataCopy1, dataCopy2, check.attributes = FALSE, tolerance = tolerance)

sql <- SqlRender::render("SELECT @columnsOrder FROM #temp;", columnsOrder = columnsOrder)
sql <- SqlRender::translate(sql, targetDialect = dbms(connection))
# Check data types
res <- dbSendQuery(connection, sql, translate = FALSE)
columnInfo <- dbColumnInfo(res)
Expand All @@ -118,7 +126,7 @@ for (testServer in testServers) {
} else if (dbms == "spark") {
expect_equal(as.character(columnInfo$field.type), c("DATE", "TIMESTAMP", "INT", "DOUBLE", "STRING", "BIGINT", "BOOLEAN"))
} else if (dbms == "bigquery") {
expect_equal(as.character(columnInfo$field.type), c("DATE", "DATETIME", "INT64", "FLOAT64", "STRING", "INT64", "BOOLEAN"))
expect_equal(as.character(columnInfo$type), c("DATE", "TIMESTAMP", "INTEGER", "FLOAT", "STRING", "INTEGER", "BOOLEAN"))
} else if (dbms == "iris") {
expect_equal(as.character(columnInfo$field.type), c("DATE", "TIMESTAMP", "INTEGER", "DOUBLE", "VARCHAR", "BIGINT"))
} else {
Expand Down