From 240d95c1d88bd82f872b2a521571172f573447c7 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 10 Sep 2025 10:41:14 +0200 Subject: [PATCH 1/6] Inject static environment in generator factory body --- R/generator.R | 312 ++++++++++++++++++++++++++------------------------ 1 file changed, 165 insertions(+), 147 deletions(-) diff --git a/R/generator.R b/R/generator.R index fa336f9..98b4e59 100644 --- a/R/generator.R +++ b/R/generator.R @@ -115,183 +115,201 @@ gen <- function(expr) { } generator0 <- function(fn, type = "generator") { + # Declarations for R CMD check, unused state_machine <- NULL + debugged <- NULL + fmls <- formals(fn) env <- environment(fn) - # Flipped when `coro_debug()` is applied on a generator factory - debugged <- FALSE + static <- env( + state_machine = NULL, + fmls = fmls, + env = env, + # Flipped when `coro_debug()` is applied on a generator factory + debugged = FALSE, + ) - `_parent` <- environment() + body <- quote({ + # Evaluate here so the formals of the generator factory do not + # mask our variables. To see how `_static` is defined, see below. + `_private` <- rlang::env(`_static`) + `_private`$generator_env <- base::environment() + `_private`$caller_env <- base::parent.frame() + + base::local(envir = `_private`, { + generator_env <- environment()$generator_env + caller_env <- environment()$caller_env + + # Prevent lints about unknown bindings + exits <- NULL + exited <- NULL + cleanup <- NULL + close_active_iterators <- NULL + + info <- machine_info(type, env = caller_env) + + # Generate the state machine lazily at runtime + if (is_null(state_machine)) { + state_machine <<- walk_states(body(fn), info = info) + } + + ops <- info$async_ops + if (!is_null(ops) && !is_installed(ops$package)) { + abort(sprintf("The %s package must be installed.", ops$package)) + } + + env <- new_generator_env(env, info) + user_env <- env$user_env + + # The compiler caches function bodies, so inline a weak reference to avoid + # leaks (#36). This weak reference is injected inside the body of the + # generator instance to work around a scoping issue. See where we install + # the user's exit handlers. + weak_env <- new_weakref(env) + + # Forward arguments inside the user space of the state machine + lapply(names(fmls), function(arg) { + env_bind_arg(user_env, arg, frame = generator_env) + }) - # Create the generator factory (returned by `generator()` and - # entered by `async()`) - factory <- new_function( - fmls, - quote({ - # Evaluate here so the formals of the generator factory do not - # mask our variables - `_private` <- rlang::env(`_parent`) - `_private`$generator_env <- base::environment() - `_private`$caller_env <- base::parent.frame() - - base::local(envir = `_private`, { - generator_env <- environment()$generator_env - caller_env <- environment()$caller_env - - # Prevent lints about unknown bindings - exits <- NULL - exited <- NULL - cleanup <- NULL - close_active_iterators <- NULL - - info <- machine_info(type, env = caller_env) - - # Generate the state machine lazily at runtime - if (is_null(state_machine)) { - state_machine <<- walk_states(body(fn), info = info) + # Flipped when `f` is pressed in the browser + undebugged <- FALSE + + # Called on cleanup to close all iterators active in + # ongoing `for` loops + close_active_iterators <- function() { + # The list is ordered from outermost to innermost for loops. Close them + # in reverse order, from most nested to least nested. + for (iter in rev(env$iterators)) { + if (!is_null(iter)) { + iter_close(iter) + } } + } - ops <- info$async_ops - if (!is_null(ops) && !is_installed(ops$package)) { - abort(sprintf("The %s package must be installed.", ops$package)) - } + env$close_active_iterators <- close_active_iterators - env <- new_generator_env(env, info) - user_env <- env$user_env + env$cleanup <- function() { + env$close_active_iterators() - # The compiler caches function bodies, so inline a weak reference to avoid - # leaks (#36). This weak reference is injected inside the body of the - # generator instance to work around a scoping issue. See where we install - # the user's exit handlers. - weak_env <- new_weakref(env) + # Prevent user exit handlers from running again + env$exits <- NULL + } - # Forward arguments inside the user space of the state machine - lapply(names(fmls), function(arg) { - env_bind_arg(user_env, arg, frame = generator_env) - }) + # Create the generator instance. This is a function that resumes + # a state machine. + instance <- inject(function(arg, close = FALSE) { + # Forward generator argument inside the state machine environment + delayedAssign("arg", arg, assign.env = env) + delayedAssign("close", close, assign.env = env) + + if (!undebugged && (debugged || is_true(peek_option("coro_debug")))) { + env_browse(user_env) - # Flipped when `f` is pressed in the browser - undebugged <- FALSE - - # Called on cleanup to close all iterators active in - # ongoing `for` loops - close_active_iterators <- function() { - # The list is ordered from outermost to innermost for loops. Close them - # in reverse order, from most nested to least nested. - for (iter in rev(env$iterators)) { - if (!is_null(iter)) { - iter_close(iter) + defer({ + # `f` was pressed, disable debugging for this generator + if (!env_is_browsed(user_env)) { + undebugged <<- TRUE } - } + }) } - env$close_active_iterators <- close_active_iterators - - env$cleanup <- function() { - env$close_active_iterators() - - # Prevent user exit handlers from running again - env$exits <- NULL + if (is_true(env$exhausted)) { + return(exhausted()) } - # Create the generator instance. This is a function that resumes - # a state machine. - instance <- inject(function(arg, close = FALSE) { - # Forward generator argument inside the state machine environment - delayedAssign("arg", arg, assign.env = env) - delayedAssign("close", close, assign.env = env) - - if (!undebugged && (debugged || is_true(peek_option("coro_debug")))) { - env_browse(user_env) - - defer({ - # `f` was pressed, disable debugging for this generator - if (!env_is_browsed(user_env)) { - undebugged <<- TRUE - } + if (close) { + # Prevent returning here as closing should be idempotent. We set + # ourselves as exhausted _before_ running any cleanup in case of + # failures. An exit handler shouldn't fail and it's expected that any + # failure prevents other handlers from running, including when an + # attempt is made at resuming the closed generator. + env$exhausted <- TRUE + + # First close active iterators. Should be first since they might be + # relying on resources set by the user. + close_active_iterators() + + # Now run the user's exit expressions. Achieved by running restoring + # user exits in the user environment and running an empty eval there. + # Unlike in the state machine path, where these expressions are meant + # to only run in case of unexpected exits, we don't disable them + # before exiting so they will actually run here. + evalq( + envir = user_env, + base::evalq(envir = rlang::wref_key(!!weak_env), { + env_poke_exits(user_env, exits) }) - } + ) - if (is_true(env$exhausted)) { - return(exhausted()) - } - - if (close) { - # Prevent returning here as closing should be idempotent. We set - # ourselves as exhausted _before_ running any cleanup in case of - # failures. An exit handler shouldn't fail and it's expected that any - # failure prevents other handlers from running, including when an - # attempt is made at resuming the closed generator. - env$exhausted <- TRUE - - # First close active iterators. Should be first since they might be - # relying on resources set by the user. - close_active_iterators() - - # Now run the user's exit expressions. Achieved by running restoring - # user exits in the user environment and running an empty eval there. - # Unlike in the state machine path, where these expressions are meant - # to only run in case of unexpected exits, we don't disable them - # before exiting so they will actually run here. - evalq( - envir = user_env, - base::evalq(envir = rlang::wref_key(!!weak_env), { - env_poke_exits(user_env, exits) - }) - ) - - return(exhausted()) - } + return(exhausted()) + } - # Disable generator on error, interrupt, debugger quit, etc. - # There is no safe way of resuming a generator that didn't - # suspend normally. - if (is_true(env$jumped)) { - # In case a scheduler calls back the generator for error - # handling or cleanup - if (!missing(arg)) { - force(arg) - } - abort( - "This function has been disabled because of an unexpected exit." - ) + # Disable generator on error, interrupt, debugger quit, etc. + # There is no safe way of resuming a generator that didn't + # suspend normally. + if (is_true(env$jumped)) { + # In case a scheduler calls back the generator for error + # handling or cleanup + if (!missing(arg)) { + force(arg) } + abort( + "This function has been disabled because of an unexpected exit." + ) + } - # Resume state machine. Set up an execution env in the user - # environment first to serve as a target for on.exit() - # expressions. Then evaluate state machine in its private - # environment. - env$jumped <- TRUE - env$exited <- TRUE - - out <- evalq(envir = user_env, { - base::evalq(envir = rlang::wref_key(!!weak_env), { - defer(if (exited) cleanup()) - env_poke_exits(user_env, exits) - !!state_machine - }) + # Resume state machine. Set up an execution env in the user + # environment first to serve as a target for on.exit() + # expressions. Then evaluate state machine in its private + # environment. + env$jumped <- TRUE + env$exited <- TRUE + + out <- evalq(envir = user_env, { + base::evalq(envir = rlang::wref_key(!!weak_env), { + defer(if (exited) cleanup()) + env_poke_exits(user_env, exits) + !!state_machine }) - env$jumped <- FALSE - - out }) + env$jumped <- FALSE - env$.self <- instance - - if (is_string(type, "async")) { - # Step into the generator right away - invisible(instance(NULL)) - } else { - structure(instance, class = "coro_generator_instance") - } + out }) + + env$.self <- instance + + if (is_string(type, "async")) { + # Step into the generator right away + invisible(instance(NULL)) + } else { + structure(instance, class = "coro_generator_instance") + } }) - ) + }) + body <- expr({ + `_static` <- !!static + !!!body + }) + + # Create the generator factory (returned by `generator()` and + # entered by `async()`) + factory <- new_function(fmls, body) structure(factory, class = c(paste0("coro_", type), "function")) } +static_env <- function(fn) { + if (is_generator_factory(fn)) { + # Extract injected environment in first assignment in `{` + body(fn)[[2]][[3]] + } else { + fn_env(fn) + } +} + # Creates a child of the coro namespace that holds all the variables # used by the generator runtime new_generator_env <- function(parent, info) { @@ -376,7 +394,7 @@ print_generator <- function(x, ..., internals = FALSE, reproducible = FALSE) { invisible(x) } print_state_machine <- function(x, ...) { - machine <- with(env(fn_env(x)), { + machine <- with(env(static_env(x)), { info <- machine_info(type, env = global_env()) state_machine %||% walk_states(body(fn), info = info) }) From 47f5659c8586f6a9e1b00ae64c9c1de018e10170 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 10 Sep 2025 11:19:27 +0200 Subject: [PATCH 2/6] Prevent compiler from handling generator factories and instances To prevent memory leaks via constant pool --- R/generator.R | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/R/generator.R b/R/generator.R index 98b4e59..af5e1c5 100644 --- a/R/generator.R +++ b/R/generator.R @@ -115,22 +115,26 @@ gen <- function(expr) { } generator0 <- function(fn, type = "generator") { - # Declarations for R CMD check, unused + # Generated at runtime state_machine <- NULL - debugged <- NULL + + # Flipped when `coro_debug()` is applied on a generator factory + debugged <- FALSE fmls <- formals(fn) env <- environment(fn) - static <- env( - state_machine = NULL, - fmls = fmls, - env = env, - # Flipped when `coro_debug()` is applied on a generator factory - debugged = FALSE, - ) + static <- environment() body <- quote({ + # This is to prevent the compiler from JIT-compiling the generator factory, + # which would cause the injected static environment to leak via the constant + # pool. See https://github.com/r-lib/coro/issues/36. Functions that may call + # `browser()` are never compiled. + if (FALSE) { + browser() + } + # Evaluate here so the formals of the generator factory do not # mask our variables. To see how `_static` is defined, see below. `_private` <- rlang::env(`_static`) @@ -200,6 +204,15 @@ generator0 <- function(fn, type = "generator") { # Create the generator instance. This is a function that resumes # a state machine. instance <- inject(function(arg, close = FALSE) { + # This is to prevent the compiler from JIT-compiling the generator factory, + # which would cause the injected static environment to leak via the constant + # pool. See https://github.com/r-lib/coro/issues/36. Functions that may call + # `browser()` are never compiled. Note that the compiler will repeatedly + # attempt to compile the function. + if (FALSE) { + browser() + } + # Forward generator argument inside the state machine environment delayedAssign("arg", arg, assign.env = env) delayedAssign("close", close, assign.env = env) From c8d2523a66a6f6aeac44c8cdb9d3c58bf6319e03 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 10 Sep 2025 12:00:49 +0200 Subject: [PATCH 3/6] Forward generator environment dynamically It might have changed, e.g. in R6 methods --- R/generator.R | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/R/generator.R b/R/generator.R index af5e1c5..e1e9cb8 100644 --- a/R/generator.R +++ b/R/generator.R @@ -122,8 +122,6 @@ generator0 <- function(fn, type = "generator") { debugged <- FALSE fmls <- formals(fn) - env <- environment(fn) - static <- environment() body <- quote({ @@ -140,6 +138,7 @@ generator0 <- function(fn, type = "generator") { `_private` <- rlang::env(`_static`) `_private`$generator_env <- base::environment() `_private`$caller_env <- base::parent.frame() + `_private`$env <- base::parent.env(base::environment()) base::local(envir = `_private`, { generator_env <- environment()$generator_env @@ -310,7 +309,7 @@ generator0 <- function(fn, type = "generator") { # Create the generator factory (returned by `generator()` and # entered by `async()`) - factory <- new_function(fmls, body) + factory <- new_function(fmls, body, env = environment(fn)) structure(factory, class = c(paste0("coro_", type), "function")) } @@ -380,7 +379,7 @@ print.coro_generator <- function(x, ..., internals = FALSE) { } #' @export print.coro_generator_instance <- function(x, ..., internals = FALSE) { - type <- env_get(fn_env(x), "type", inherit = TRUE) + type <- env_get(static_env(x), "type", inherit = TRUE) if (is_string(type, "async_generator")) { writeLines("") @@ -392,7 +391,7 @@ print.coro_generator_instance <- function(x, ..., internals = FALSE) { } print_generator <- function(x, ..., internals = FALSE, reproducible = FALSE) { - fn <- env_get(fn_env(x), "fn", inherit = TRUE) + fn <- env_get(static_env(x), "fn", inherit = TRUE) if (reproducible) { fn <- zap_env(fn) From a42f99ef9b72f3ee106324a9770a837abc87049c Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 11 Sep 2025 15:56:43 +0200 Subject: [PATCH 4/6] More robustly prevent lints about unknown bindings --- R/generator.R | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/R/generator.R b/R/generator.R index e1e9cb8..52548d5 100644 --- a/R/generator.R +++ b/R/generator.R @@ -141,15 +141,22 @@ generator0 <- function(fn, type = "generator") { `_private`$env <- base::parent.env(base::environment()) base::local(envir = `_private`, { + # Prevent lints about unknown bindings + if (FALSE) { + state_machine <- NULL + type <- NULL + fn <- NULL + fmls <- NULL + debugged <- NULL + exits <- NULL + exited <- NULL + cleanup <- NULL + close_active_iterators <- NULL + } + generator_env <- environment()$generator_env caller_env <- environment()$caller_env - # Prevent lints about unknown bindings - exits <- NULL - exited <- NULL - cleanup <- NULL - close_active_iterators <- NULL - info <- machine_info(type, env = caller_env) # Generate the state machine lazily at runtime From e1d84fa50bbe55124a6a659d7a5a0a0b4a84708d Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 11 Sep 2025 16:34:23 +0200 Subject: [PATCH 5/6] Add tests and NEWS bullet --- DESCRIPTION | 1 + NEWS.md | 5 +++++ tests/testthat/test-async.R | 21 +++++++++++++++++++++ tests/testthat/test-generator.R | 21 +++++++++++++++++++++ 4 files changed, 48 insertions(+) diff --git a/DESCRIPTION b/DESCRIPTION index 0f03bcb..d0276ff 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -24,6 +24,7 @@ Suggests: later (>= 1.1.0), magrittr (>= 2.0.0), promises, + R6, reticulate, rmarkdown, testthat (>= 3.0.0) diff --git a/NEWS.md b/NEWS.md index 7393124..980fbb3 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,10 @@ # coro (development version) +* Async functions and generators can now be R6 methods (#63). + +* A memory leak that crept back in (see #36) was fixed with a more robust approach. + + # coro 1.1.0 * Iterator functions are now allowed to have a `close` argument. diff --git a/tests/testthat/test-async.R b/tests/testthat/test-async.R index f302454..180ce8a 100644 --- a/tests/testthat/test-async.R +++ b/tests/testthat/test-async.R @@ -364,3 +364,24 @@ test_that("async functions do not cause CMD check notes (#40)", { )) ) }) + +test_that("async methods in R6 classes", { + testthat::skip_if_not_installed("R6") + + AsyncClass <- R6::R6Class( + classname = "AsyncClass", + public = list( + async_resolved = async(function() "value"), + async_pending = async(function() await("value")) + ) + ) + class <- AsyncClass$new() + + later::with_temp_loop({ + out <- class$async_resolved() + expect_promise(out, "value", "fulfilled") + + out <- class$async_pending() + expect_promise(out, status = "pending") + }) +}) diff --git a/tests/testthat/test-generator.R b/tests/testthat/test-generator.R index e245981..779408b 100644 --- a/tests/testthat/test-generator.R +++ b/tests/testthat/test-generator.R @@ -536,3 +536,24 @@ test_that("returning early doesn't yield values (#51)", { }) expect_equal(collect(g()), list()) }) + +test_that("generator methods in R6 classes", { + testthat::skip_if_not_installed("R6") + + GenClass <- R6::R6Class( + "GenClass", + public = list( + gen_method = generator(function() { + yield(1) + yield(2) + }) + ) + ) + + obj <- GenClass$new() + g <- obj$gen_method() + + expect_equal(g(), 1) + expect_equal(g(), 2) + expect_exhausted(g()) +}) From 831dd86a0ef75e36f6f0e4463360d03a10c030e5 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Sat, 13 Sep 2025 12:25:45 +0200 Subject: [PATCH 6/6] Fix R CMD check notes --- R/generator.R | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/R/generator.R b/R/generator.R index 52548d5..86b91ee 100644 --- a/R/generator.R +++ b/R/generator.R @@ -309,10 +309,14 @@ generator0 <- function(fn, type = "generator") { }) }) - body <- expr({ - `_static` <- !!static - !!!body - }) + # Don't use `expr({ !!!body })` because that causes R CMD check issues due to + # `length(body)` not matching the length of srcrefs of `{` + # https://github.com/r-lib/rlang/issues/1821 + body <- call2( + "{", + expr(`_static` <- !!static), + !!!as.list(body[-1]) + ) # Create the generator factory (returned by `generator()` and # entered by `async()`)