diff --git a/Cargo.lock b/Cargo.lock index 4fda3117c..5b2338f65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,6 +352,7 @@ dependencies = [ "air_r_syntax", "amalthea", "anyhow", + "ark", "ark_test", "assert_matches", "async-trait", diff --git a/crates/amalthea/src/comm/event.rs b/crates/amalthea/src/comm/event.rs index 797544cd3..69a07532d 100644 --- a/crates/amalthea/src/comm/event.rs +++ b/crates/amalthea/src/comm/event.rs @@ -5,6 +5,7 @@ * */ +use crossbeam::channel::Sender; use serde_json::Value; use crate::comm::comm_channel::CommMsg; @@ -21,4 +22,10 @@ pub enum CommEvent { /// A Comm was closed Closed(String), + + /// Synchronisation barrier. Shell signals the sender after processing all + /// preceding events. The caller blocks on the paired receiver to guarantee + /// that earlier events (e.g. `Opened`) have been fully handled before + /// continuing. + Barrier(Sender<()>), } diff --git a/crates/amalthea/src/language/shell_handler.rs b/crates/amalthea/src/language/shell_handler.rs index 572bdecff..2e7eb9d90 100644 --- a/crates/amalthea/src/language/shell_handler.rs +++ b/crates/amalthea/src/language/shell_handler.rs @@ -6,6 +6,7 @@ */ use async_trait::async_trait; +use crossbeam::channel::Receiver; use crate::comm::comm_channel::Comm; use crate::comm::comm_channel::CommMsg; @@ -53,17 +54,17 @@ pub trait ShellHandler: Send { req: &IsCompleteRequest, ) -> crate::Result; - /// Handles a request to execute code. - /// - /// The `originator` is an opaque byte array identifying the peer that sent - /// the request; it is needed to perform an input request during execution. + /// Kicks off execution of the given request and returns a channel that + /// will receive the reply once execution completes. Shell select-loops + /// on this receiver together with `comm_event_rx` so it can process + /// comm events (e.g. barrier handshakes) while execution is in progress. /// /// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#execute - async fn handle_execute_request( + fn start_execute_request( &mut self, originator: Originator, req: &ExecuteRequest, - ) -> crate::Result; + ) -> Receiver>; /// Handles a request to provide completions for the given code fragment. /// diff --git a/crates/amalthea/src/socket/shell.rs b/crates/amalthea/src/socket/shell.rs index b4d367a4d..660473797 100644 --- a/crates/amalthea/src/socket/shell.rs +++ b/crates/amalthea/src/socket/shell.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::sync::Mutex; use crossbeam::channel::Receiver; +use crossbeam::channel::Select; use crossbeam::channel::Sender; use futures::executor::block_on; use stdext::result::ResultExt; @@ -37,6 +38,8 @@ use crate::wire::comm_info_request::CommInfoRequest; use crate::wire::comm_msg::CommWireMsg; use crate::wire::comm_open::CommOpen; use crate::wire::exception::Exception; +use crate::wire::execute_reply::ExecuteReply; +use crate::wire::execute_request::ExecuteRequest; use crate::wire::jupyter_message::JupyterMessage; use crate::wire::jupyter_message::Message; use crate::wire::jupyter_message::ProtocolMessage; @@ -205,6 +208,10 @@ impl Shell { ); }, + CommEvent::Barrier(done_tx) => { + done_tx.send(()).log_err(); + }, + CommEvent::Message(comm_id, msg) => { let Some(comm) = self.open_comms.iter().find(|c| c.comm_id == comm_id) else { log::warn!("Received message for unknown comm channel {comm_id}: {msg:?}"); @@ -240,6 +247,15 @@ impl Shell { /// Process a message received from the front-end, optionally dispatching /// messages to the IOPub or execution threads fn process_message(&mut self, msg: Message) -> crate::Result<()> { + // Execute requests get special handling: Shell select-loops on both + // the execute response and comm events so it can process + // backend-initiated comm opens (with barrier handshakes) while R is + // still executing, preventing a deadlock where R waits for Shell to + // drain the barrier while Shell waits for the execute response. + if let Message::ExecuteRequest(req) = msg { + return self.handle_execute_request(req); + } + // Extract references to the components we need to pass to handlers. // This allows us to borrow different fields of self independently. let iopub_tx = &self.iopub_tx; @@ -256,13 +272,6 @@ impl Shell { Message::IsCompleteRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| { block_on(shell_handler.handle_is_complete_request(msg)) }), - Message::ExecuteRequest(req) => { - // FIXME: We should ideally not pass the originator to the language kernel - let originator = Originator::from(&req); - Self::handle_request(iopub_tx, socket, req, |msg| { - block_on(shell_handler.handle_execute_request(originator, msg)) - }) - }, Message::CompleteRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| { block_on(shell_handler.handle_complete_request(msg)) }), @@ -363,6 +372,63 @@ impl Shell { result.and(Ok(())) } + /// Handle an execute request. Unlike other requests that use the generic + /// `handle_request`, this method select-loops on both the execute response + /// and `comm_event_rx`. This allows Shell to process comm events (e.g. + /// `CommEvent::Barrier` from `comm_open_backend`) while the R thread is + /// still executing, preventing a deadlock where the R thread waits for + /// Shell to drain comm events while Shell waits for the execute response. + fn handle_execute_request(&mut self, req: JupyterMessage) -> crate::Result<()> { + self.iopub_tx + .send(status(req.clone(), ExecutionState::Busy)) + .unwrap(); + + log::info!("Received shell request: {req:?}"); + + // FIXME: We should ideally not pass the originator to the language kernel + let originator = Originator::from(&req); + let response_rx = self + .shell_handler + .start_execute_request(originator, &req.content); + + // Select-loop: drain comm events while waiting for the execute reply. + let result = loop { + let mut sel = Select::new(); + let resp_idx = sel.recv(&response_rx); + sel.recv(&self.comm_event_rx); + + let ready = sel.ready(); + + while let Ok(event) = self.comm_event_rx.try_recv() { + self.process_comm_event(event); + } + + if ready == resp_idx { + break response_rx.recv().unwrap(); + } + }; + + let result = match result { + Ok(reply) => req.send_reply(reply, &self.socket), + Err(crate::Error::ShellErrorReply(error)) => { + req.send_error::(error, &self.socket) + }, + Err(crate::Error::ShellErrorExecuteReply(error, exec_count)) => { + req.send_execute_error(error, exec_count, &self.socket) + }, + Err(err) => { + let error = Exception::internal_error(format!("{err:?}")); + req.send_error::(error, &self.socket) + }, + }; + + self.iopub_tx + .send(status(req.clone(), ExecutionState::Idle)) + .unwrap(); + + result.and(Ok(())) + } + fn handle_notification( iopub_tx: &Sender, not: JupyterMessage, diff --git a/crates/amalthea/tests/client/shell.rs b/crates/amalthea/tests/client/shell.rs index 0db730c3e..a3e019fb1 100644 --- a/crates/amalthea/tests/client/shell.rs +++ b/crates/amalthea/tests/client/shell.rs @@ -82,63 +82,8 @@ impl Shell { warn!("Could not prompt for input: {}", err); } } -} - -#[async_trait] -impl ShellHandler for Shell { - async fn handle_info_request( - &mut self, - _req: &KernelInfoRequest, - ) -> amalthea::Result { - let info = LanguageInfo { - name: String::from("Test"), - version: String::from("1.0"), - file_extension: String::from(".ech"), - mimetype: String::from("text/echo"), - pygments_lexer: None, - codemirror_mode: None, - nbconvert_exporter: None, - positron: None, - }; - Ok(KernelInfoReply { - status: Status::Ok, - banner: format!("Amalthea Echo {}", env!("CARGO_PKG_VERSION")), - implementation: String::from("echo"), - implementation_version: String::from(env!("CARGO_PKG_VERSION")), - debugger: false, - help_links: Vec::new(), - language_info: info, - }) - } - - async fn handle_complete_request( - &self, - _req: &CompleteRequest, - ) -> amalthea::Result { - // No matches in this toy implementation. - Ok(CompleteReply { - matches: Vec::new(), - status: Status::Ok, - cursor_start: 0, - cursor_end: 0, - metadata: json!({}), - }) - } - /// Handle a request to test code for completion. - async fn handle_is_complete_request( - &self, - _req: &IsCompleteRequest, - ) -> amalthea::Result { - // In this echo example, the code is always complete! - Ok(IsCompleteReply { - status: IsComplete::Complete, - indent: String::from(""), - }) - } - - /// Handles an ExecuteRequest; "executes" the code by echoing it. - async fn handle_execute_request( + fn execute( &mut self, originator: Originator, req: &ExecuteRequest, @@ -227,6 +172,72 @@ impl ShellHandler for Shell { user_expressions: serde_json::Value::Null, }) } +} + +#[async_trait] +impl ShellHandler for Shell { + async fn handle_info_request( + &mut self, + _req: &KernelInfoRequest, + ) -> amalthea::Result { + let info = LanguageInfo { + name: String::from("Test"), + version: String::from("1.0"), + file_extension: String::from(".ech"), + mimetype: String::from("text/echo"), + pygments_lexer: None, + codemirror_mode: None, + nbconvert_exporter: None, + positron: None, + }; + Ok(KernelInfoReply { + status: Status::Ok, + banner: format!("Amalthea Echo {}", env!("CARGO_PKG_VERSION")), + implementation: String::from("echo"), + implementation_version: String::from(env!("CARGO_PKG_VERSION")), + debugger: false, + help_links: Vec::new(), + language_info: info, + }) + } + + async fn handle_complete_request( + &self, + _req: &CompleteRequest, + ) -> amalthea::Result { + // No matches in this toy implementation. + Ok(CompleteReply { + matches: Vec::new(), + status: Status::Ok, + cursor_start: 0, + cursor_end: 0, + metadata: json!({}), + }) + } + + /// Handle a request to test code for completion. + async fn handle_is_complete_request( + &self, + _req: &IsCompleteRequest, + ) -> amalthea::Result { + // In this echo example, the code is always complete! + Ok(IsCompleteReply { + status: IsComplete::Complete, + indent: String::from(""), + }) + } + + /// Handles an ExecuteRequest; "executes" the code by echoing it. + fn start_execute_request( + &mut self, + originator: Originator, + req: &ExecuteRequest, + ) -> crossbeam::channel::Receiver> { + let (tx, rx) = crossbeam::channel::bounded(1); + let result = self.execute(originator, req); + tx.send(result).unwrap(); + rx + } /// Handles an introspection request async fn handle_inspect_request(&self, req: &InspectRequest) -> amalthea::Result { diff --git a/crates/ark/Cargo.toml b/crates/ark/Cargo.toml index 8ba3283f7..8c245020f 100644 --- a/crates/ark/Cargo.toml +++ b/crates/ark/Cargo.toml @@ -10,6 +10,9 @@ edition.workspace = true license.workspace = true rust-version.workspace = true +[features] +testing = [] + [lints] workspace = true @@ -73,6 +76,7 @@ winsafe.workspace = true yaml-rust.workspace = true [dev-dependencies] +ark = { workspace = true, features = ["testing"] } ark_test.workspace = true assert_matches.workspace = true insta.workspace = true diff --git a/crates/ark/src/comm_handler.rs b/crates/ark/src/comm_handler.rs index db647f776..6d4dcf6b9 100644 --- a/crates/ark/src/comm_handler.rs +++ b/crates/ark/src/comm_handler.rs @@ -18,9 +18,10 @@ use serde::de::DeserializeOwned; use serde::Serialize; use stdext::result::ResultExt; +use crate::console::Console; + /// Context provided to `CommHandler` methods, giving access to the outgoing -/// channel and close-request mechanism. In the future, we'll provide access to -/// more of the Console state, such as the currently active environment. +/// channel and close-request mechanism. #[derive(Debug)] pub struct CommHandlerContext { pub outgoing_tx: CommOutgoingTx, @@ -71,18 +72,24 @@ pub trait CommHandler: Debug { /// Initialise handler state on the R thread (initial scan, first event, /// etc.). Default is no-op. - fn handle_open(&mut self, _ctx: &CommHandlerContext) {} + fn handle_open(&mut self, _ctx: &CommHandlerContext, _console: &Console) {} /// Handle an incoming message (RPC or data). - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext); + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, console: &Console); /// Handle comm close. Default is no-op. - fn handle_close(&mut self, _ctx: &CommHandlerContext) {} + fn handle_close(&mut self, _ctx: &CommHandlerContext, _console: &Console) {} /// Called when the environment changes. The `event` indicates what /// triggered the change so handlers can decide whether to react. /// Default is no-op. - fn handle_environment(&mut self, _event: &EnvironmentChanged, _ctx: &CommHandlerContext) {} + fn handle_environment( + &mut self, + _event: &EnvironmentChanged, + _ctx: &CommHandlerContext, + _console: &Console, + ) { + } } /// Why the environment changed. diff --git a/crates/ark/src/console.rs b/crates/ark/src/console.rs index 66ba4fa3d..8e512ceaa 100644 --- a/crates/ark/src/console.rs +++ b/crates/ark/src/console.rs @@ -146,7 +146,7 @@ use crate::lsp::state_handlers::ConsoleInputs; use crate::modules; use crate::modules::ARK_ENVS; use crate::plots::graphics_device; -use crate::plots::graphics_device::GraphicsDeviceNotification; +use crate::plots::graphics_device::DeviceContext; use crate::r_task; use crate::r_task::BoxFuture; use crate::r_task::QueuedRTask; @@ -177,7 +177,7 @@ thread_local! { pub static CONSOLE: RefCell> = panic!("Must access `CONSOLE` from the R thread"); } -pub(crate) struct Console { +pub struct Console { pub(crate) positron_ns: Option, kernel_request_rx: Receiver, @@ -338,4 +338,7 @@ pub(crate) struct Console { /// Comm handlers registered on the R thread (keyed by comm ID). comms: HashMap, + + /// Graphics device state (plot recording, rendering, comm management). + device_context: DeviceContext, } diff --git a/crates/ark/src/console/console_comm.rs b/crates/ark/src/console/console_comm.rs index 11c283c91..1f6a233d9 100644 --- a/crates/ark/src/console/console_comm.rs +++ b/crates/ark/src/console/console_comm.rs @@ -25,7 +25,7 @@ impl Console { log::warn!("Received message for unknown registered comm {comm_id}"); return; }; - comm.handler.handle_msg(msg, &comm.ctx); + comm.handler.handle_msg(msg, &comm.ctx, Console::get()); self.drain_closed(); } @@ -34,13 +34,19 @@ impl Console { log::warn!("Received close for unknown registered comm {comm_id}"); return; }; - comm.handler.handle_close(&comm.ctx); + comm.handler.handle_close(&comm.ctx, Console::get()); } /// Register a backend-initiated comm on the R thread. /// /// Creates the `CommSocket` and `CommHandlerContext`, calls `handle_open`, - /// sends `CommEvent::Opened` to amalthea, and returns the comm ID. + /// sends `CommEvent::Opened` to Amalthea's Shell thread, and returns the + /// comm ID. + /// + /// Blocks until Shell has fully processed the open (sent `comm_open` on + /// IOPub and registered the comm for routing). This guarantees that any + /// `comm_msg` sent by the caller afterwards are ordered after the + /// `comm_open` on IOPub. pub(crate) fn comm_open_backend( &mut self, comm_name: &str, @@ -57,7 +63,7 @@ impl Console { ); let ctx = CommHandlerContext::new(comm.outgoing_tx.clone(), self.comm_event_tx.clone()); - handler.handle_open(&ctx); + handler.handle_open(&ctx, Console::get()); self.comms .insert(comm_id.clone(), ConsoleComm { handler, ctx }); @@ -65,6 +71,13 @@ impl Console { self.comm_event_tx .send(CommEvent::Opened(comm, open_metadata))?; + // Block until Shell has processed the Opened event, ensuring the + // `comm_open` message is on IOPub before we return. Any updates + // the caller sends after this point are guaranteed to follow it. + let (done_tx, done_rx) = crossbeam::channel::bounded(0); + self.comm_event_tx.send(CommEvent::Barrier(done_tx))?; + done_rx.recv()?; + Ok(comm_id) } @@ -82,13 +95,13 @@ impl Console { mut handler: Box, ) { let ctx = CommHandlerContext::new(outgoing_tx, self.comm_event_tx.clone()); - handler.handle_open(&ctx); + handler.handle_open(&ctx, Console::get()); if comm_name == UI_COMM_NAME { if let Some(old_id) = self.ui_comm_id.take() { log::info!("Replacing an existing UI comm."); if let Some(mut old) = self.comm_remove(&old_id) { - old.handler.handle_close(&old.ctx); + old.handler.handle_close(&old.ctx, Console::get()); } } self.ui_comm_id = Some(comm_id.clone()); @@ -99,7 +112,8 @@ impl Console { pub(super) fn comm_notify_environment_changed(&mut self, event: &EnvironmentChanged) { for (_, comm) in self.comms.iter_mut() { - comm.handler.handle_environment(event, &comm.ctx); + comm.handler + .handle_environment(event, &comm.ctx, Console::get()); } self.drain_closed(); } diff --git a/crates/ark/src/console/console_repl.rs b/crates/ark/src/console/console_repl.rs index e35fc4529..16ce3d4a2 100644 --- a/crates/ark/src/console/console_repl.rs +++ b/crates/ark/src/console/console_repl.rs @@ -342,7 +342,6 @@ impl Console { dap: Arc>, session_mode: SessionMode, default_repos: DefaultRepos, - graphics_device_rx: AsyncUnboundedReceiver, console_notification_rx: AsyncUnboundedReceiver, ) { // Set the main thread ID. @@ -497,17 +496,9 @@ impl Console { } })); - // Initialize the GD context on this thread. - // Note that we do it after init is complete to avoid deadlocking - // integration tests by spawning an async task. The deadlock is caused - // by https://github.com/posit-dev/ark/blob/bd827e735970ca17102aeddfbe2c3ccf26950a36/crates/ark/src/r_task.rs#L261. - // We should be able to remove this escape hatch in `r_task()` by - // instantiating an `Console` in unit tests as well. - graphics_device::init_graphics_device( - console.comm_event_tx.clone(), - console.iopub_tx().clone(), - graphics_device_rx, - ); + // R-side graphics device initialization. The `DeviceContext` + // itself is already created as part of `Console::new()`. + graphics_device::init_graphics_device(); // Now that R has started and libr and ark have fully initialized, run site and user // level R profiles, in that order @@ -612,6 +603,8 @@ impl Console { dap: Arc>, session_mode: SessionMode, ) -> Self { + let device_context = DeviceContext::new(iopub_tx.clone()); + Self { r_request_rx, comm_event_tx, @@ -658,6 +651,7 @@ impl Console { read_console_shutdown: Cell::new(false), debug_filter: ConsoleFilter::new(), comms: HashMap::new(), + device_context, } } @@ -675,10 +669,66 @@ impl Console { /// Access a reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `Console::start()` initializes it. - pub(crate) fn get() -> &'static Self { + pub fn get() -> &'static Self { Console::get_mut() } + /// Install a minimal stopgap `Console` in the thread-local for unit tests + /// that need a `&Console` (e.g. to pass to `CommHandler` methods) but + /// don't go through the full `Console::start()` path. + /// + /// All internal channels are created with their counterpart immediately + /// dropped, so the Console is inert: it never processes requests, tasks, + /// or kernel messages. Any attempt to send to IOPub or other channels + /// will return a disconnected error, which surfaces problems early + /// rather than silently hanging. + /// + /// For tests that exercise real Console behaviour (execution, comms + /// lifecycle, IOPub output), prefer full integration tests with + /// `DummyArkFrontend` which spins up a real Console. + /// + /// Idempotent per thread. Does not set `R_INIT`, so the `r_task()` escape + /// hatch for unit tests continues to work. + #[cfg(feature = "testing")] + pub fn test_init() { + use std::cell::Cell; + thread_local! { + static INITIALIZED: Cell = const { Cell::new(false) }; + } + + INITIALIZED.with(|init| { + if init.get() { + return; + } + init.set(true); + + let (_, tasks_interrupt_rx) = crossbeam::channel::unbounded(); + let (_, tasks_idle_rx) = crossbeam::channel::unbounded(); + let (_, tasks_idle_any_rx) = crossbeam::channel::unbounded(); + let (comm_event_tx, _) = crossbeam::channel::unbounded(); + let (r_request_tx, r_request_rx) = crossbeam::channel::unbounded(); + let (stdin_request_tx, _) = crossbeam::channel::unbounded(); + let (_, stdin_reply_rx) = crossbeam::channel::unbounded(); + let (iopub_tx, _) = crossbeam::channel::unbounded(); + let (_, kernel_request_rx) = crossbeam::channel::unbounded(); + let dap = Dap::new_shared(r_request_tx); + + CONSOLE.set(UnsafeCell::new(Console::new( + tasks_interrupt_rx, + tasks_idle_rx, + tasks_idle_any_rx, + comm_event_tx, + r_request_rx, + stdin_request_tx, + stdin_reply_rx, + iopub_tx, + kernel_request_rx, + dap, + SessionMode::Console, + ))); + }); + } + /// Access a mutable reference to the singleton instance of this struct /// /// SAFETY: Accesses must occur after `Console::start()` initializes it. @@ -709,6 +759,10 @@ impl Console { &self.comm_event_tx } + pub(crate) fn device_context(&self) -> &DeviceContext { + &self.device_context + } + /// Run a closure while capturing console output. /// Returns the closure's result paired with any captured output. pub(crate) fn with_capture(f: impl FnOnce() -> T) -> (T, String) { @@ -2336,13 +2390,6 @@ impl Console { // might end up being executed on the LSP thread. // https://github.com/rstudio/positron/issues/431 unsafe { R_RunPendingFinalizers() }; - - // Check for Positron render requests. - // - // TODO: This should move to a spawned task that'd be woken up by - // incoming messages on plot comms. This way we'll prevent the delays - // introduced by timeout-based event polling. - graphics_device::on_process_idle_events(); } pub(super) fn eval_env(&self) -> RObject { diff --git a/crates/ark/src/data_explorer/r_data_explorer.rs b/crates/ark/src/data_explorer/r_data_explorer.rs index 43b208cdf..4d267414a 100644 --- a/crates/ark/src/data_explorer/r_data_explorer.rs +++ b/crates/ark/src/data_explorer/r_data_explorer.rs @@ -481,13 +481,18 @@ impl CommHandler for RDataExplorer { serde_json::json!({ "title": self.title, "inline_only": inline_only }) } - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, _console: &Console) { handle_rpc_request(&ctx.outgoing_tx, DATA_EXPLORER_COMM_NAME, msg, |req| { self.handle_rpc(req, ctx) }); } - fn handle_environment(&mut self, event: &EnvironmentChanged, ctx: &CommHandlerContext) { + fn handle_environment( + &mut self, + event: &EnvironmentChanged, + ctx: &CommHandlerContext, + _console: &Console, + ) { let EnvironmentChanged::Execution { .. } = event else { return; }; diff --git a/crates/ark/src/fixtures/utils.rs b/crates/ark/src/fixtures/utils.rs index 5a8a01d46..67cfd5fc2 100644 --- a/crates/ark/src/fixtures/utils.rs +++ b/crates/ark/src/fixtures/utils.rs @@ -9,6 +9,8 @@ use std::sync::Once; use tree_sitter::Point; +#[cfg(feature = "testing")] +use crate::console::Console; use crate::modules; use crate::modules::ARK_ENVS; @@ -21,6 +23,10 @@ pub fn r_test_init() { let ns = modules::initialize().unwrap(); modules::initialize_options(ns.sexp).unwrap(); }); + // Per-thread: install a minimal Console singleton so that unit tests + // can pass `&Console` to comm handlers via `Console::get()`. + #[cfg(feature = "testing")] + Console::test_init(); } pub fn point_from_cursor(x: &str) -> (String, Point) { diff --git a/crates/ark/src/modules/positron/graphics.R b/crates/ark/src/modules/positron/graphics.R index 3eebd8b0f..5bc804349 100644 --- a/crates/ark/src/modules/positron/graphics.R +++ b/crates/ark/src/modules/positron/graphics.R @@ -75,6 +75,10 @@ setHook("before.grid.newpage", action = "replace", function(...) { grDevices::deviceIsInteractive(ARK_GRAPHICS_DEVICE_NAME) } +current_plot_id <- function() { + .ps.Call("ps_graphics_current_plot_id") +} + # Create a recording of the current plot. # # This saves the plot's display list, so it can be used to re-render plots as diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 4698bc31f..97e258a9a 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -15,7 +15,6 @@ use std::io::BufReader; use std::io::Read; use amalthea::comm::comm_channel::CommMsg; -use amalthea::comm::event::CommEvent; use amalthea::comm::plot_comm::IntrinsicSize; use amalthea::comm::plot_comm::PlotBackendReply; use amalthea::comm::plot_comm::PlotBackendRequest; @@ -29,8 +28,7 @@ use amalthea::comm::plot_comm::PlotResult; use amalthea::comm::plot_comm::PlotSize; use amalthea::comm::plot_comm::PlotUnit; use amalthea::comm::plot_comm::UpdateParams; -use amalthea::socket::comm::CommInitiator; -use amalthea::socket::comm::CommSocket; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::socket::iopub::IOPubMessage; use amalthea::wire::display_data::DisplayData; use amalthea::wire::execute_request::CodeLocation; @@ -41,7 +39,6 @@ use anyhow::anyhow; use anyhow::Context; use base64::engine::general_purpose; use base64::Engine; -use crossbeam::channel::Select; use crossbeam::channel::Sender; use harp::exec::RFunction; use harp::exec::RFunctionExt; @@ -53,68 +50,25 @@ use libr::SEXP; use serde_json::json; use stdext::result::ResultExt; use stdext::unwrap; -use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver; use uuid::Uuid; +use crate::comm_handler::handle_rpc_request; +use crate::comm_handler::CommHandler; +use crate::comm_handler::CommHandlerContext; use crate::console::Console; use crate::console::SessionMode; use crate::modules::ARK_ENVS; use crate::r_task; -use crate::r_task::RTask; -#[derive(Debug)] -pub(crate) enum GraphicsDeviceNotification { - DidChangePlotRenderSettings(PlotRenderSettings), -} - -thread_local! { - // Safety: Set once by `Console` on initialization - static DEVICE_CONTEXT: RefCell = panic!("Must access `DEVICE_CONTEXT` from the R thread"); -} - -const POSITRON_PLOT_CHANNEL_ID: &str = "positron.plot"; - -// Expose thread initialization via function so we can keep the structs private. -// Must be called from the main R thread. -pub(crate) fn init_graphics_device( - comm_event_tx: Sender, - iopub_tx: Sender, - graphics_device_rx: AsyncUnboundedReceiver, -) { - DEVICE_CONTEXT.set(DeviceContext::new(comm_event_tx, iopub_tx)); +pub const PLOT_COMM_NAME: &str = "positron.plot"; +/// Perform R-side initialization of the graphics device. +/// Must be called from the main R thread after Console is initialized. +pub(crate) fn init_graphics_device() { // Declare our graphics device as interactive if let Err(err) = RFunction::from(".ps.graphics.register_as_interactive").call() { log::error!("Failed to register Ark graphics device as interactive: {err:?}"); }; - - // Launch an R thread task to process messages from the frontend - r_task::spawn(RTask::interrupt(async move || { - process_notifications(graphics_device_rx).await - })); -} - -async fn process_notifications( - mut graphics_device_rx: AsyncUnboundedReceiver, -) { - log::trace!("Now listening for graphics device notifications"); - - loop { - while let Some(notification) = graphics_device_rx.recv().await { - log::trace!("Got graphics device notification: {notification:#?}"); - - match notification { - GraphicsDeviceNotification::DidChangePlotRenderSettings(plot_render_settings) => { - // Safety: Note that `DEVICE_CONTEXT` is accessed at - // interrupt time. Other methods in this file should be - // written in accordance and avoid causing R interrupt - // checks while they themselves access the device. - DEVICE_CONTEXT - .with_borrow(|ctx| ctx.prerender_settings.replace(plot_render_settings)); - }, - } - } - } } /// Wrapped callbacks of the original graphics device we shadow @@ -150,10 +104,15 @@ struct PlotContext { intrinsic_size: Option, } -struct DeviceContext { - /// Channel for sending [CommEvent]s to Positron when plot events occur - comm_event_tx: Sender, - +/// Graphics device state: plot recording, rendering, and comm management. +/// +/// Fields use `Cell`/`RefCell` for interior mutability because the R graphics +/// device callbacks are C function pointers that receive `&DeviceContext` (via +/// `Console::get().device_context()`). There is no way to thread `&mut` through +/// R's callback registration layer. A future refactor could wrap the C-to-Rust +/// bridge so that the Rust-facing hook methods receive `&mut self` explicitly, +/// containing the `Console::get()` unsoundness in one place. +pub(crate) struct DeviceContext { /// Channel for sending [IOPubMessage::DisplayData] and /// [IOPubMessage::UpdateDisplayData] to Jupyter frontends when plot events occur iopub_tx: Sender, @@ -196,9 +155,9 @@ struct DeviceContext { /// device specifications (i.e. for Positron's Plots pane). id: RefCell, - /// Mapping of plot ID to the communication socket used for communicating its - /// rendered results to the frontend. - sockets: RefCell>, + /// Mapping of `PlotId` to comm ID, used for sending update events to + /// existing plot comms via `CommOutgoingTx`. + comm_ids: RefCell>, /// Per-plot context captured at creation time (metadata and optional intrinsic size). plot_contexts: RefCell>, @@ -230,16 +189,15 @@ struct DeviceContext { } impl DeviceContext { - fn new(comm_event_tx: Sender, iopub_tx: Sender) -> Self { + pub fn new(iopub_tx: Sender) -> Self { Self { - comm_event_tx, iopub_tx, has_changes: Cell::new(false), is_new_page: Cell::new(true), is_drawing: Cell::new(false), should_render: Cell::new(true), id: RefCell::new(Self::new_id()), - sockets: RefCell::new(HashMap::new()), + comm_ids: RefCell::new(HashMap::new()), plot_contexts: RefCell::new(HashMap::new()), kind_counters: RefCell::new(HashMap::new()), wrapped_callbacks: WrappedDeviceCallbacks::default(), @@ -257,6 +215,10 @@ impl DeviceContext { } } + pub fn set_prerender_settings(&self, settings: PlotRenderSettings) { + self.prerender_settings.replace(settings); + } + /// Set the current execution context (called when an execute request starts) fn set_execution_context( &self, @@ -358,7 +320,13 @@ impl DeviceContext { fn hook_holdflush(&self, level: i32) { // Be extra safe and check `level <= 0` rather than just `level == 0` in case // our shadowed device returns a negative `level` - self.should_render.replace(level <= 0); + let is_released = level <= 0; + let was_rendering = self.should_render.replace(is_released); + + // Flush deferred changes on hold→release transition + if !was_rendering && is_released { + self.process_changes(); + } } #[tracing::instrument(level = "trace", skip_all, fields(mode = %mode))] @@ -496,111 +464,6 @@ impl DeviceContext { format!("{} {}", kind, counter) } - /// Process outstanding RPC requests received from Positron - /// - /// At idle time we loop through our set of plot channels and check if Positron has - /// responded on any of them stating that it is ready for us to replay and render - /// the actual plot, and then send back the bytes that represent that plot. - /// - /// Note that we only send back rendered plots at idle time. This means that if you - /// do something like: - /// - /// ```r - /// for (i in 1:5) { - /// plot(i) - /// Sys.sleep(1) - /// } - /// ``` - /// - /// Then it goes something like this: - /// - At each new page event we tell Positron there we have a new plot for it - /// - Positron sets up 5 blank plot windows and sends back an RPC requesting the plot - /// data - /// - AFTER the entire for loop has finished and we hit idle time, we drop into - /// `process_rpc_requests()` and render all 5 plots at once - /// - /// Practically this seems okay, it is just something to keep in mind. - #[tracing::instrument(level = "trace", skip_all)] - fn process_rpc_requests(&self) { - // Don't try to render a plot if we're currently drawing. - if self.is_drawing.get() { - log::trace!("Refusing to render due to `is_drawing`"); - return; - } - - // Don't try to render a plot if someone is asking us not to, i.e. `dev.hold()` - if !self.should_render.get() { - log::trace!("Refusing to render due to `should_render`"); - return; - } - - // Collect existing sockets into a vector of tuples. - // Necessary for handling Select in a clean way. - let sockets = { - // Refcell Safety: Clone the hashmap so we don't hold a reference for too long - let sockets = self.sockets.borrow().clone(); - sockets.into_iter().collect::>() - }; - - // Dynamically load all incoming channels within the sockets into a single `Select` - let mut select = Select::new(); - for (_id, sockets) in sockets.iter() { - select.recv(&sockets.incoming_rx); - } - - // Check for incoming plot render requests. - // Totally possible to have >1 requests pending, especially if we've plotted - // multiple things in a single chunk of R code. The `Err` case is likely just - // that no channels have any messages, so we don't log in that case. - while let Ok(selection) = select.try_select() { - let socket = sockets - .get(selection.index()) - .expect("Socket should exist for the selection index"); - let id = &socket.0; - let socket = &socket.1; - - // Receive on the "selected" channel - let message = match selection.recv(&socket.incoming_rx) { - Ok(message) => message, - Err(error) => { - // If the channel is disconnected, log and remove it so we don't try - // and `recv()` on it ever again - log::error!("{error:?}"); - // Refcell Safety: Short borrows in the file. - self.sockets.borrow_mut().remove(id); - - // Process remaining messages. Safe to do because we have - // removed the `DeviceContext`'s copy off the sockets but we - // are working through our own copy of them. - continue; - }, - }; - - match message { - CommMsg::Rpc { .. } => { - log::trace!("Handling `RPC` for plot `id` {id}"); - socket.handle_request(message, |req| self.handle_rpc(req, id)); - }, - - // Note that ideally this handler should be invoked before we - // check for `should_render`. I.e. we should acknowledge a plot - // has been closed on the frontend side even when `dev.hold()` - // is active. Doing so would require some more careful - // bookkeeping of the state though, and since this is a very - // unlikely sequence of action nothing really bad happens with - // the current approach, we decided to keep handling here. - CommMsg::Close => { - log::trace!("Handling `Close` for plot `id` {id}"); - self.close_plot(id) - }, - - message => { - log::error!("Received unexpected comm message for plot `id` {id}: {message:?}") - }, - } - } - } - #[tracing::instrument(level = "trace", skip_all, fields(id = %id))] fn handle_rpc( &self, @@ -684,14 +547,10 @@ impl DeviceContext { } #[tracing::instrument(level = "trace", skip(self))] - fn close_plot(&self, id: &PlotId) { - // RefCell safety: Short borrows in the file - self.sockets.borrow_mut().remove(id); + fn on_plot_closed(&self, id: &PlotId) { + self.comm_ids.borrow_mut().remove(id); self.plot_contexts.borrow_mut().remove(id); - // The plot data is stored at R level. Assumes we're called on the R - // thread at idle time so there's no race issues (see - // `on_process_idle_events()`). if let Err(err) = RFunction::from("remove_recording") .param("id", id) .call_in(ARK_ENVS.positron_ns) @@ -724,17 +583,21 @@ impl DeviceContext { fn process_changes(&self) { let id = self.id(); - if !self.has_changes.replace(false) { + if !self.has_changes.get() { log::trace!("No changes to process for plot `id` {id}"); return; } log::trace!("Processing changes for plot `id` {id}"); - // Record the changes so we can replay them when Positron asks us for them. - // Recording here overrides an existing recording for `id` if something has - // changed between then and now, which is what we want, for example, we want - // it when running this line by line: + // Always record the current display list, even when rendering is held. + // `ps_graphics_before_plot_new` calls us to snapshot the display list + // before a new page clears it. Skipping the recording here would + // permanently lose the previous plot's state. + // + // Recording here overrides an existing recording for `id` if something + // has changed between then and now, which is what we want, for example, + // we want it when running this line by line: // // ```r // par(mfrow = c(2, 1)) @@ -743,6 +606,15 @@ impl DeviceContext { // ``` Self::record_plot(&id); + if !self.should_render.get() { + // Keep `has_changes` set so we re-enter this branch after the hold + // is released (via `hook_holdflush`) and send the notification then. + log::trace!("Deferring notification for plot `id` {id} (rendering held)"); + return; + } + + self.has_changes.replace(false); + if self.is_new_page.replace(false) { self.process_new_plot(&id); } else { @@ -778,22 +650,13 @@ impl DeviceContext { let ctx = self.capture_execution_context(); self.store_plot_context(id, &ctx); - // Let Positron know that we just created a new plot. - let socket = CommSocket::new( - CommInitiator::BackEnd, - id.to_string(), - POSITRON_PLOT_CHANNEL_ID.to_string(), - self.iopub_tx.clone(), - ); - // Use render settings from the execute request if available, otherwise fall back // to the default prerender settings. let settings = ctx .render_settings .unwrap_or_else(|| self.prerender_settings.get()); - // Prepare a pre-rendering of the plot so Positron has something to display immediately - let data = match self.render_plot(id, &settings) { + let open_data = match self.render_plot(id, &settings) { Ok(pre_render) => { let mime_type = Self::get_mime_type(&PlotRenderFormat::Png); @@ -811,14 +674,19 @@ impl DeviceContext { }, }; - let event = CommEvent::Opened(socket.clone(), data); - if let Err(error) = self.comm_event_tx.send(event) { - log::error!("{error:?}"); - } + let plot_comm = PlotComm { + id: id.clone(), + open_data, + }; - // Save our new socket. - // Refcell Safety: Short borrows in the file. - self.sockets.borrow_mut().insert(id.clone(), socket); + match Console::get_mut().comm_open_backend(PLOT_COMM_NAME, Box::new(plot_comm)) { + Ok(comm_id) => { + self.comm_ids.borrow_mut().insert(id.clone(), comm_id); + }, + Err(err) => { + log::error!("Failed to register plot comm: {err:?}"); + }, + } } #[tracing::instrument(level = "trace", skip_all, fields(id = %id))] @@ -888,17 +756,14 @@ impl DeviceContext { fn process_update_plot_positron(&self, id: &PlotId) { log::trace!("Notifying Positron of plot update"); - // Refcell Safety: Make sure not to call other methods from this whole block. - let sockets = self.sockets.borrow(); - - // Find our socket - let socket = unwrap!(sockets.get(id), None => { - // If socket doesn't exist, bail, nothing to update (should be rare, likely a bug?) - log::error!("Can't find socket to update with id: {id}."); - return; - }); + let comm_id = match self.comm_ids.borrow().get(id).cloned() { + Some(id) => id, + None => { + log::error!("Can't find comm to update with id: {id}."); + return; + }, + }; - // Create a pre-rendering of the updated plot let settings = self.prerender_settings.get(); let update_params = match self.render_plot(id, &settings) { Ok(pre_render) => { @@ -922,11 +787,10 @@ impl DeviceContext { let value = serde_json::to_value(PlotFrontendEvent::Update(update_params)).unwrap(); - // Tell Positron we have an updated plot with optional pre-rendering - socket - .outgoing_tx + let outgoing_tx = CommOutgoingTx::new(comm_id, self.iopub_tx.clone()); + outgoing_tx .send(CommMsg::Data(value)) - .context("Failed to send update message for id {id}.") + .context(format!("Failed to send update message for id {id}.")) .log_err(); } @@ -993,36 +857,24 @@ impl DeviceContext { fn render_plot(&self, id: &PlotId, settings: &PlotRenderSettings) -> anyhow::Result { log::trace!("Rendering plot"); - let image_path = r_task(|| { - RFunction::from(".ps.graphics.render_plot_from_recording") - .param("id", id) - .param("width", RObject::try_from(settings.size.width)?) - .param("height", RObject::try_from(settings.size.height)?) - .param("pixel_ratio", settings.pixel_ratio) - .param("format", settings.format.to_string()) - .call()? - .to::() - }); - - let image_path = match image_path { - Ok(image_path) => image_path, - Err(error) => { - return Err(anyhow::anyhow!( - "Failed to render plot with `id` {id} due to: {error}." - )) - }, - }; + let image_path: String = RFunction::from(".ps.graphics.render_plot_from_recording") + .param("id", id) + .param("width", RObject::try_from(settings.size.width)?) + .param("height", RObject::try_from(settings.size.height)?) + .param("pixel_ratio", settings.pixel_ratio) + .param("format", settings.format.to_string()) + .call()? + .try_into() + .map_err(|err: harp::Error| anyhow!("Failed to render plot with `id` {id}: {err:?}"))?; log::trace!("Rendered plot to {image_path}"); - // Read contents into bytes. let conn = File::open(image_path)?; let mut reader = BufReader::new(conn); let mut buffer = vec![]; reader.read_to_end(&mut buffer)?; - // what an odd interface let data = general_purpose::STANDARD_NO_PAD.encode(buffer); Ok(data) @@ -1049,6 +901,31 @@ impl DeviceContext { } } +/// Per-plot comm handler registered in Console's comm table. +/// Delegates RPC handling and lifecycle events to the shared `DeviceContext`. +#[derive(Debug)] +struct PlotComm { + id: PlotId, + open_data: serde_json::Value, +} + +impl CommHandler for PlotComm { + fn open_metadata(&self) -> serde_json::Value { + self.open_data.clone() + } + + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, console: &Console) { + let dc = console.device_context(); + handle_rpc_request(&ctx.outgoing_tx, PLOT_COMM_NAME, msg, |req| { + dc.handle_rpc(req, &self.id) + }); + } + + fn handle_close(&mut self, _ctx: &CommHandlerContext, console: &Console) { + console.device_context().on_plot_closed(&self.id); + } +} + // TODO: This macro needs to be updated every time we introduce support // for a new graphics device. Is there a better way? macro_rules! with_device { @@ -1101,15 +978,6 @@ impl From<&PlotId> for RObject { } } -/// Hook applied at idle time (`R_ProcessEvents()` time) to process any outstanding -/// RPC requests from Positron -/// -/// This is called a lot, so we don't trace log each entry -#[tracing::instrument(level = "trace", skip_all)] -pub(crate) fn on_process_idle_events() { - DEVICE_CONTEXT.with_borrow(|cell| cell.process_rpc_requests()); -} - /// Default DPI for converting inches to pixels. /// Matches R's default: 96 on macOS, 72 on Linux/Windows. /// See `default_resolution_in_pixels_per_inch()` in graphics.R. @@ -1240,15 +1108,13 @@ pub(crate) fn on_execute_request( intrinsic_size: Option, ) { log::trace!("Entering on_execute_request"); - DEVICE_CONTEXT.with_borrow(|cell| { - cell.set_execution_context( - execution_id, - code, - code_location, - render_settings, - intrinsic_size, - ) - }); + Console::get().device_context().set_execution_context( + execution_id, + code, + code_location, + render_settings, + intrinsic_size, + ); } /// Hook applied after a code chunk has finished executing @@ -1271,11 +1137,10 @@ pub(crate) fn on_execute_request( #[tracing::instrument(level = "trace", skip_all)] pub(crate) fn on_did_execute_request() { log::trace!("Entering on_did_execute_request"); - DEVICE_CONTEXT.with_borrow(|cell| { - cell.process_changes(); - cell.clear_execution_context(); - cell.clear_pending_origin(); - }); + let dc = Console::get().device_context(); + dc.process_changes(); + dc.clear_execution_context(); + dc.clear_pending_origin(); } /// Activation callback @@ -1288,11 +1153,10 @@ pub(crate) fn on_did_execute_request() { unsafe extern "C-unwind" fn callback_activate(dev: pDevDesc) { log::trace!("Entering callback_activate"); - DEVICE_CONTEXT.with_borrow(|cell| { - if let Some(callback) = cell.wrapped_callbacks.activate.get() { - callback(dev); - } - }); + let dc = Console::get().device_context(); + if let Some(callback) = dc.wrapped_callbacks.activate.get() { + callback(dev); + } } /// Deactivation callback @@ -1303,42 +1167,40 @@ unsafe extern "C-unwind" fn callback_activate(dev: pDevDesc) { unsafe extern "C-unwind" fn callback_deactivate(dev: pDevDesc) { log::trace!("Entering callback_deactivate"); - DEVICE_CONTEXT.with_borrow(|cell| { - // We run our hook first to record before we deactivate the underlying device, - // in case device deactivation messes with the display list - cell.hook_deactivate(); - if let Some(callback) = cell.wrapped_callbacks.deactivate.get() { - callback(dev); - } - }); + let dc = Console::get().device_context(); + // We run our hook first to record before we deactivate the underlying device, + // in case device deactivation messes with the display list + dc.hook_deactivate(); + if let Some(callback) = dc.wrapped_callbacks.deactivate.get() { + callback(dev); + } } #[tracing::instrument(level = "trace", skip_all, fields(level_delta = %level_delta))] unsafe extern "C-unwind" fn callback_holdflush(dev: pDevDesc, level_delta: i32) -> i32 { log::trace!("Entering callback_holdflush"); - DEVICE_CONTEXT.with_borrow(|cell| { - // If our wrapped device has a `holdflush()` method, we rely on it to apply - // the `level_delta` (typically `+1` or `-1`) and return the new level. Otherwise - // we follow the lead of `devholdflush()` in R and use a resolved `level` of `0`. - // Notably, `grDevices::png()` with a Cairo backend does not have a holdflush - // hook. - // https://github.com/wch/r-source/blob/8cebcc0a5d99890839e5171f398da643d858dcca/src/library/grDevices/src/devices.c#L129-L138 - let level = match cell.wrapped_callbacks.holdflush.get() { - Some(callback) => { - let level = callback(dev, level_delta); - log::trace!("Using resolved holdflush level from wrapped callback: {level}"); - level - }, - None => { - let level = 0; - log::trace!("Using default holdflush level: {level}"); - level - }, - }; - cell.hook_holdflush(level); - level - }) + let dc = Console::get().device_context(); + // If our wrapped device has a `holdflush()` method, we rely on it to apply + // the `level_delta` (typically `+1` or `-1`) and return the new level. Otherwise + // we follow the lead of `devholdflush()` in R and use a resolved `level` of `0`. + // Notably, `grDevices::png()` with a Cairo backend does not have a holdflush + // hook. + // https://github.com/wch/r-source/blob/8cebcc0a5d99890839e5171f398da643d858dcca/src/library/grDevices/src/devices.c#L129-L138 + let level = match dc.wrapped_callbacks.holdflush.get() { + Some(callback) => { + let level = callback(dev, level_delta); + log::trace!("Using resolved holdflush level from wrapped callback: {level}"); + level + }, + None => { + let level = 0; + log::trace!("Using default holdflush level: {level}"); + level + }, + }; + dc.hook_holdflush(level); + level } // mode = 0, graphics off @@ -1348,24 +1210,22 @@ unsafe extern "C-unwind" fn callback_holdflush(dev: pDevDesc, level_delta: i32) unsafe extern "C-unwind" fn callback_mode(mode: i32, dev: pDevDesc) { log::trace!("Entering callback_mode"); - DEVICE_CONTEXT.with_borrow(|cell| { - if let Some(callback) = cell.wrapped_callbacks.mode.get() { - callback(mode, dev); - } - cell.hook_mode(mode); - }); + let dc = Console::get().device_context(); + if let Some(callback) = dc.wrapped_callbacks.mode.get() { + callback(mode, dev); + } + dc.hook_mode(mode); } #[tracing::instrument(level = "trace", skip_all)] unsafe extern "C-unwind" fn callback_new_page(dd: pGEcontext, dev: pDevDesc) { log::trace!("Entering callback_new_page"); - DEVICE_CONTEXT.with_borrow(|cell| { - if let Some(callback) = cell.wrapped_callbacks.newPage.get() { - callback(dd, dev); - } - cell.hook_new_page(); - }); + let dc = Console::get().device_context(); + if let Some(callback) = dc.wrapped_callbacks.newPage.get() { + callback(dd, dev); + } + dc.hook_new_page(); } unsafe fn ps_graphics_device_impl() -> anyhow::Result { @@ -1388,26 +1248,24 @@ unsafe fn ps_graphics_device_impl() -> anyhow::Result { with_device!(ge_device, |ge_device, device| { (*ge_device).displayListOn = 1; - DEVICE_CONTEXT.with_borrow(|cell| { - let wrapped_callbacks = &cell.wrapped_callbacks; + let wrapped_callbacks = &Console::get().device_context().wrapped_callbacks; - // Safety: The callbacks are stored in simple cells. + // Safety: The callbacks are stored in simple cells. - wrapped_callbacks.activate.replace((*device).activate); - (*device).activate = Some(callback_activate); + wrapped_callbacks.activate.replace((*device).activate); + (*device).activate = Some(callback_activate); - wrapped_callbacks.deactivate.replace((*device).deactivate); - (*device).deactivate = Some(callback_deactivate); + wrapped_callbacks.deactivate.replace((*device).deactivate); + (*device).deactivate = Some(callback_deactivate); - wrapped_callbacks.holdflush.replace((*device).holdflush); - (*device).holdflush = Some(callback_holdflush); + wrapped_callbacks.holdflush.replace((*device).holdflush); + (*device).holdflush = Some(callback_holdflush); - wrapped_callbacks.mode.replace((*device).mode); - (*device).mode = Some(callback_mode); + wrapped_callbacks.mode.replace((*device).mode); + (*device).mode = Some(callback_mode); - wrapped_callbacks.newPage.replace((*device).newPage); - (*device).newPage = Some(callback_new_page); - }); + wrapped_callbacks.newPage.replace((*device).newPage); + (*device).newPage = Some(callback_new_page); }); Ok(R_NilValue) @@ -1450,11 +1308,9 @@ unsafe extern "C-unwind" fn ps_graphics_device() -> anyhow::Result { unsafe extern "C-unwind" fn ps_graphics_before_plot_new(_name: SEXP) -> anyhow::Result { log::trace!("Entering ps_graphics_before_plot_new"); - DEVICE_CONTEXT.with_borrow(|cell| { - // Process changes related to the last plot before opening a new page. - // Particularly important if we make multiple plots in a single chunk. - cell.process_changes(); - }); + // Process changes related to the last plot before opening a new page. + // Particularly important if we make multiple plots in a single chunk. + Console::get().device_context().process_changes(); Ok(harp::r_null()) } @@ -1469,39 +1325,45 @@ unsafe extern "C-unwind" fn ps_graphics_get_metadata(id: SEXP) -> anyhow::Result let id_str: String = RObject::view(id).try_into()?; let plot_id = PlotId(id_str); - DEVICE_CONTEXT.with_borrow(|cell| { - let contexts = cell.plot_contexts.borrow(); - match contexts.get(&plot_id) { - Some(ctx) => { - let info = &ctx.metadata; - let origin_uri = info.origin.as_ref().map(|o| o.uri.as_str()).unwrap_or(""); - - // Create a list with the metadata values - let values: Vec = vec![ - RObject::from(info.name.as_str()), - RObject::from(info.kind.as_str()), - RObject::from(info.execution_id.as_str()), - RObject::from(info.code.as_str()), - RObject::from(origin_uri), - ]; - let list = RObject::try_from(values)?; - - // Set the names attribute - let names: Vec = vec![ - "name".to_string(), - "kind".to_string(), - "execution_id".to_string(), - "code".to_string(), - "origin_uri".to_string(), - ]; - let names = RObject::from(names); - libr::Rf_setAttrib(list.sexp, libr::R_NamesSymbol, names.sexp); - - Ok(list.sexp) - }, - None => Ok(harp::r_null()), - } - }) + let contexts = Console::get().device_context().plot_contexts.borrow(); + match contexts.get(&plot_id) { + Some(ctx) => { + let info = &ctx.metadata; + let origin_uri = info.origin.as_ref().map(|o| o.uri.as_str()).unwrap_or(""); + + // Create a list with the metadata values + let values: Vec = vec![ + RObject::from(info.name.as_str()), + RObject::from(info.kind.as_str()), + RObject::from(info.execution_id.as_str()), + RObject::from(info.code.as_str()), + RObject::from(origin_uri), + ]; + let list = RObject::try_from(values)?; + + // Set the names attribute + let names: Vec = vec![ + "name".to_string(), + "kind".to_string(), + "execution_id".to_string(), + "code".to_string(), + "origin_uri".to_string(), + ]; + let names = RObject::from(names); + libr::Rf_setAttrib(list.sexp, libr::R_NamesSymbol, names.sexp); + + Ok(list.sexp) + }, + None => Ok(harp::r_null()), + } +} + +/// Return the current plot ID. Used by tests to verify that layout panels +/// share the same page (same ID) and that overflow creates a new page. +#[harp::register] +unsafe extern "C-unwind" fn ps_graphics_current_plot_id() -> anyhow::Result { + let id = Console::get().device_context().id(); + Ok(RObject::from(&id).sexp) } /// Push a source file URI onto the source context stack. @@ -1509,7 +1371,7 @@ unsafe extern "C-unwind" fn ps_graphics_get_metadata(id: SEXP) -> anyhow::Result #[harp::register] unsafe extern "C-unwind" fn ps_graphics_push_source_context(uri: SEXP) -> anyhow::Result { let uri_str: String = RObject::view(uri).try_into()?; - DEVICE_CONTEXT.with_borrow(|cell| cell.push_source_context(uri_str)); + Console::get().device_context().push_source_context(uri_str); Ok(harp::r_null()) } @@ -1517,7 +1379,7 @@ unsafe extern "C-unwind" fn ps_graphics_push_source_context(uri: SEXP) -> anyhow /// Called from the `source()` hook when leaving a sourced file. #[harp::register] unsafe extern "C-unwind" fn ps_graphics_pop_source_context() -> anyhow::Result { - DEVICE_CONTEXT.with_borrow(|cell| cell.pop_source_context()); + Console::get().device_context().pop_source_context(); Ok(harp::r_null()) } diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 157f56f96..35ab1d6f7 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -40,7 +40,6 @@ use harp::ParseResult; use log::*; use serde_json::json; use stdext::unwrap; -use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::ark_comm::ArkComm; use crate::console::Console; @@ -48,7 +47,7 @@ use crate::console::KernelInfo; use crate::data_explorer::r_data_explorer::DATA_EXPLORER_COMM_NAME; use crate::help::r_help::RHelp; use crate::help_proxy; -use crate::plots::graphics_device::GraphicsDeviceNotification; +use crate::plots::graphics_device::PLOT_COMM_NAME; use crate::r_task; use crate::request::KernelRequest; use crate::request::RRequest; @@ -61,7 +60,6 @@ pub struct Shell { kernel_request_tx: Sender, kernel_init_rx: BusReader, kernel_info: Option, - graphics_device_tx: AsyncUnboundedSender, } #[derive(Debug)] @@ -75,14 +73,12 @@ impl Shell { r_request_tx: Sender, kernel_init_rx: BusReader, kernel_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, ) -> Self { Self { r_request_tx, kernel_request_tx, kernel_init_rx, kernel_info: None, - graphics_device_tx, } } @@ -179,11 +175,11 @@ impl ShellHandler for Shell { /// Handles an ExecuteRequest by sending the code to the R execution thread /// for processing. - async fn handle_execute_request( + fn start_execute_request( &mut self, originator: Originator, req: &ExecuteRequest, - ) -> amalthea::Result { + ) -> crossbeam::channel::Receiver> { let (response_tx, response_rx) = unbounded::>(); let mut req_clone = req.clone(); req_clone.code = convert_line_endings(&req_clone.code, LineEnding::Posix); @@ -200,7 +196,7 @@ impl ShellHandler for Shell { trace!("Code sent to R: {}", req_clone.code); - response_rx.recv().unwrap() + response_rx } /// Handles an introspection request @@ -244,12 +240,7 @@ impl ShellHandler for Shell { ) -> amalthea::Result { match target { Comm::Variables => handle_comm_open_variables(comm), - Comm::Ui => handle_comm_open_ui( - comm, - self.kernel_request_tx.clone(), - self.graphics_device_tx.clone(), - data, - ), + Comm::Ui => handle_comm_open_ui(comm, self.kernel_request_tx.clone(), data), Comm::Help => handle_comm_open_help(comm), Comm::Other(target_name) if target_name == "ark" => ArkComm::handle_comm_open(comm), _ => Ok(false), @@ -264,7 +255,7 @@ impl ShellHandler for Shell { originator: Originator, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME | UI_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | PLOT_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommMsg { comm_id: comm_id.to_string(), msg, @@ -283,7 +274,7 @@ impl ShellHandler for Shell { comm_name: &str, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME | UI_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | PLOT_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommClose { comm_id: comm_id.to_string(), done_tx, @@ -323,10 +314,9 @@ fn handle_comm_open_variables(comm: CommSocket) -> amalthea::Result { fn handle_comm_open_ui( comm: CommSocket, kernel_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, data: serde_json::Value, ) -> amalthea::Result { - let handler = UiComm::new(graphics_device_tx, data); + let handler = UiComm::new(data); let (done_tx, done_rx) = bounded(0); kernel_request_tx diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index f0e3dfab8..95358b66d 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -25,7 +25,6 @@ use crate::console::SessionMode; use crate::control::Control; use crate::dap; use crate::lsp; -use crate::plots::graphics_device::GraphicsDeviceNotification; use crate::repos::DefaultRepos; use crate::request::KernelRequest; use crate::request::RRequest; @@ -79,18 +78,12 @@ pub fn start_kernel( // StdIn socket thread let (stdin_request_tx, stdin_request_rx) = bounded::(1); - // Communication channel between the graphics device (running on the R - // thread) and the shell thread - let (graphics_device_tx, graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); - // Create the shell. let kernel_init_rx = kernel_init_tx.add_rx(); let shell = Box::new(Shell::new( r_request_tx.clone(), kernel_init_rx, kernel_request_tx, - graphics_device_tx, )); // Create the control handler; this is used to handle shutdown/interrupt and @@ -154,7 +147,6 @@ pub fn start_kernel( dap, session_mode, default_repos, - graphics_device_rx, console_notification_rx, ) } diff --git a/crates/ark/src/ui/ui_comm.rs b/crates/ark/src/ui/ui_comm.rs index bffdfa1eb..ecd7c9823 100644 --- a/crates/ark/src/ui/ui_comm.rs +++ b/crates/ark/src/ui/ui_comm.rs @@ -25,7 +25,6 @@ use harp::exec::RFunctionExt; use harp::object::RObject; use serde_json::Value; use stdext::result::ResultExt; -use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::comm_handler::handle_comm_message; use crate::comm_handler::CommHandler; @@ -34,7 +33,6 @@ use crate::comm_handler::EnvironmentChanged; use crate::console::Console; use crate::console::ConsoleOutputCapture; use crate::modules::ARK_ENVS; -use crate::plots::graphics_device::GraphicsDeviceNotification; pub const UI_COMM_NAME: &str = "positron.ui"; @@ -48,13 +46,12 @@ struct UiCommOpenData { /// Comm handler for the Positron UI comm. #[derive(Debug)] pub struct UiComm { - graphics_device_tx: AsyncUnboundedSender, working_directory: PathBuf, comm_open_data: UiCommOpenData, } impl CommHandler for UiComm { - fn handle_open(&mut self, ctx: &CommHandlerContext) { + fn handle_open(&mut self, ctx: &CommHandlerContext, _console: &Console) { // Set initial console width from the comm_open data, if provided. if let Some(width) = self.comm_open_data.console_width { if let Err(err) = RFunction::from(".ps.rpc.setConsoleWidth") @@ -73,7 +70,7 @@ impl CommHandler for UiComm { self.refresh(&input_prompt, &continuation_prompt, ctx); } - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, _console: &Console) { let this = &*self; handle_comm_message( &ctx.outgoing_tx, @@ -84,7 +81,12 @@ impl CommHandler for UiComm { ); } - fn handle_environment(&mut self, event: &EnvironmentChanged, ctx: &CommHandlerContext) { + fn handle_environment( + &mut self, + event: &EnvironmentChanged, + ctx: &CommHandlerContext, + _console: &Console, + ) { let EnvironmentChanged::Execution { input_prompt, continuation_prompt, @@ -97,10 +99,7 @@ impl CommHandler for UiComm { } impl UiComm { - pub(crate) fn new( - graphics_device_tx: AsyncUnboundedSender, - comm_open_data: Value, - ) -> Self { + pub(crate) fn new(comm_open_data: Value) -> Self { let comm_open_data: UiCommOpenData = serde_json::from_value(comm_open_data).unwrap_or_else(|err| { log::warn!("Failed to deserialize UI comm_open data: {err:?}"); @@ -110,7 +109,6 @@ impl UiComm { }); Self { - graphics_device_tx, working_directory: PathBuf::new(), comm_open_data, } @@ -178,11 +176,9 @@ impl UiComm { )); } - self.graphics_device_tx - .send(GraphicsDeviceNotification::DidChangePlotRenderSettings( - params.settings, - )) - .map_err(|err| anyhow::anyhow!("Failed to send plot render settings: {err}"))?; + Console::get() + .device_context() + .set_prerender_settings(params.settings); Ok(()) } @@ -302,8 +298,7 @@ mod tests { let (comm_event_tx, _) = bounded::(10); let ctx = CommHandlerContext::new(outgoing_tx, comm_event_tx); - let (graphics_device_tx, _) = tokio::sync::mpsc::unbounded_channel(); - let handler = UiComm::new(graphics_device_tx, serde_json::Value::Null); + let handler = UiComm::new(serde_json::Value::Null); (handler, ctx) } @@ -328,7 +323,7 @@ mod tests { })) .unwrap(), }; - handler.handle_msg(msg, &ctx); + handler.handle_msg(msg, &ctx, Console::get()); // Assert that the console width changed let new_width: i32 = harp::get_option("width").try_into().unwrap(); @@ -344,7 +339,7 @@ mod tests { })) .unwrap(), }; - handler.handle_msg(msg, &ctx); + handler.handle_msg(msg, &ctx, Console::get()); old_width }); @@ -390,7 +385,7 @@ mod tests { })) .unwrap(), }; - handler.handle_msg(msg, &ctx); + handler.handle_msg(msg, &ctx, Console::get()); }); let response = iopub_rx.recv_comm_msg(); diff --git a/crates/ark/tests/data_explorer.rs b/crates/ark/tests/data_explorer.rs index 69e761975..7c9c28067 100644 --- a/crates/ark/tests/data_explorer.rs +++ b/crates/ark/tests/data_explorer.rs @@ -70,6 +70,7 @@ use amalthea::socket::iopub::IOPubMessage; use ark::comm_handler::CommHandler; use ark::comm_handler::CommHandlerContext; use ark::comm_handler::EnvironmentChanged; +use ark::console::Console; use ark::data_explorer::format::format_column; use ark::data_explorer::format::format_string; use ark::data_explorer::r_data_explorer::DataExplorerMode; @@ -175,7 +176,7 @@ impl TestSetup { let inner = &self.inner; r_task(|| { let TestInner(handler, ctx) = &mut *inner.lock().unwrap(); - handler.handle_msg(msg, ctx); + handler.handle_msg(msg, ctx, Console::get()); }); let iopub_msg = self.iopub_rx.recv_timeout(RECV_TIMEOUT).unwrap(); @@ -197,6 +198,7 @@ impl TestSetup { continuation_prompt: String::from("+ "), }, ctx, + Console::get(), ); ctx.is_closed() }); @@ -783,7 +785,7 @@ fn expect_column_profile_results( let inner = &setup.inner; r_task(|| { let TestInner(handler, ctx) = &mut *inner.lock().unwrap(); - handler.handle_msg(msg, ctx); + handler.handle_msg(msg, ctx, Console::get()); }); let msg = setup.iopub_rx.recv_comm_msg(); diff --git a/crates/ark/tests/kernel-hooks-session.rs b/crates/ark/tests/kernel-hooks-session.rs index fb288979c..89b73a191 100644 --- a/crates/ark/tests/kernel-hooks-session.rs +++ b/crates/ark/tests/kernel-hooks-session.rs @@ -14,8 +14,6 @@ fn execute(frontend: &DummyArkFrontend, comm_id: &str, code: &str) { frontend.send_execute_request(code, ExecuteRequestOptions::default()); frontend.recv_iopub_busy(); frontend.recv_iopub_execute_input(); - frontend.recv_ui_busy(comm_id, true); - frontend.recv_ui_busy(comm_id, false); frontend.recv_ui_prompt_state(comm_id); frontend.recv_iopub_idle(); frontend.recv_shell_execute_reply(); diff --git a/crates/ark/tests/kernel-notebook-data-explorer.rs b/crates/ark/tests/kernel-notebook-data-explorer.rs index d164c8a2b..60b798ff1 100644 --- a/crates/ark/tests/kernel-notebook-data-explorer.rs +++ b/crates/ark/tests/kernel-notebook-data-explorer.rs @@ -8,29 +8,6 @@ use amalthea::fixtures::dummy_frontend::ExecuteRequestOptions; use ark_test::DummyArkPositronNotebook; -/// Drain the UI comm messages that arrive during execution (busy=true, -/// busy=false, prompt_state). These are CommMsg messages on the UI comm's -/// channel that interleave with the execute result on IOPub. -fn drain_ui_comm_msgs(frontend: &DummyArkPositronNotebook, ui_comm_id: &str) { - // busy=true - let msg = frontend.recv_iopub_comm_msg(); - assert_eq!(msg.comm_id, ui_comm_id); - assert_eq!(msg.data["method"], "busy"); - assert_eq!(msg.data["params"]["busy"], true); - - // busy=false - let msg = frontend.recv_iopub_comm_msg(); - assert_eq!(msg.comm_id, ui_comm_id); - assert_eq!(msg.data["method"], "busy"); - assert_eq!(msg.data["params"]["busy"], false); -} - -fn drain_ui_comm_prompt_state(frontend: &DummyArkPositronNotebook, ui_comm_id: &str) { - let msg = frontend.recv_iopub_comm_msg(); - assert_eq!(msg.comm_id, ui_comm_id); - assert_eq!(msg.data["method"], "prompt_state"); -} - #[test] fn test_notebook_inline_data_explorer() { let frontend = DummyArkPositronNotebook::lock(); @@ -43,8 +20,12 @@ fn test_notebook_inline_data_explorer() { frontend.recv_iopub_busy(); frontend.recv_iopub_execute_input(); - // Drain UI comm busy events - drain_ui_comm_msgs(&frontend, &ui_comm_id); + // The comm_open for the inline data explorer now arrives during execution + // (before the execute result) because comm_open_backend blocks until Shell + // has published it on IOPub. + let comm_open = frontend.recv_iopub_comm_open(); + assert_eq!(comm_open.target_name, "positron.dataExplorer"); + assert_eq!(comm_open.data["inline_only"], true); let result_data = frontend.recv_iopub_execute_result_data(); @@ -63,18 +44,14 @@ fn test_notebook_inline_data_explorer() { assert!(de_data["comm_id"].as_str().is_some()); assert!(de_data["title"].as_str().is_some()); + // The comm_id in the MIME payload must match the comm_open + assert_eq!(comm_open.comm_id, de_data["comm_id"].as_str().unwrap()); + // prompt_state arrives after execute_result - drain_ui_comm_prompt_state(&frontend, &ui_comm_id); + frontend.recv_ui_prompt_state(&ui_comm_id); frontend.recv_iopub_idle(); frontend.recv_shell_execute_reply(); - - // The comm_open for the inline data explorer arrives after Idle - // (it goes through Shell's comm event channel) - let comm_open = frontend.recv_iopub_comm_open(); - assert_eq!(comm_open.target_name, "positron.dataExplorer"); - assert_eq!(comm_open.data["inline_only"], true); - assert_eq!(comm_open.comm_id, de_data["comm_id"].as_str().unwrap()); } #[test] @@ -86,15 +63,13 @@ fn test_notebook_no_inline_data_explorer_for_non_data_frame() { frontend.recv_iopub_busy(); frontend.recv_iopub_execute_input(); - drain_ui_comm_msgs(&frontend, &ui_comm_id); - let result_data = frontend.recv_iopub_execute_result_data(); // Should have text/plain but NOT the data explorer MIME type assert!(result_data.contains_key("text/plain")); assert!(!result_data.contains_key("application/vnd.positron.dataExplorer+json")); - drain_ui_comm_prompt_state(&frontend, &ui_comm_id); + frontend.recv_ui_prompt_state(&ui_comm_id); frontend.recv_iopub_idle(); frontend.recv_shell_execute_reply(); diff --git a/crates/ark/tests/plots.rs b/crates/ark/tests/plots.rs index 0fcc42d2a..bd690a583 100644 --- a/crates/ark/tests/plots.rs +++ b/crates/ark/tests/plots.rs @@ -699,3 +699,481 @@ fn test_plot_default_size_without_metadata() { frontend.recv_iopub_idle(); frontend.recv_shell_execute_reply(); } + +/// Test that `dev.hold()` suppresses intermediate plot output. +/// +/// Without hold, each `plot()` call emits a separate `display_data`. +/// With hold active, intermediate plots are suppressed and only the +/// final state after `dev.flush()` is emitted. +#[test] +fn test_dev_hold_suppresses_intermediate_plots() { + let frontend = DummyArkFrontend::lock(); + + // Hold, draw two intermediate plots, then flush. + // Only the final plot should produce output. + let code = r#" +invisible(dev.hold()) +plot(1:5) +plot(1:3) +invisible(dev.flush()) +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_display_data(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Test that `dev.hold()` persists across execute requests. +/// +/// A hold started in one request should suppress output until +/// `dev.flush()` is called in a subsequent request. +#[test] +fn test_dev_hold_across_execute_requests() { + let frontend = DummyArkFrontend::lock(); + + // Hold and plot without flushing. No display_data should appear. + frontend.send_execute_request( + "invisible(dev.hold())\nplot(1:5)", + ExecuteRequestOptions::default(), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Flush in a separate request. The held plot should now appear. + frontend.send_execute_request("invisible(dev.flush())", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_display_data(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +// Positron-path plot tests (dynamic plots via comm channels) +// +// These tests connect the UI comm to enable the Positron plot path +// (comm-based dynamic plots) instead of the Jupyter protocol path +// (display_data / update_display_data). +// +// In the Positron path: +// - New plots open a "positron.plot" comm (via CommEvent::Opened through +// Shell, arriving on IOPub after idle). +// - Plot updates send a comm_msg directly on IOPub (arriving before idle). +// +// Regression tests for https://github.com/posit-dev/ark/pull/1100 + +/// Positron path: a single plot opens a plot comm. +#[test] +fn test_positron_simple_plot() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + frontend.send_execute_request("plot(1:10)", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: two plots in a single request each open their own comm. +#[test] +fn test_positron_multiple_plots() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + frontend.send_execute_request("plot(1:10)\nplot(2:20)", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let open1 = frontend.recv_iopub_comm_open(); + let open2 = frontend.recv_iopub_comm_open(); + assert_eq!(open1.target_name, "positron.plot"); + assert_eq!(open2.target_name, "positron.plot"); + assert_ne!(open1.comm_id, open2.comm_id); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `par(mfrow)` creates one plot comm with panel updates. +/// +/// The first panel opens a plot comm; the second panel sends an update +/// on the same comm. Plot ID stays the same (no new page). +#[test] +fn test_positron_par_multi_panel() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let code = r#" +par(mfrow = c(2, 1)) +plot(1:10) +id1 <- .ps.internal(current_plot_id()) +plot(2:20) +id2 <- .ps.internal(current_plot_id()) +stopifnot(id1 == id2) +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + + // Panel update arrives after comm_open (barrier ensures ordering) + let update = frontend.recv_iopub_comm_msg(); + assert_eq!(update.comm_id, open.comm_id); + + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `layout()` + multi-plot works like `par(mfrow)`. +/// +/// Same as `test_positron_par_multi_panel` but with `layout()` inside a +/// function call, which exercises a slightly different R code path. +/// +/// Regression: https://github.com/posit-dev/ark/pull/1100#discussion_r2942816670 +#[test] +fn test_positron_layout_multi_plot() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let code = r#" +plt2 = function() { + layout(matrix(1:2, 2)) + plot(1, 1) + id1 <- .ps.internal(current_plot_id()) + plot(1, 1) + id2 <- .ps.internal(current_plot_id()) + stopifnot(id1 == id2) +} +plt2() +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + + // Second panel update arrives after comm_open (barrier ensures ordering) + let update = frontend.recv_iopub_comm_msg(); + assert_eq!(update.comm_id, open.comm_id); + + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `dev.hold()` suppresses intermediate plot output. +/// +/// Only the final state after `dev.flush()` produces a plot comm. +#[test] +fn test_positron_dev_hold_suppresses() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let code = r#" +invisible(dev.hold()) +plot(1:5) +plot(1:3) +invisible(dev.flush()) +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + // Only one plot comm for the final state + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `dev.hold()` persists across execute requests. +/// +/// A hold started in one request should suppress output until +/// `dev.flush()` is called in a subsequent request. +#[test] +fn test_positron_dev_hold_across_requests() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + // Hold and plot without flushing. No plot comm should open. + frontend.send_execute_request( + "invisible(dev.hold())\nplot(1:5)", + ExecuteRequestOptions::default(), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Flush in a separate request. The held plot should now appear. + frontend.send_execute_request("invisible(dev.flush())", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: three separate requests each produce a plot comm. +/// +/// Simulates running different packages (e.g. rpart, sf, rpart) one at a +/// time, each producing their own plot. +/// +/// Regression: https://github.com/posit-dev/ark/pull/1100#discussion_r2942842898 +#[test] +fn test_positron_sequential_plots() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + for i in 1..=3 { + let code = format!("plot({i}:10)"); + frontend.send_execute_request(&code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + } +} + +/// Positron path: switching to `png()` and back preserves our plot. +/// +/// The png device is separate from the positron device and should not +/// produce plot comms. +#[test] +fn test_positron_graphics_device_swap() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let code = r#" +plot(1:10) +grDevices::png(tempfile(fileext = ".png")) +plot(1:20) +invisible(dev.off()) +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + // Only one plot comm for the first plot (on our device) + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: plotting in a loop produces one comm per iteration. +#[test] +fn test_positron_loop_plots() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let code = r#" +for (i in 1:3) { + plot(i) +} +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let open1 = frontend.recv_iopub_comm_open(); + let open2 = frontend.recv_iopub_comm_open(); + let open3 = frontend.recv_iopub_comm_open(); + assert_eq!(open1.target_name, "positron.plot"); + assert_eq!(open2.target_name, "positron.plot"); + assert_eq!(open3.target_name, "positron.plot"); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `par(mfrow)` with 4 plots in a 3-panel layout. +/// +/// The first 3 plots fill the layout (1 new + 2 updates on the same comm). +/// The 4th plot overflows to a new page, opening a second comm. +#[test] +fn test_positron_par_overflow_to_new_page() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let code = r#" +par(mfrow = c(3, 1)) +plot(1:3) +plot(4:6) +plot(7:9) +plot(10:12) +par(mfrow = c(1, 1)) +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + // First page comm (barrier ensures comm_open arrives before updates) + let open1 = frontend.recv_iopub_comm_open(); + assert_eq!(open1.target_name, "positron.plot"); + + // Panels 2 and 3 update the first page + let update1 = frontend.recv_iopub_comm_msg(); + let update2 = frontend.recv_iopub_comm_msg(); + assert_eq!(update1.comm_id, open1.comm_id); + assert_eq!(update2.comm_id, open1.comm_id); + + // Second page comm (4th plot overflows to a new page) + let open2 = frontend.recv_iopub_comm_open(); + assert_eq!(open2.target_name, "positron.plot"); + assert_ne!(open1.comm_id, open2.comm_id); + + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `dev.hold()` / `dev.flush()` run one line at a time. +/// +/// Each line is a separate execute request, simulating interactive use. +#[test] +fn test_positron_dev_hold_flush_interactive() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + // Hold + frontend.send_execute_request("invisible(dev.hold())", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Draw first plot (held, no comm should open) + frontend.send_execute_request("plot(1:10)", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Draw over it (still held) + frontend.send_execute_request("abline(1, 2)", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Flush - the combined plot should now appear + frontend.send_execute_request("invisible(dev.flush())", ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `comm_open` arrives while R is still executing. +/// +/// Uses `readline()` as a synchronisation barrier: R blocks on stdin after +/// plotting, so receiving `comm_open` before the input request proves the +/// comm was published mid-execution (not deferred to the cleanup phase). +#[test] +fn test_positron_plot_comm_open_during_execution() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let options = ExecuteRequestOptions { + allow_stdin: true, + ..Default::default() + }; + + let code = r#" +for (i in 1:3) plot(i) +readline("sync>") +"#; + frontend.send_execute_request(code, options); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + // All 3 comm_opens arrive while R is still executing (before readline) + let open1 = frontend.recv_iopub_comm_open(); + let open2 = frontend.recv_iopub_comm_open(); + let open3 = frontend.recv_iopub_comm_open(); + assert_eq!(open1.target_name, "positron.plot"); + assert_eq!(open2.target_name, "positron.plot"); + assert_eq!(open3.target_name, "positron.plot"); + + // R is blocked on readline(), proving the comms arrived mid-execution + let prompt = frontend.recv_stdin_input_request(); + assert_eq!(prompt, "sync>"); + + // Unblock R + frontend.send_stdin_input_reply(String::from("")); + + // readline() return value + frontend.recv_iopub_execute_result(); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} + +/// Positron path: `layout()` panels share the same plot ID, overflow gets a new one. +/// +/// Verifies that the second `plot()` call within a 2-panel layout doesn't +/// trigger a new page (plot ID stays the same), while a third `plot()` that +/// overflows the layout creates a new page (new plot ID). +#[test] +fn test_positron_layout_plot_id_stability() { + let frontend = DummyArkFrontend::lock(); + frontend.open_ui_comm(); + + let code = r#" +layout(matrix(1:2, 2)) +plot(1, 1) +id1 <- .ps.internal(current_plot_id()) +plot(1, 1) +id2 <- .ps.internal(current_plot_id()) +plot(1, 1) +id3 <- .ps.internal(current_plot_id()) +stopifnot( + id1 == id2, + id2 != id3 +) +"#; + frontend.send_execute_request(code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + + // First panel opens a plot comm + let open1 = frontend.recv_iopub_comm_open(); + assert_eq!(open1.target_name, "positron.plot"); + + // Second panel updates the same comm (no new page) + let update = frontend.recv_iopub_comm_msg(); + assert_eq!(update.comm_id, open1.comm_id); + + // Third plot overflows to a new page + let open2 = frontend.recv_iopub_comm_open(); + assert_eq!(open2.target_name, "positron.plot"); + assert_ne!(open1.comm_id, open2.comm_id); + + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); +} diff --git a/crates/ark/tests/ui-prompt-state.rs b/crates/ark/tests/ui-prompt-state.rs index fbf65a616..ab06e038d 100644 --- a/crates/ark/tests/ui-prompt-state.rs +++ b/crates/ark/tests/ui-prompt-state.rs @@ -14,6 +14,7 @@ use ark_test::DummyArkFrontend; fn test_prompt_state_after_execution() { let frontend = DummyArkFrontend::lock(); let comm_id = frontend.open_ui_comm(); + frontend.set_ignore_ui_busy(false); frontend.send_execute_request("1 + 1", ExecuteRequestOptions::default()); frontend.recv_iopub_busy(); @@ -34,6 +35,7 @@ fn test_prompt_state_after_execution() { fn test_prompt_state_custom_prompt() { let frontend = DummyArkFrontend::lock(); let comm_id = frontend.open_ui_comm(); + frontend.set_ignore_ui_busy(false); // Change the prompt frontend.send_execute_request( @@ -67,6 +69,7 @@ fn test_prompt_state_custom_prompt() { fn test_prompt_state_browser() { let frontend = DummyArkFrontend::lock(); let comm_id = frontend.open_ui_comm(); + frontend.set_ignore_ui_busy(false); // Enter the browser. The busy sequence differs from normal execution: // R briefly goes idle entering the browser's ReadConsole, then busy @@ -102,6 +105,7 @@ fn test_prompt_state_browser() { fn test_prompt_state_custom_continuation() { let frontend = DummyArkFrontend::lock(); let comm_id = frontend.open_ui_comm(); + frontend.set_ignore_ui_busy(false); frontend.send_execute_request( "options(continue = '... ')", diff --git a/crates/ark_test/src/dummy_frontend.rs b/crates/ark_test/src/dummy_frontend.rs index 0f7d201dc..884294ddf 100644 --- a/crates/ark_test/src/dummy_frontend.rs +++ b/crates/ark_test/src/dummy_frontend.rs @@ -71,6 +71,15 @@ pub struct DummyArkFrontend { streams_handled: Cell, /// Whether we're currently in a debug context (between start_debug and stop_debug) in_debug: Cell, + /// Comm ID of the open UI comm, if any. Set by `open_ui_comm()` so + /// that `recv_iopub_ui_busy()` and `recv_iopub_prompt_state()` can + /// identify UI comm messages. + ui_comm_id: RefCell>, + /// When true (the default), UI comm `busy` events are silently + /// skipped by `recv_iopub_next()`. R fires busy(true)/busy(false) + /// per expression in multi-line requests, producing a variable + /// number of events that most tests don't care about. + ignore_ui_busy: Cell, /// Comm ID of the open variables comm, if any. variables_comm_id: RefCell>, /// Buffered variables comm events, auto-collected by `recv_iopub_next()`. @@ -155,6 +164,8 @@ impl DummyArkFrontend { pending_iopub_messages: RefCell::new(VecDeque::new()), streams_handled: Cell::new(false), in_debug: Cell::new(false), + ui_comm_id: RefCell::new(None), + ignore_ui_busy: Cell::new(true), variables_comm_id: RefCell::new(None), variables_events: RefCell::new(VecDeque::new()), } @@ -235,7 +246,7 @@ impl DummyArkFrontend { } } - /// Try to buffer a known message (stream or variables comm). + /// Try to buffer a known message (stream, UI comm refresh, or variables comm). /// Traces the message if it was buffered. Returns `true` if the message was consumed. /// /// Variables comm events still race with Idle (not yet migrated to @@ -247,6 +258,14 @@ impl DummyArkFrontend { self.buffer_stream(&data.content); true }, + Message::CommMsg(ref data) + if self.ignore_ui_busy.get() && + self.is_ui_comm(&data.content.comm_id) && + data.content.data.get("method").and_then(|v| v.as_str()) == Some("busy") => + { + trace_iopub_msg(msg); + true + }, Message::CommMsg(ref data) if self.is_variables_comm(&data.content.comm_id) => { trace_iopub_msg(msg); self.buffer_variables_event(&data.content.data); @@ -256,6 +275,56 @@ impl DummyArkFrontend { } } + fn is_ui_comm(&self, comm_id: &str) -> bool { + self.ui_comm_id + .borrow() + .as_deref() + .is_some_and(|id| id == comm_id) + } + + /// Ignore UI comm `busy` events in `recv_iopub_next()`. + /// + /// R fires `busy(true)`/`busy(false)` for each top-level expression + /// in a multi-line request, so the number of events depends on how + /// many expressions the code contains. Enable this in tests that + /// don't care about busy transitions (e.g. plot tests). + pub fn set_ignore_ui_busy(&self, ignore: bool) { + self.ignore_ui_busy.set(ignore); + } + + /// Receive from IOPub and assert a UI comm `busy` event. + /// Automatically skips any Stream messages. + #[track_caller] + pub fn recv_iopub_ui_busy(&self, expected: bool) { + let msg = self.recv_iopub_next(); + match msg { + Message::CommMsg(data) if self.is_ui_comm(&data.content.comm_id) => { + assert_eq!( + data.content.data.get("method").and_then(|v| v.as_str()), + Some("busy") + ); + assert_eq!(data.content.data["params"]["busy"], expected); + }, + other => panic!("Expected UI busy={expected} CommMsg, got {other:?}"), + } + } + + /// Receive from IOPub and assert a UI comm `prompt_state` event. + /// Automatically skips any Stream messages. + #[track_caller] + pub fn recv_iopub_ui_prompt_state(&self) { + let msg = self.recv_iopub_next(); + match msg { + Message::CommMsg(data) if self.is_ui_comm(&data.content.comm_id) => { + assert_eq!( + data.content.data.get("method").and_then(|v| v.as_str()), + Some("prompt_state") + ); + }, + other => panic!("Expected prompt_state CommMsg on UI comm, got {other:?}"), + } + } + fn is_variables_comm(&self, comm_id: &str) -> bool { self.variables_comm_id .borrow() @@ -890,11 +959,9 @@ impl DummyArkFrontend { ); self.recv_iopub_busy(); self.recv_iopub_execute_input(); - self.recv_iopub_idle(); - self.recv_shell_execute_reply(); - // CommOpen goes through Shell's comm event channel, so it arrives - // after Idle. + // Shell drains comm events during execution, so CommOpen arrives + // within the Busy/Idle window. let comm_open = self.recv_iopub_comm_open(); assert_eq!( comm_open.target_name, "positron.dataExplorer", @@ -902,6 +969,9 @@ impl DummyArkFrontend { comm_open.target_name ); + self.recv_iopub_idle(); + self.recv_shell_execute_reply(); + comm_open.comm_id } @@ -1094,10 +1164,12 @@ impl DummyArkFrontend { // The UI comm runs on the R thread via CommHandler. The comm_open // blocks Shell while the handler's `handle_open()` runs, so events // arrive deterministically within the Busy/Idle window. + // + // `handle_open()` calls `refresh()` which sends prompt_state then + // working_directory. Receive them explicitly here, before enabling + // auto-buffering for subsequent UI comm events. self.recv_iopub_busy(); - // `handle_open()` calls `refresh()` which sends prompt_state then - // working_directory. let prompt_state = self.recv_iopub_comm_msg(); assert_eq!(prompt_state.comm_id, comm_id); assert_eq!( @@ -1114,6 +1186,13 @@ impl DummyArkFrontend { self.recv_iopub_idle(); + // Store the UI comm ID so `recv_iopub_ui_busy()` and + // `recv_iopub_prompt_state()` can identify UI comm messages. + // `ignore_ui_busy` is on by default, so UI busy events are + // auto-skipped. Tests that care (e.g. `ui-prompt-state`) can + // call `set_ignore_ui_busy(false)` to receive them inline. + *self.ui_comm_id.borrow_mut() = Some(comm_id.clone()); + comm_id } diff --git a/crates/echo/src/shell.rs b/crates/echo/src/shell.rs index f62a6ecd5..6fe3f0197 100644 --- a/crates/echo/src/shell.rs +++ b/crates/echo/src/shell.rs @@ -113,11 +113,59 @@ impl ShellHandler for Shell { } /// Handles an ExecuteRequest; "executes" the code by echoing it. - async fn handle_execute_request( + fn start_execute_request( &mut self, _originator: Originator, req: &ExecuteRequest, - ) -> amalthea::Result { + ) -> crossbeam::channel::Receiver> { + let (tx, rx) = crossbeam::channel::bounded(1); + let result = self.execute(req); + tx.send(result).unwrap(); + rx + } + + /// Handles an introspection request + async fn handle_inspect_request(&self, req: &InspectRequest) -> amalthea::Result { + let data = match req.code.as_str() { + "err" => { + json!({"text/plain": "This generates an error!"}) + }, + "teapot" => { + json!({"text/plain": "This is clearly a teapot."}) + }, + _ => serde_json::Value::Null, + }; + Ok(InspectReply { + status: Status::Ok, + found: data != serde_json::Value::Null, + data, + metadata: json!({}), + }) + } + + async fn handle_history_request( + &self, + _req: &HistoryRequest, + ) -> amalthea::Result { + Ok(HistoryReply { + status: Status::Ok, + history: vec![], + }) + } + + async fn handle_comm_open( + &self, + _target: Comm, + _comm: CommSocket, + _data: serde_json::Value, + ) -> amalthea::Result { + // No comms in this toy implementation. + Ok(false) + } +} + +impl Shell { + fn execute(&mut self, req: &ExecuteRequest) -> amalthea::Result { // Increment counter if we are storing this execution in history if req.store_history { self.execution_count += 1; @@ -185,43 +233,4 @@ impl ShellHandler for Shell { user_expressions: serde_json::Value::Null, }) } - - /// Handles an introspection request - async fn handle_inspect_request(&self, req: &InspectRequest) -> amalthea::Result { - let data = match req.code.as_str() { - "err" => { - json!({"text/plain": "This generates an error!"}) - }, - "teapot" => { - json!({"text/plain": "This is clearly a teapot."}) - }, - _ => serde_json::Value::Null, - }; - Ok(InspectReply { - status: Status::Ok, - found: data != serde_json::Value::Null, - data, - metadata: json!({}), - }) - } - - async fn handle_history_request( - &self, - _req: &HistoryRequest, - ) -> amalthea::Result { - Ok(HistoryReply { - status: Status::Ok, - history: vec![], - }) - } - - async fn handle_comm_open( - &self, - _target: Comm, - _comm: CommSocket, - _data: serde_json::Value, - ) -> amalthea::Result { - // No comms in this toy implementation. - Ok(false) - } }