diff --git a/DESCRIPTION b/DESCRIPTION index 0f03bcb..d0276ff 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -24,6 +24,7 @@ Suggests: later (>= 1.1.0), magrittr (>= 2.0.0), promises, + R6, reticulate, rmarkdown, testthat (>= 3.0.0) diff --git a/NEWS.md b/NEWS.md index 7393124..980fbb3 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,10 @@ # coro (development version) +* Async functions and generators can now be R6 methods (#63). + +* A memory leak that crept back in (see #36) was fixed with a more robust approach. + + # coro 1.1.0 * Iterator functions are now allowed to have a `close` argument. diff --git a/R/generator.R b/R/generator.R index fa336f9..86b91ee 100644 --- a/R/generator.R +++ b/R/generator.R @@ -115,183 +115,224 @@ gen <- function(expr) { } generator0 <- function(fn, type = "generator") { + # Generated at runtime state_machine <- NULL - fmls <- formals(fn) - env <- environment(fn) # Flipped when `coro_debug()` is applied on a generator factory debugged <- FALSE - `_parent` <- environment() + fmls <- formals(fn) + static <- environment() + + body <- quote({ + # This is to prevent the compiler from JIT-compiling the generator factory, + # which would cause the injected static environment to leak via the constant + # pool. See https://github.com/r-lib/coro/issues/36. Functions that may call + # `browser()` are never compiled. + if (FALSE) { + browser() + } - # Create the generator factory (returned by `generator()` and - # entered by `async()`) - factory <- new_function( - fmls, - quote({ - # Evaluate here so the formals of the generator factory do not - # mask our variables - `_private` <- rlang::env(`_parent`) - `_private`$generator_env <- base::environment() - `_private`$caller_env <- base::parent.frame() - - base::local(envir = `_private`, { - generator_env <- environment()$generator_env - caller_env <- environment()$caller_env - - # Prevent lints about unknown bindings + # Evaluate here so the formals of the generator factory do not + # mask our variables. To see how `_static` is defined, see below. + `_private` <- rlang::env(`_static`) + `_private`$generator_env <- base::environment() + `_private`$caller_env <- base::parent.frame() + `_private`$env <- base::parent.env(base::environment()) + + base::local(envir = `_private`, { + # Prevent lints about unknown bindings + if (FALSE) { + state_machine <- NULL + type <- NULL + fn <- NULL + fmls <- NULL + debugged <- NULL exits <- NULL exited <- NULL cleanup <- NULL close_active_iterators <- NULL + } - info <- machine_info(type, env = caller_env) + generator_env <- environment()$generator_env + caller_env <- environment()$caller_env - # Generate the state machine lazily at runtime - if (is_null(state_machine)) { - state_machine <<- walk_states(body(fn), info = info) - } + info <- machine_info(type, env = caller_env) - ops <- info$async_ops - if (!is_null(ops) && !is_installed(ops$package)) { - abort(sprintf("The %s package must be installed.", ops$package)) - } + # Generate the state machine lazily at runtime + if (is_null(state_machine)) { + state_machine <<- walk_states(body(fn), info = info) + } - env <- new_generator_env(env, info) - user_env <- env$user_env + ops <- info$async_ops + if (!is_null(ops) && !is_installed(ops$package)) { + abort(sprintf("The %s package must be installed.", ops$package)) + } - # The compiler caches function bodies, so inline a weak reference to avoid - # leaks (#36). This weak reference is injected inside the body of the - # generator instance to work around a scoping issue. See where we install - # the user's exit handlers. - weak_env <- new_weakref(env) + env <- new_generator_env(env, info) + user_env <- env$user_env - # Forward arguments inside the user space of the state machine - lapply(names(fmls), function(arg) { - env_bind_arg(user_env, arg, frame = generator_env) - }) + # The compiler caches function bodies, so inline a weak reference to avoid + # leaks (#36). This weak reference is injected inside the body of the + # generator instance to work around a scoping issue. See where we install + # the user's exit handlers. + weak_env <- new_weakref(env) - # Flipped when `f` is pressed in the browser - undebugged <- FALSE - - # Called on cleanup to close all iterators active in - # ongoing `for` loops - close_active_iterators <- function() { - # The list is ordered from outermost to innermost for loops. Close them - # in reverse order, from most nested to least nested. - for (iter in rev(env$iterators)) { - if (!is_null(iter)) { - iter_close(iter) - } + # Forward arguments inside the user space of the state machine + lapply(names(fmls), function(arg) { + env_bind_arg(user_env, arg, frame = generator_env) + }) + + # Flipped when `f` is pressed in the browser + undebugged <- FALSE + + # Called on cleanup to close all iterators active in + # ongoing `for` loops + close_active_iterators <- function() { + # The list is ordered from outermost to innermost for loops. Close them + # in reverse order, from most nested to least nested. + for (iter in rev(env$iterators)) { + if (!is_null(iter)) { + iter_close(iter) } } - - env$close_active_iterators <- close_active_iterators - - env$cleanup <- function() { - env$close_active_iterators() - - # Prevent user exit handlers from running again - env$exits <- NULL + } + + env$close_active_iterators <- close_active_iterators + + env$cleanup <- function() { + env$close_active_iterators() + + # Prevent user exit handlers from running again + env$exits <- NULL + } + + # Create the generator instance. This is a function that resumes + # a state machine. + instance <- inject(function(arg, close = FALSE) { + # This is to prevent the compiler from JIT-compiling the generator factory, + # which would cause the injected static environment to leak via the constant + # pool. See https://github.com/r-lib/coro/issues/36. Functions that may call + # `browser()` are never compiled. Note that the compiler will repeatedly + # attempt to compile the function. + if (FALSE) { + browser() } - # Create the generator instance. This is a function that resumes - # a state machine. - instance <- inject(function(arg, close = FALSE) { - # Forward generator argument inside the state machine environment - delayedAssign("arg", arg, assign.env = env) - delayedAssign("close", close, assign.env = env) - - if (!undebugged && (debugged || is_true(peek_option("coro_debug")))) { - env_browse(user_env) - - defer({ - # `f` was pressed, disable debugging for this generator - if (!env_is_browsed(user_env)) { - undebugged <<- TRUE - } - }) - } - - if (is_true(env$exhausted)) { - return(exhausted()) - } + # Forward generator argument inside the state machine environment + delayedAssign("arg", arg, assign.env = env) + delayedAssign("close", close, assign.env = env) - if (close) { - # Prevent returning here as closing should be idempotent. We set - # ourselves as exhausted _before_ running any cleanup in case of - # failures. An exit handler shouldn't fail and it's expected that any - # failure prevents other handlers from running, including when an - # attempt is made at resuming the closed generator. - env$exhausted <- TRUE - - # First close active iterators. Should be first since they might be - # relying on resources set by the user. - close_active_iterators() - - # Now run the user's exit expressions. Achieved by running restoring - # user exits in the user environment and running an empty eval there. - # Unlike in the state machine path, where these expressions are meant - # to only run in case of unexpected exits, we don't disable them - # before exiting so they will actually run here. - evalq( - envir = user_env, - base::evalq(envir = rlang::wref_key(!!weak_env), { - env_poke_exits(user_env, exits) - }) - ) - - return(exhausted()) - } + if (!undebugged && (debugged || is_true(peek_option("coro_debug")))) { + env_browse(user_env) - # Disable generator on error, interrupt, debugger quit, etc. - # There is no safe way of resuming a generator that didn't - # suspend normally. - if (is_true(env$jumped)) { - # In case a scheduler calls back the generator for error - # handling or cleanup - if (!missing(arg)) { - force(arg) + defer({ + # `f` was pressed, disable debugging for this generator + if (!env_is_browsed(user_env)) { + undebugged <<- TRUE } - abort( - "This function has been disabled because of an unexpected exit." - ) - } + }) + } - # Resume state machine. Set up an execution env in the user - # environment first to serve as a target for on.exit() - # expressions. Then evaluate state machine in its private - # environment. - env$jumped <- TRUE - env$exited <- TRUE + if (is_true(env$exhausted)) { + return(exhausted()) + } - out <- evalq(envir = user_env, { + if (close) { + # Prevent returning here as closing should be idempotent. We set + # ourselves as exhausted _before_ running any cleanup in case of + # failures. An exit handler shouldn't fail and it's expected that any + # failure prevents other handlers from running, including when an + # attempt is made at resuming the closed generator. + env$exhausted <- TRUE + + # First close active iterators. Should be first since they might be + # relying on resources set by the user. + close_active_iterators() + + # Now run the user's exit expressions. Achieved by running restoring + # user exits in the user environment and running an empty eval there. + # Unlike in the state machine path, where these expressions are meant + # to only run in case of unexpected exits, we don't disable them + # before exiting so they will actually run here. + evalq( + envir = user_env, base::evalq(envir = rlang::wref_key(!!weak_env), { - defer(if (exited) cleanup()) env_poke_exits(user_env, exits) - !!state_machine }) - }) - env$jumped <- FALSE + ) - out - }) - - env$.self <- instance + return(exhausted()) + } - if (is_string(type, "async")) { - # Step into the generator right away - invisible(instance(NULL)) - } else { - structure(instance, class = "coro_generator_instance") + # Disable generator on error, interrupt, debugger quit, etc. + # There is no safe way of resuming a generator that didn't + # suspend normally. + if (is_true(env$jumped)) { + # In case a scheduler calls back the generator for error + # handling or cleanup + if (!missing(arg)) { + force(arg) + } + abort( + "This function has been disabled because of an unexpected exit." + ) } + + # Resume state machine. Set up an execution env in the user + # environment first to serve as a target for on.exit() + # expressions. Then evaluate state machine in its private + # environment. + env$jumped <- TRUE + env$exited <- TRUE + + out <- evalq(envir = user_env, { + base::evalq(envir = rlang::wref_key(!!weak_env), { + defer(if (exited) cleanup()) + env_poke_exits(user_env, exits) + !!state_machine + }) + }) + env$jumped <- FALSE + + out }) + + env$.self <- instance + + if (is_string(type, "async")) { + # Step into the generator right away + invisible(instance(NULL)) + } else { + structure(instance, class = "coro_generator_instance") + } }) + }) + + # Don't use `expr({ !!!body })` because that causes R CMD check issues due to + # `length(body)` not matching the length of srcrefs of `{` + # https://github.com/r-lib/rlang/issues/1821 + body <- call2( + "{", + expr(`_static` <- !!static), + !!!as.list(body[-1]) ) + # Create the generator factory (returned by `generator()` and + # entered by `async()`) + factory <- new_function(fmls, body, env = environment(fn)) structure(factory, class = c(paste0("coro_", type), "function")) } +static_env <- function(fn) { + if (is_generator_factory(fn)) { + # Extract injected environment in first assignment in `{` + body(fn)[[2]][[3]] + } else { + fn_env(fn) + } +} + # Creates a child of the coro namespace that holds all the variables # used by the generator runtime new_generator_env <- function(parent, info) { @@ -349,7 +390,7 @@ print.coro_generator <- function(x, ..., internals = FALSE) { } #' @export print.coro_generator_instance <- function(x, ..., internals = FALSE) { - type <- env_get(fn_env(x), "type", inherit = TRUE) + type <- env_get(static_env(x), "type", inherit = TRUE) if (is_string(type, "async_generator")) { writeLines("") @@ -361,7 +402,7 @@ print.coro_generator_instance <- function(x, ..., internals = FALSE) { } print_generator <- function(x, ..., internals = FALSE, reproducible = FALSE) { - fn <- env_get(fn_env(x), "fn", inherit = TRUE) + fn <- env_get(static_env(x), "fn", inherit = TRUE) if (reproducible) { fn <- zap_env(fn) @@ -376,7 +417,7 @@ print_generator <- function(x, ..., internals = FALSE, reproducible = FALSE) { invisible(x) } print_state_machine <- function(x, ...) { - machine <- with(env(fn_env(x)), { + machine <- with(env(static_env(x)), { info <- machine_info(type, env = global_env()) state_machine %||% walk_states(body(fn), info = info) }) diff --git a/tests/testthat/test-async.R b/tests/testthat/test-async.R index f302454..180ce8a 100644 --- a/tests/testthat/test-async.R +++ b/tests/testthat/test-async.R @@ -364,3 +364,24 @@ test_that("async functions do not cause CMD check notes (#40)", { )) ) }) + +test_that("async methods in R6 classes", { + testthat::skip_if_not_installed("R6") + + AsyncClass <- R6::R6Class( + classname = "AsyncClass", + public = list( + async_resolved = async(function() "value"), + async_pending = async(function() await("value")) + ) + ) + class <- AsyncClass$new() + + later::with_temp_loop({ + out <- class$async_resolved() + expect_promise(out, "value", "fulfilled") + + out <- class$async_pending() + expect_promise(out, status = "pending") + }) +}) diff --git a/tests/testthat/test-generator.R b/tests/testthat/test-generator.R index e245981..779408b 100644 --- a/tests/testthat/test-generator.R +++ b/tests/testthat/test-generator.R @@ -536,3 +536,24 @@ test_that("returning early doesn't yield values (#51)", { }) expect_equal(collect(g()), list()) }) + +test_that("generator methods in R6 classes", { + testthat::skip_if_not_installed("R6") + + GenClass <- R6::R6Class( + "GenClass", + public = list( + gen_method = generator(function() { + yield(1) + yield(2) + }) + ) + ) + + obj <- GenClass$new() + g <- obj$gen_method() + + expect_equal(g(), 1) + expect_equal(g(), 2) + expect_exhausted(g()) +})