diff --git a/Cargo.lock b/Cargo.lock index 5b2338f65..12c43d507 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,6 +353,7 @@ dependencies = [ "amalthea", "anyhow", "ark", + "ark_macros", "ark_test", "assert_matches", "async-trait", @@ -413,6 +414,14 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "ark_macros" +version = "0.1.0" +dependencies = [ + "quote", + "syn 2.0.111", +] + [[package]] name = "ark_test" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 73df7d108..5ace3f9f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ resolver = "2" members = [ "crates/amalthea", "crates/ark", + "crates/ark/ark_macros", "crates/echo", "crates/harp", "crates/libr", @@ -38,6 +39,7 @@ aether_syntax = { git = "https://github.com/posit-dev/air", package = "air_r_syn amalthea = { path = "crates/amalthea" } anyhow = "1.0.100" ark = { path = "crates/ark" } +ark_macros = { path = "crates/ark/ark_macros" } ark_test = { path = "crates/ark_test" } assert_matches = "1.5.0" async-trait = "0.1.66" diff --git a/crates/amalthea/src/comm/event.rs b/crates/amalthea/src/comm/event.rs index 69a07532d..3254d4e9f 100644 --- a/crates/amalthea/src/comm/event.rs +++ b/crates/amalthea/src/comm/event.rs @@ -13,8 +13,12 @@ use crate::socket::comm::CommSocket; /// Comm events sent to the frontend via Shell. pub enum CommEvent { - /// A new Comm was opened - Opened(CommSocket, Value), + /// A new Comm was opened. The optional `Sender` is a synchronisation + /// barrier: if provided, Shell signals it after processing the open + /// (sending `comm_open` on IOPub). The caller blocks on the paired + /// receiver to guarantee that the `comm_open` message has been sent + /// before any subsequent messages. + Opened(CommSocket, Value, Option>), /// A message was received on a Comm; the first value is the comm ID, and the /// second value is the message. @@ -22,10 +26,4 @@ pub enum CommEvent { /// A Comm was closed Closed(String), - - /// Synchronisation barrier. Shell signals the sender after processing all - /// preceding events. The caller blocks on the paired receiver to guarantee - /// that earlier events (e.g. `Opened`) have been fully handled before - /// continuing. - Barrier(Sender<()>), } diff --git a/crates/amalthea/src/language/shell_handler.rs b/crates/amalthea/src/language/shell_handler.rs index 2e7eb9d90..e54f06e9a 100644 --- a/crates/amalthea/src/language/shell_handler.rs +++ b/crates/amalthea/src/language/shell_handler.rs @@ -26,10 +26,10 @@ use crate::wire::kernel_info_request::KernelInfoRequest; use crate::wire::originator::Originator; /// Result of a `handle_comm_msg` or `handle_comm_close` call on the -/// `ShellHandler`. `Handled` means the kernel processed the message -/// synchronously (blocking Shell until done). `NotHandled` means amalthea -/// should fall back to the historical `incoming_tx` path. This fallback is -/// temporary until all comms are migrated to the blocking path. +/// `ShellHandler`. `Handled` means the kernel dispatched the message +/// (possibly asynchronously via a completion receiver). `NotHandled` means +/// amalthea should fall back to the historical `incoming_tx` path. This +/// fallback is temporary until all comms are migrated to the new path. pub enum CommHandled { Handled, NotHandled, @@ -81,26 +81,30 @@ pub trait ShellHandler: Send { /// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#history async fn handle_history_request(&self, req: &HistoryRequest) -> crate::Result; - /// Handles a request to open a comm. + /// Handle a request to open a comm. /// - /// https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm + /// Returns `(true, Some(receiver))` if the comm was opened and the handler + /// was dispatched asynchronously. Shell will select-loop on the receiver + /// and `comm_event_rx` to drain comm events while the handler runs. /// - /// Returns true if the handler handled the request (and opened the comm), false if it did not. - /// - /// * `target` - The target name of the comm, such as `positron.variables` - /// * `comm` - The comm channel to use to communicate with the frontend - /// * `data` - The `data` payload from the `comm_open` message - async fn handle_comm_open( - &self, + /// Returns `(true, None)` if the comm was opened synchronously. + /// Returns `(false, None)` if the comm was not handled. + fn handle_comm_open( + &mut self, target: Comm, comm: CommSocket, data: serde_json::Value, - ) -> crate::Result; + ) -> crate::Result<(bool, Option>)>; - /// Handle an incoming comm message (RPC or data). Return - /// `CommHandled::Handled` if the message was processed, or - /// `CommHandled::NotHandled` to fall back to the existing - /// `incoming_tx` path. + /// Handle an incoming comm message (RPC or data). + /// + /// Returns `(CommHandled::Handled, Some(receiver))` if the message was + /// dispatched to the R thread. Shell will select-loop on the receiver + /// and `comm_event_rx` to drain comm events (e.g. barriers from + /// `comm_open_backend`) while the handler runs. + /// + /// Returns `(CommHandled::NotHandled, None)` to fall back to the + /// existing `incoming_tx` path. /// /// * `comm_id` - The comm's unique identifier /// * `comm_name` - The comm's target name (e.g. `"positron.dataExplorer"`) @@ -113,13 +117,14 @@ pub trait ShellHandler: Send { _comm_name: &str, _msg: CommMsg, _originator: Originator, - ) -> crate::Result { - Ok(CommHandled::NotHandled) + ) -> crate::Result<(CommHandled, Option>)> { + Ok((CommHandled::NotHandled, None)) } - /// Handle a comm close. Return `CommHandled::Handled` if the close - /// was processed, or `CommHandled::NotHandled` to fall back to the - /// existing `incoming_tx` path. + /// Handle a comm close. + /// + /// Same pattern as `handle_comm_msg`: returns a completion receiver + /// so Shell can drain comm events while the handler runs. /// /// * `comm_id` - The comm's unique identifier /// * `comm_name` - The comm's target name @@ -127,7 +132,7 @@ pub trait ShellHandler: Send { &mut self, _comm_id: &str, _comm_name: &str, - ) -> crate::Result { - Ok(CommHandled::NotHandled) + ) -> crate::Result<(CommHandled, Option>)> { + Ok((CommHandled::NotHandled, None)) } } diff --git a/crates/amalthea/src/socket/shell.rs b/crates/amalthea/src/socket/shell.rs index 660473797..73f8fe1d4 100644 --- a/crates/amalthea/src/socket/shell.rs +++ b/crates/amalthea/src/socket/shell.rs @@ -185,7 +185,7 @@ impl Shell { /// Process a comm lifecycle event from `comm_event_rx`. fn process_comm_event(&mut self, event: CommEvent) { match event { - CommEvent::Opened(comm_socket, data) => { + CommEvent::Opened(comm_socket, data, done_tx) => { // For backend-initiated comms, notify the frontend via IOPub if comm_socket.initiator == CommInitiator::BackEnd { self.iopub_tx @@ -202,16 +202,16 @@ impl Shell { // Add the comm to our list of open comms self.open_comms.push(comm_socket); + if let Some(done_tx) = done_tx { + done_tx.send(()).log_err(); + } + log::info!( "Comm channel opened (backend); there are now {} open comms", self.open_comms.len() ); }, - CommEvent::Barrier(done_tx) => { - done_tx.send(()).log_err(); - }, - CommEvent::Message(comm_id, msg) => { let Some(comm) = self.open_comms.iter().find(|c| c.comm_id == comm_id) else { log::warn!("Received message for unknown comm channel {comm_id}: {msg:?}"); @@ -256,6 +256,18 @@ impl Shell { return self.handle_execute_request(req); } + // Comm messages and closes need the same select-loop treatment as + // execute requests to drain comm events while the handler runs. + if let Message::CommMsg(req) = msg { + return self.handle_comm_msg_request(req); + } + if let Message::CommClose(req) = msg { + return self.handle_comm_close_request(req); + } + if let Message::CommOpen(req) = msg { + return self.handle_comm_open_request(req); + } + // Extract references to the components we need to pass to handlers. // This allows us to borrow different fields of self independently. let iopub_tx = &self.iopub_tx; @@ -281,32 +293,6 @@ impl Shell { Self::handle_comm_info_request(open_comms, msg) }) }, - Message::CommOpen(req) => { - let open_comms = &mut self.open_comms; - let server_handlers = &self.server_handlers; - Self::handle_notification(iopub_tx, req, |msg| { - Self::handle_comm_open( - iopub_tx, - shell_handler, - server_handlers, - open_comms, - msg, - ) - }) - }, - Message::CommMsg(req) => { - let open_comms = &self.open_comms; - let originator = Originator::from(&req); - Self::handle_notification(iopub_tx, req, |msg| { - Self::handle_comm_msg(shell_handler, open_comms, originator, msg) - }) - }, - Message::CommClose(req) => { - let open_comms = &mut self.open_comms; - Self::handle_notification(iopub_tx, req, |msg| { - Self::handle_comm_close(shell_handler, open_comms, msg) - }) - }, Message::HistoryRequest(req) => Self::handle_request(iopub_tx, socket, req, |msg| { block_on(shell_handler.handle_history_request(msg)) }), @@ -375,9 +361,10 @@ impl Shell { /// Handle an execute request. Unlike other requests that use the generic /// `handle_request`, this method select-loops on both the execute response /// and `comm_event_rx`. This allows Shell to process comm events (e.g. - /// `CommEvent::Barrier` from `comm_open_backend`) while the R thread is - /// still executing, preventing a deadlock where the R thread waits for - /// Shell to drain comm events while Shell waits for the execute response. + /// the barrier in `CommEvent::Opened` from `comm_open_backend`) while the + /// R thread is still executing, preventing a deadlock where the R thread + /// waits for Shell to drain comm events while Shell waits for the execute + /// response. fn handle_execute_request(&mut self, req: JupyterMessage) -> crate::Result<()> { self.iopub_tx .send(status(req.clone(), ExecutionState::Busy)) @@ -391,22 +378,7 @@ impl Shell { .shell_handler .start_execute_request(originator, &req.content); - // Select-loop: drain comm events while waiting for the execute reply. - let result = loop { - let mut sel = Select::new(); - let resp_idx = sel.recv(&response_rx); - sel.recv(&self.comm_event_rx); - - let ready = sel.ready(); - - while let Ok(event) = self.comm_event_rx.try_recv() { - self.process_comm_event(event); - } - - if ready == resp_idx { - break response_rx.recv().unwrap(); - } - }; + let result = self.drain_comm_events_until(&response_rx); let result = match result { Ok(reply) => req.send_reply(reply, &self.socket), @@ -429,34 +401,99 @@ impl Shell { result.and(Ok(())) } - fn handle_notification( - iopub_tx: &Sender, - not: JupyterMessage, - handler: Handler, - ) -> crate::Result<()> - where - Not: ProtocolMessage, - Handler: FnOnce(&Not) -> crate::Result<()>, - { - // Enter the kernel-busy state in preparation for handling the message - iopub_tx - .send(status(not.clone(), ExecutionState::Busy)) + fn handle_comm_msg_request(&mut self, req: JupyterMessage) -> crate::Result<()> { + self.handle_comm_notification(req, |this, req| { + let originator = Originator::from(req); + Self::handle_comm_msg( + &mut this.shell_handler, + &this.open_comms, + originator, + &req.content, + ) + }) + } + + fn handle_comm_close_request(&mut self, req: JupyterMessage) -> crate::Result<()> { + self.handle_comm_notification(req, |this, req| { + Self::handle_comm_close(&mut this.shell_handler, &mut this.open_comms, &req.content) + }) + } + + fn handle_comm_open_request(&mut self, req: JupyterMessage) -> crate::Result<()> { + self.handle_comm_notification(req, |this, req| { + Self::handle_comm_open( + &this.iopub_tx, + &mut this.shell_handler, + &this.server_handlers, + &mut this.open_comms, + &req.content, + ) + }) + } + + /// Wrap a comm handler in busy/idle status and drain comm events while + /// the handler runs. The handler returns a result and an optional + /// completion receiver; if present, Shell select-loops on it to process + /// comm events (e.g. barriers from `comm_open_backend`). + fn handle_comm_notification( + &mut self, + req: JupyterMessage, + handler: impl FnOnce(&mut Self, &JupyterMessage) -> (crate::Result<()>, Option>), + ) -> crate::Result<()> { + self.iopub_tx + .send(status(req.clone(), ExecutionState::Busy)) .unwrap(); - log::info!("Received shell notification: {not:?}"); + log::info!("Received shell notification: {req:?}"); - // Handle the message - let result = handler(¬.content); + let (result, done_rx) = handler(self, &req); - // Return to idle - iopub_tx - .send(status(not.clone(), ExecutionState::Idle)) + if let Some(done_rx) = done_rx { + self.drain_comm_events_until(&done_rx); + } + + self.iopub_tx + .send(status(req.clone(), ExecutionState::Idle)) .unwrap(); result } - /// Handle a request for open comms + /// Drain comm events while waiting for a value on `rx`. + /// Used by execute requests, comm_msg, comm_close, and comm_open to + /// process comm events (e.g. barriers from `comm_open_backend`) while + /// the R thread is working. + fn drain_comm_events_until(&mut self, rx: &Receiver) -> T { + loop { + let mut sel = Select::new(); + let rx_idx = sel.recv(rx); + sel.recv(&self.comm_event_rx); + + let ready = sel.ready(); + + // Always drain pending comm events, regardless of which + // channel was ready. + while let Ok(event) = self.comm_event_rx.try_recv() { + self.process_comm_event(event); + } + + // `Select::ready()` can return spuriously, so we must use + // `try_recv()` instead of `recv()` to avoid blocking when + // the channel isn't actually ready. Blocking here would + // prevent us from draining comm events, causing a deadlock + // when the R thread sends a barrier via `comm_open_backend`. + if ready == rx_idx { + match rx.try_recv() { + Ok(value) => return value, + Err(crossbeam::channel::TryRecvError::Empty) => continue, + Err(crossbeam::channel::TryRecvError::Disconnected) => { + panic!("Completion channel disconnected in drain_comm_events_until"); + }, + } + } + } + } + fn handle_comm_info_request( open_comms: &[CommSocket], req: &CommInfoRequest, @@ -505,15 +542,16 @@ impl Shell { server_handlers: &HashMap>>, open_comms: &mut Vec, msg: &CommOpen, - ) -> crate::Result<()> { + ) -> (crate::Result<()>, Option>) { log::info!("Received request to open comm: {msg:?}"); // Process the comm open request - let result = Self::open_comm(iopub_tx, shell_handler, server_handlers, open_comms, msg); + let (result, done_rx) = + Self::open_comm(iopub_tx, shell_handler, server_handlers, open_comms, msg); // There is no error reply for a comm open request. Instead we must send // a `comm_close` message as soon as possible. The error is logged on our side. - if let Err(err) = result { + if let Err(ref err) = result { iopub_tx .send(IOPubMessage::CommOutgoing( msg.comm_id.clone(), @@ -523,7 +561,7 @@ impl Shell { log::warn!("Failed to open comm: {err:?}"); } - Ok(()) + (Ok(()), done_rx) } /// Deliver a request from the frontend to a comm. Specifically, this is a @@ -534,7 +572,7 @@ impl Shell { open_comms: &[CommSocket], originator: Originator, msg: &CommWireMsg, - ) -> crate::Result<()> { + ) -> (crate::Result<()>, Option>) { // The presence of an `id` field means this is a request, not a notification // https://github.com/posit-dev/positron/issues/7448 let comm_msg = if msg.data.get("id").is_some() { @@ -558,7 +596,7 @@ impl Shell { "Received message for unknown comm channel {}: {comm_msg:?}", msg.comm_id ); - return Ok(()); + return (Ok(()), None); }; // Try to dispatch the message to the new handler API @@ -567,15 +605,16 @@ impl Shell { &comm.comm_name, comm_msg.clone(), originator, - )? { - CommHandled::Handled => Ok(()), - CommHandled::NotHandled => { + ) { + Ok((CommHandled::Handled, done_rx)) => (Ok(()), done_rx), + Ok((CommHandled::NotHandled, _)) => { // Fall back to old approach for compatibility while we migrate comms log::trace!("Sending message to comm '{}'", comm.comm_name); comm.incoming_tx.send(comm_msg).log_err(); - Ok(()) + (Ok(()), None) }, + Err(err) => (Err(err), None), } } @@ -590,7 +629,7 @@ impl Shell { server_handlers: &HashMap>>, open_comms: &mut Vec, msg: &CommOpen, - ) -> crate::Result<()> { + ) -> (crate::Result<()>, Option>) { // Check to see whether the target name begins with "positron." This // prefix designates comm IDs that are known to the Positron IDE. let comm = match msg.target_name.starts_with("positron.") { @@ -607,7 +646,7 @@ impl Shell { &msg.target_name, err ); - return Err(Error::UnknownCommName(msg.target_name.clone())); + return (Err(Error::UnknownCommName(msg.target_name.clone())), None); }, }, @@ -638,6 +677,7 @@ impl Shell { // internal ID or a reference to the IOPub channel. let mut lsp_comm = false; + let mut done_rx: Option> = None; let opened = match comm { Comm::Lsp => { @@ -653,40 +693,53 @@ impl Shell { }; let handler = server_handlers.get(target_key).cloned(); - server_started_rx = Some(Self::start_server_comm(msg, handler, &comm_socket)?); + match Self::start_server_comm(msg, handler, &comm_socket) { + Ok(rx) => server_started_rx = Some(rx), + Err(err) => return (Err(err), None), + }; true }, Comm::Other(_) => { // This might be a server comm or a regular comm if let Some(handler) = server_handlers.get(&msg.target_name).cloned() { - server_started_rx = - Some(Self::start_server_comm(msg, Some(handler), &comm_socket)?); + match Self::start_server_comm(msg, Some(handler), &comm_socket) { + Ok(rx) => server_started_rx = Some(rx), + Err(err) => return (Err(err), None), + }; true } else { // No server handler found, pass through to shell handler - block_on(shell_handler.handle_comm_open( + let (opened, rx) = match shell_handler.handle_comm_open( comm, comm_socket.clone(), msg.data.clone(), - ))? + ) { + Ok(val) => val, + Err(err) => return (Err(err), None), + }; + done_rx = rx; + opened } }, // All comms tied to known Positron clients are passed through to the shell handler _ => { - // Call the shell handler to open the comm - block_on(shell_handler.handle_comm_open( + let (opened, rx) = match shell_handler.handle_comm_open( comm, comm_socket.clone(), msg.data.clone(), - ))? + ) { + Ok(val) => val, + Err(err) => return (Err(err), None), + }; + done_rx = rx; + opened }, }; if !opened { - // Fail if the comm was not opened - return Err(Error::UnknownCommName(comm_name.clone())); + return (Err(Error::UnknownCommName(comm_name.clone())), None); } // Add the comm to our list of open comms @@ -724,11 +777,11 @@ impl Shell { if let Err(err) = result { let msg = format!("With comm '{comm_name}': {err}"); log::error!("{msg}"); - return Err(Error::SendError(msg)); + return (Err(Error::SendError(msg)), None); } } - Ok(()) + (Ok(()), done_rx) } fn start_server_comm( @@ -769,23 +822,26 @@ impl Shell { shell_handler: &mut Box, open_comms: &mut Vec, msg: &CommClose, - ) -> crate::Result<()> { + ) -> (crate::Result<()>, Option>) { let Some(idx) = open_comms.iter().position(|c| c.comm_id == msg.comm_id) else { log::warn!( "Received close message for unknown comm channel {}", msg.comm_id ); - return Ok(()); + return (Ok(()), None); }; // Try to dispatch the message to the new handler API. // Fall back to notifying via `incoming_tx` for comms not yet migrated. - match shell_handler.handle_comm_close(&msg.comm_id, &open_comms[idx].comm_name)? { - CommHandled::Handled => {}, - CommHandled::NotHandled => { - open_comms[idx].incoming_tx.send(CommMsg::Close).log_err(); - }, - } + let done_rx = + match shell_handler.handle_comm_close(&msg.comm_id, &open_comms[idx].comm_name) { + Ok((CommHandled::Handled, done_rx)) => done_rx, + Ok((CommHandled::NotHandled, _)) => { + open_comms[idx].incoming_tx.send(CommMsg::Close).log_err(); + None + }, + Err(err) => return (Err(err), None), + }; open_comms.remove(idx); log::info!( @@ -793,7 +849,7 @@ impl Shell { open_comms.len() ); - Ok(()) + (Ok(()), done_rx) } } diff --git a/crates/amalthea/tests/client/main.rs b/crates/amalthea/tests/client/main.rs index 149d042a0..0cafeb95c 100644 --- a/crates/amalthea/tests/client/main.rs +++ b/crates/amalthea/tests/client/main.rs @@ -281,6 +281,7 @@ fn test_amalthea_comm_open_from_kernel() { .send(CommEvent::Opened( test_comm.clone(), serde_json::Value::Null, + None, )) .unwrap(); diff --git a/crates/amalthea/tests/client/shell.rs b/crates/amalthea/tests/client/shell.rs index a3e019fb1..43a6a3928 100644 --- a/crates/amalthea/tests/client/shell.rs +++ b/crates/amalthea/tests/client/shell.rs @@ -268,19 +268,19 @@ impl ShellHandler for Shell { }) } - async fn handle_comm_open( - &self, + fn handle_comm_open( + &mut self, req: Comm, comm: CommSocket, _data: serde_json::Value, - ) -> amalthea::Result { + ) -> amalthea::Result<(bool, Option>)> { // Used to test error replies match req { Comm::Other(name) if name == "unknown" => { return Err(amalthea::Error::Anyhow(anyhow!("unknown comm target"))); }, _ => {}, - } + }; // Open a test comm channel; this test comm channel is used for every // comm open request (regardless of the target name). It just echoes back any @@ -316,6 +316,6 @@ impl ShellHandler for Shell { }, } }); - Ok(true) + Ok((true, None)) } } diff --git a/crates/ark/Cargo.toml b/crates/ark/Cargo.toml index 8c245020f..e4cc943c3 100644 --- a/crates/ark/Cargo.toml +++ b/crates/ark/Cargo.toml @@ -19,6 +19,7 @@ workspace = true [dependencies] actix-web.workspace = true aether_factory.workspace = true +ark_macros.workspace = true aether_lsp_utils.workspace = true aether_parser.workspace = true aether_syntax.workspace = true diff --git a/crates/ark/ark_macros/Cargo.toml b/crates/ark/ark_macros/Cargo.toml new file mode 100644 index 000000000..cbfc07281 --- /dev/null +++ b/crates/ark/ark_macros/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "ark_macros" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true + +[lib] +proc-macro = true + +[lints] +workspace = true + +[dependencies] +quote.workspace = true +syn.workspace = true \ No newline at end of file diff --git a/crates/ark/ark_macros/src/lib.rs b/crates/ark/ark_macros/src/lib.rs new file mode 100644 index 000000000..1eedb75a1 --- /dev/null +++ b/crates/ark/ark_macros/src/lib.rs @@ -0,0 +1,154 @@ +// +// lib.rs +// +// Copyright (C) 2025 Posit Software, PBC. All rights reserved. +// +// + +//! Proc macros for the Ark kernel. +//! +//! ## `#[ark::register]` +//! +//! Registers a function as an R `.Call` entry point with automatic `Console` +//! access and panic safety. Composes with `#[harp::register]`. +//! +//! ```ignore +//! #[ark::register] +//! fn ps_my_function(console: &Console, x: SEXP) -> anyhow::Result { +//! let dc = console.device_context(); +//! Ok(harp::r_null()) +//! } +//! ``` +//! +//! The macro transforms this into: +//! +//! ```ignore +//! #[harp::register] +//! unsafe extern "C-unwind" fn ps_my_function(x: SEXP) -> anyhow::Result { +//! crate::console::Console::with(|console| { +//! let dc = console.device_context(); +//! Ok(harp::r_null()) +//! }) +//! } +//! ``` +//! +//! `harp::register` then adds `r_unwrap()` (Rust error to R error), +//! `r_sandbox()` (catches R longjumps), and ctor-based routine registration. +//! +//! `Console::with()` catches Rust panics (e.g. from `RefCell` borrow +//! violations) and converts them to `anyhow::Error`, which `r_unwrap()` +//! surfaces as a clean R error instead of crashing the session. +//! +//! The first parameter may be `&Console` (any name, type is matched by +//! the last path segment). It is stripped from the generated C signature +//! and injected at runtime. All remaining parameters must be `SEXP`. +//! +//! The return type must be `anyhow::Result`. + +use proc_macro::TokenStream; +use quote::quote; +use syn::parse_macro_input; + +extern crate proc_macro; + +#[proc_macro_attribute] +pub fn register(_attr: TokenStream, item: TokenStream) -> TokenStream { + let function = parse_macro_input!(item as syn::ItemFn); + match register_impl(function) { + Ok(tokens) => tokens, + Err(err) => err.to_compile_error().into(), + } +} + +fn register_impl(function: syn::ItemFn) -> syn::Result { + let span = function.sig.ident.span(); + + // Partition parameters: optional leading `&Console` + remaining SEXP args. + let mut console_ident: Option = None; + let mut sexp_params: Vec = Vec::new(); + + for (i, param) in function.sig.inputs.iter().enumerate() { + let typed = match param { + syn::FnArg::Typed(t) => t, + syn::FnArg::Receiver(r) => { + return Err(syn::Error::new_spanned( + r, + "ark::register functions cannot have a `self` parameter", + )); + }, + }; + + if i == 0 && is_ref_console(&typed.ty) { + if let syn::Pat::Ident(pat) = &*typed.pat { + console_ident = Some(pat.ident.clone()); + } else { + console_ident = Some(syn::Ident::new("console", span)); + } + continue; + } + + if !is_sexp_type(&typed.ty) { + return Err(syn::Error::new_spanned( + &typed.ty, + "ark::register parameters (other than the leading `&Console`) must be `SEXP`", + )); + } + + sexp_params.push(param.clone()); + } + + let ident = &function.sig.ident; + let vis = &function.vis; + let attrs = &function.attrs; + let function_block = &function.block; + + // Build the body: wrap in `Console::with()` if `&Console` was requested, + // otherwise just invoke the block directly. + let body = if let Some(console_name) = console_ident { + quote! { + crate::console::Console::with(|#console_name| #function_block) + } + } else { + quote! { + (|| #function_block)() + } + }; + + Ok(quote! { + #(#attrs)* + #[harp::register] + #vis unsafe extern "C-unwind" fn #ident(#(#sexp_params),*) -> anyhow::Result { + #body + } + } + .into()) +} + +/// Check if a type is `&Console` (matches `&Console` or `&path::to::Console`). +fn is_ref_console(ty: &syn::Type) -> bool { + let syn::Type::Reference(ref_ty) = ty else { + return false; + }; + if ref_ty.mutability.is_some() { + return false; + } + match &*ref_ty.elem { + syn::Type::Path(path) => path + .path + .segments + .last() + .is_some_and(|seg| seg.ident == "Console"), + _ => false, + } +} + +/// Check if a type is `SEXP`. +fn is_sexp_type(ty: &syn::Type) -> bool { + let syn::Type::Path(path) = ty else { + return false; + }; + path.path + .segments + .last() + .is_some_and(|seg| seg.ident == "SEXP") +} diff --git a/crates/ark/src/comm_handler.rs b/crates/ark/src/comm_handler.rs index 6d4dcf6b9..78d5a9f8a 100644 --- a/crates/ark/src/comm_handler.rs +++ b/crates/ark/src/comm_handler.rs @@ -17,8 +17,7 @@ use crossbeam::channel::Sender; use serde::de::DeserializeOwned; use serde::Serialize; use stdext::result::ResultExt; - -use crate::console::Console; +use stdext::DebugRefCell; /// Context provided to `CommHandler` methods, giving access to the outgoing /// channel and close-request mechanism. @@ -72,24 +71,18 @@ pub trait CommHandler: Debug { /// Initialise handler state on the R thread (initial scan, first event, /// etc.). Default is no-op. - fn handle_open(&mut self, _ctx: &CommHandlerContext, _console: &Console) {} + fn handle_open(&mut self, _ctx: &CommHandlerContext) {} /// Handle an incoming message (RPC or data). - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, console: &Console); + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext); /// Handle comm close. Default is no-op. - fn handle_close(&mut self, _ctx: &CommHandlerContext, _console: &Console) {} + fn handle_close(&mut self, _ctx: &CommHandlerContext) {} /// Called when the environment changes. The `event` indicates what /// triggered the change so handlers can decide whether to react. /// Default is no-op. - fn handle_environment( - &mut self, - _event: &EnvironmentChanged, - _ctx: &CommHandlerContext, - _console: &Console, - ) { - } + fn handle_environment(&mut self, _event: &EnvironmentChanged, _ctx: &CommHandlerContext) {} } /// Why the environment changed. @@ -108,7 +101,8 @@ pub enum EnvironmentChanged { /// A registered comm in the Console's comm table. pub(crate) struct ConsoleComm { - pub(crate) handler: Box, + pub(crate) comm_id: String, + pub(crate) handler: DebugRefCell>, pub(crate) ctx: CommHandlerContext, } @@ -132,7 +126,7 @@ pub fn handle_comm_message( parent_header, data, } => { - let json = dispatch_rpc(comm_name, &data, |req| { + let json = dispatch_rpc(comm_name, data, |req| { let _span = tracing::trace_span!("comm handler", name = comm_name, request = ?req) .entered(); rpc_handler(req) @@ -145,7 +139,7 @@ pub fn handle_comm_message( outgoing_tx.send(response).log_err(); }, CommMsg::Data(data) => { - dispatch_event(comm_name, &data, |evt| { + dispatch_event(comm_name, data, |evt| { let _span = tracing::trace_span!("comm handler", name = comm_name, event = ?evt).entered(); event_handler(evt) @@ -182,7 +176,7 @@ pub fn handle_rpc_request( }, }; - let json = dispatch_rpc(comm_name, &data, |req| { + let json = dispatch_rpc(comm_name, data, |req| { let _span = tracing::trace_span!("comm handler", name = comm_name, request = ?req).entered(); request_handler(req) @@ -198,36 +192,32 @@ pub fn handle_rpc_request( fn dispatch_rpc( comm_name: &str, - data: &serde_json::Value, + data: serde_json::Value, handler: impl FnOnce(Reqs) -> anyhow::Result, ) -> serde_json::Value where Reqs: DeserializeOwned + Debug, Reps: Serialize, { - match serde_json::from_value::(data.clone()) { + match serde_json::from_value::(data) { Ok(m) => match handler(m) { Ok(reply) => match serde_json::to_value(reply) { Ok(value) => value, Err(err) => { - let message = format!( - "Failed to serialise reply for {comm_name} request: {err} (request: {data})" - ); + let message = + format!("Failed to serialise reply for {comm_name} request: {err}"); log::warn!("{message}"); json_rpc_error(JsonRpcErrorCode::InternalError, message) }, }, Err(err) => { - let message = - format!("Failed to process {comm_name} request: {err} (request: {data})"); + let message = format!("Failed to process {comm_name} request: {err}"); log::warn!("{message}"); json_rpc_error(JsonRpcErrorCode::InternalError, message) }, }, Err(err) => { - let message = format!( - "No handler for {comm_name} request (method not found): {err} (request: {data})" - ); + let message = format!("No handler for {comm_name} request (method not found): {err}"); log::warn!("{message}"); json_rpc_error(JsonRpcErrorCode::MethodNotFound, message) }, @@ -236,17 +226,17 @@ where fn dispatch_event( comm_name: &str, - data: &serde_json::Value, + data: serde_json::Value, handler: impl FnOnce(Evts) -> anyhow::Result<()>, ) where Evts: DeserializeOwned + Debug, { - match serde_json::from_value::(data.clone()) { + match serde_json::from_value::(data) { Ok(event) => { handler(event).log_err(); }, Err(err) => { - log::warn!("Failed to parse {comm_name} event: {err} (data: {data})"); + log::warn!("Failed to parse {comm_name} event: {err}"); }, } } diff --git a/crates/ark/src/connections/r_connection.rs b/crates/ark/src/connections/r_connection.rs index 8cf14ec8a..00ee72d83 100644 --- a/crates/ark/src/connections/r_connection.rs +++ b/crates/ark/src/connections/r_connection.rs @@ -92,7 +92,7 @@ impl RConnection { let comm_open_json = serde_json::to_value(self.metadata.clone())?; // Notify the frontend that a new connection has been opened. - let event = CommEvent::Opened(self.comm.clone(), comm_open_json); + let event = CommEvent::Opened(self.comm.clone(), comm_open_json, None); self.comm_event_tx.send(event)?; Ok(()) } diff --git a/crates/ark/src/console.rs b/crates/ark/src/console.rs index 8e512ceaa..8f936fbc3 100644 --- a/crates/ark/src/console.rs +++ b/crates/ark/src/console.rs @@ -9,6 +9,15 @@ // state inside of a global `CONSOLE` singleton that implements `Console`. // The frontend methods called by R are forwarded to the corresponding // `Console` methods via `CONSOLE`. +// +// Interior-mutable fields on `Console` use `DebugRefCell` instead of +// `RefCell`. During the transition away from `Console::get()`/`get_mut()` +// (which bypass the borrow checker via `UnsafeCell`), this lets CI and +// development builds catch reentrancy violations while release builds +// skip the check, matching the existing `UnsafeCell` behaviour that has +// been stable for years. Once the ownership model is fully principled +// and tested, these can be replaced with real `RefCell`s. +// https://github.com/posit-dev/ark/issues/1145 use std::cell::Cell; use std::cell::RefCell; @@ -16,6 +25,7 @@ use std::cell::UnsafeCell; use std::collections::HashMap; use std::ffi::*; use std::os::raw::c_uchar; +use std::rc::Rc; use std::result::Result::Ok; use std::sync::Arc; use std::sync::Mutex; @@ -93,6 +103,7 @@ use once_cell::sync::Lazy; use regex::Regex; use serde_json::json; use stdext::result::ResultExt; +use stdext::DebugRefCell; use stdext::*; use tokio::sync::mpsc::UnboundedReceiver as AsyncUnboundedReceiver; use uuid::Uuid; @@ -102,6 +113,7 @@ mod console_comm; mod console_debug; mod console_error; mod console_filter; +mod console_graphics; mod console_integration; mod console_repl; @@ -114,6 +126,7 @@ pub(crate) use console_debug::FrameSource; use console_error::stack_overflow_occurred; use console_filter::strip_step_lines; use console_filter::ConsoleFilter; +pub use console_repl::catching_panics; pub(crate) use console_repl::console_inputs; pub(crate) use console_repl::r_busy; pub(crate) use console_repl::r_polled_events; @@ -225,9 +238,9 @@ pub struct Console { tasks_idle_any_rx: Receiver, pending_futures: HashMap, RTaskStartInfo, Option)>, - /// Comm ID of the currently connected UI comm, if any. - /// The handler lives in `self.comms`; this is just an index into it. - ui_comm_id: Option, + /// The UI comm, stored separately from `comms` so that `ui_comm()` can + /// borrow it independently of the comms map. + ui_comm: DebugRefCell>, /// Error captured by our global condition handler during the last iteration /// of the REPL. @@ -333,12 +346,12 @@ pub struct Console { /// Stack of topmost environments while waiting for input in ReadConsole. /// Pushed on entry to `r_read_console()`, popped on exit. - /// This is a RefCell since we require `get()` for this field and `RObject` isn't `Copy`. - read_console_env_stack: RefCell>, + /// This is a `DebugRefCell` since we require `get()` for this field and `RObject` isn't `Copy`. + read_console_env_stack: DebugRefCell>, /// Comm handlers registered on the R thread (keyed by comm ID). - comms: HashMap, + comms: DebugRefCell>, /// Graphics device state (plot recording, rendering, comm management). - device_context: DeviceContext, + device_context: Rc, } diff --git a/crates/ark/src/console/console_comm.rs b/crates/ark/src/console/console_comm.rs index 1f6a233d9..6852dce78 100644 --- a/crates/ark/src/console/console_comm.rs +++ b/crates/ark/src/console/console_comm.rs @@ -10,6 +10,7 @@ use amalthea::socket::comm::CommInitiator; use amalthea::socket::comm::CommOutgoingTx; use amalthea::socket::comm::CommSocket; use stdext::result::ResultExt; +use stdext::DebugRefCell; use uuid::Uuid; use crate::comm_handler::CommHandler; @@ -19,22 +20,45 @@ use crate::comm_handler::EnvironmentChanged; use crate::console::Console; use crate::ui::UI_COMM_NAME; +// All methods take `&self`. +// +// Regular comms use a take/remove pattern: we take the comm out of the +// `comms` HashMap before calling the handler, so no `borrow_mut()` guard +// is held during the call. This prevents panics if the handler reenters +// the HashMap. For instance, a data explorer handler calls +// `comm_open_backend` to open a child explorer for a column, which +// needs to `borrow_mut()` the same HashMap to insert. +// +// The UI comm uses a different strategy: the handler is in its own +// `DebugRefCell` inside the `ConsoleComm`, and we borrow the outer +// `ui_comm: DebugRefCell>` with a shared `&` ref during +// dispatch. This keeps the `CommHandlerContext` (and thus the outgoing channel) +// visible to reentrant code that calls `ui_comm()`, e.g. R hooks that send +// fire-and-forget events via `try_ui_comm()?.send_event()`. impl Console { - pub(super) fn comm_handle_msg(&mut self, comm_id: &str, msg: CommMsg) { - let Some(comm) = self.comms.get_mut(comm_id) else { - log::warn!("Received message for unknown registered comm {comm_id}"); + pub(super) fn comm_handle_msg(&self, comm_id: &str, msg: CommMsg) { + if self.is_ui_comm(comm_id) { + self.with_ui_handler_mut(|handler, ctx| { + handler.handle_msg(msg, ctx); + }); return; - }; - comm.handler.handle_msg(msg, &comm.ctx, Console::get()); + } + + self.with_comm_mut(comm_id, |comm| { + comm.handler.get_mut().handle_msg(msg, &comm.ctx); + }); self.drain_closed(); } - pub(super) fn comm_handle_close(&mut self, comm_id: &str) { - let Some(mut comm) = self.comm_remove(comm_id) else { - log::warn!("Received close for unknown registered comm {comm_id}"); + pub(super) fn comm_handle_close(&self, comm_id: &str) { + if let Some(ui) = self.take_ui_comm_if(comm_id) { + ui.handler.into_inner().handle_close(&ui.ctx); return; - }; - comm.handler.handle_close(&comm.ctx, Console::get()); + } + + if let Some(comm) = self.take_comm(comm_id) { + comm.handler.into_inner().handle_close(&comm.ctx); + } } /// Register a backend-initiated comm on the R thread. @@ -48,7 +72,7 @@ impl Console { /// `comm_msg` sent by the caller afterwards are ordered after the /// `comm_open` on IOPub. pub(crate) fn comm_open_backend( - &mut self, + &self, comm_name: &str, mut handler: Box, ) -> anyhow::Result { @@ -63,19 +87,22 @@ impl Console { ); let ctx = CommHandlerContext::new(comm.outgoing_tx.clone(), self.comm_event_tx.clone()); - handler.handle_open(&ctx, Console::get()); + handler.handle_open(&ctx); self.comms - .insert(comm_id.clone(), ConsoleComm { handler, ctx }); - - self.comm_event_tx - .send(CommEvent::Opened(comm, open_metadata))?; - - // Block until Shell has processed the Opened event, ensuring the - // `comm_open` message is on IOPub before we return. Any updates - // the caller sends after this point are guaranteed to follow it. + .borrow_mut() + .insert(comm_id.clone(), ConsoleComm { + comm_id: comm_id.clone(), + handler: DebugRefCell::new(handler), + ctx, + }); + + // Block until Shell has processed the open, ensuring the `comm_open` + // message is on IOPub before we return. Any updates the caller sends + // after this point are guaranteed to follow it. let (done_tx, done_rx) = crossbeam::channel::bounded(0); - self.comm_event_tx.send(CommEvent::Barrier(done_tx))?; + self.comm_event_tx + .send(CommEvent::Opened(comm, open_metadata, Some(done_tx)))?; done_rx.recv()?; Ok(comm_id) @@ -88,61 +115,126 @@ impl Console { /// comm. The `CommSocket` already exists in amalthea's open_comms list, so /// we only need to register the handler and call `handle_open`. pub(super) fn comm_open_frontend( - &mut self, + &self, comm_id: String, comm_name: &str, outgoing_tx: CommOutgoingTx, mut handler: Box, ) { let ctx = CommHandlerContext::new(outgoing_tx, self.comm_event_tx.clone()); - handler.handle_open(&ctx, Console::get()); + handler.handle_open(&ctx); if comm_name == UI_COMM_NAME { - if let Some(old_id) = self.ui_comm_id.take() { + if let Some(old) = self.take_ui_comm() { log::info!("Replacing an existing UI comm."); - if let Some(mut old) = self.comm_remove(&old_id) { - old.handler.handle_close(&old.ctx, Console::get()); - } + old.handler.into_inner().handle_close(&old.ctx); } - self.ui_comm_id = Some(comm_id.clone()); + self.set_ui_comm(ConsoleComm { + comm_id, + handler: DebugRefCell::new(handler), + ctx, + }); + } else { + self.comms + .borrow_mut() + .insert(comm_id.clone(), ConsoleComm { + comm_id, + handler: DebugRefCell::new(handler), + ctx, + }); } - - self.comms.insert(comm_id, ConsoleComm { handler, ctx }); } - pub(super) fn comm_notify_environment_changed(&mut self, event: &EnvironmentChanged) { - for (_, comm) in self.comms.iter_mut() { - comm.handler - .handle_environment(event, &comm.ctx, Console::get()); + pub(super) fn comm_notify_environment_changed(&self, event: &EnvironmentChanged) { + self.with_ui_handler_mut(|handler, ctx| { + handler.handle_environment(event, ctx); + }); + + let ids: Vec = self.comms.borrow().keys().cloned().collect(); + for id in ids { + self.with_comm_mut(&id, |comm| { + comm.handler.get_mut().handle_environment(event, &comm.ctx); + }); } self.drain_closed(); } - /// Remove a comm from the map, clearing `ui_comm_id` if it matches. - fn comm_remove(&mut self, comm_id: &str) -> Option { - if self.ui_comm_id.as_deref() == Some(comm_id) { - self.ui_comm_id = None; + // -- UI comm helpers -------------------------------------------------- + + fn is_ui_comm(&self, comm_id: &str) -> bool { + self.ui_comm + .borrow() + .as_ref() + .is_some_and(|ui| ui.comm_id == comm_id) + } + + /// Borrow the UI comm with `&`, then borrow the handler with `&mut`. + /// + /// Because the outer `RefCell` is only borrowed by shared ref, `ui_comm()` + /// remains functional during handler dispatch and R code that calls + /// back into Rust (e.g. `navigateToFile` from a `frontend_ready` + /// hook) can still send events on the UI comm. + fn with_ui_handler_mut(&self, f: impl FnOnce(&mut Box, &CommHandlerContext)) { + let guard = self.ui_comm.borrow(); + let Some(ui) = guard.as_ref() else { + log::warn!("UI comm is absent during dispatch (reentrant call?)"); + return; + }; + let mut handler = ui.handler.borrow_mut(); + f(&mut handler, &ui.ctx); + } + + fn take_ui_comm(&self) -> Option { + self.ui_comm.borrow_mut().take() + } + + /// Take the UI comm only if its `comm_id` matches. Checks and takes + /// in a single `borrow_mut()` so there is no TOCTOU gap. + fn take_ui_comm_if(&self, comm_id: &str) -> Option { + let mut guard = self.ui_comm.borrow_mut(); + if guard.as_ref().is_some_and(|ui| ui.comm_id == comm_id) { + guard.take() + } else { + None } - self.comms.remove(comm_id) } - /// Remove all comms whose handler requested closing via `ctx.close_on_exit()`. - fn drain_closed(&mut self) { + fn set_ui_comm(&self, ui: ConsoleComm) { + *self.ui_comm.borrow_mut() = Some(ui); + } + + // -- Comms map helpers ------------------------------------------------ + + /// Take a comm out, call `f`, put it back. + fn with_comm_mut(&self, comm_id: &str, f: impl FnOnce(&mut ConsoleComm)) { + let Some(mut comm) = self.take_comm(comm_id) else { + log::warn!("Received message for unknown registered comm {comm_id}"); + return; + }; + f(&mut comm); + self.comms.borrow_mut().insert(comm.comm_id.clone(), comm); + } + + fn take_comm(&self, comm_id: &str) -> Option { + self.comms.borrow_mut().remove(comm_id) + } + + fn drain_closed(&self) { let closed_ids: Vec = self .comms + .borrow() .iter() .filter(|(_, comm)| comm.ctx.is_closed()) .map(|(id, _)| id.clone()) .collect(); for comm_id in closed_ids { - if let Some(comm) = self.comm_remove(&comm_id) { + if let Some(comm) = self.take_comm(&comm_id) { self.comm_notify_closed(&comm_id, &comm); } } } - /// Backend-initiated close cleanup: notify frontend via amalthea. fn comm_notify_closed(&self, comm_id: &str, comm: &ConsoleComm) { comm.ctx.outgoing_tx.send(CommMsg::Close).log_err(); comm.ctx diff --git a/crates/ark/src/console/console_graphics.rs b/crates/ark/src/console/console_graphics.rs new file mode 100644 index 000000000..69c4825db --- /dev/null +++ b/crates/ark/src/console/console_graphics.rs @@ -0,0 +1,42 @@ +// +// console_graphics.rs +// +// Copyright (C) 2026 Posit Software, PBC. All rights reserved. +// + +use amalthea::comm::plot_comm::IntrinsicSize; +use amalthea::comm::plot_comm::PlotRenderSettings; +use amalthea::wire::execute_request::CodeLocation; + +use crate::console::Console; + +impl Console { + /// Push execution context to the graphics device when an execute request starts. + /// + /// Stores the execution_id, code, code_location, and optional sizing overrides + /// so they can be captured when new plots are created during execution. + pub(super) fn graphics_on_execute_request( + &self, + execution_id: String, + code: String, + code_location: Option, + render_settings: Option, + intrinsic_size: Option, + ) { + self.device_context().set_execution_context( + execution_id, + code, + code_location, + render_settings, + intrinsic_size, + ); + } + + /// Process pending graphics changes after an execute request completes. + pub(super) fn graphics_on_did_execute_request(&self) { + let dc = self.device_context(); + dc.process_changes(self); + dc.clear_execution_context(); + dc.clear_pending_origin(); + } +} diff --git a/crates/ark/src/console/console_integration.rs b/crates/ark/src/console/console_integration.rs index 012aafb91..c77d8f889 100644 --- a/crates/ark/src/console/console_integration.rs +++ b/crates/ark/src/console/console_integration.rs @@ -7,6 +7,8 @@ //! Help, LSP, UI comm, and frontend method integration for the R console. +use stdext::cell::DebugRef; + use super::*; use crate::data_explorer::r_data_explorer::DataExplorerMode; use crate::data_explorer::r_data_explorer::InlineDataExplorerData; @@ -21,9 +23,12 @@ impl Console { } pub(crate) fn ui_comm(&self) -> Option> { - let comm = self.comms.get(self.ui_comm_id.as_deref()?)?; + let guard = self.ui_comm.borrow(); + if guard.is_none() { + return None; + } Some(UiCommRef { - comm, + guard, originator: self .active_request .as_ref() @@ -226,14 +231,18 @@ impl Console { /// /// Existence of this value guarantees the comm is connected. pub(crate) struct UiCommRef<'a> { - comm: &'a ConsoleComm, + guard: DebugRef<'a, Option>, originator: Option<&'a Originator>, stdin_request_tx: &'a Sender, } impl UiCommRef<'_> { + fn comm(&self) -> &ConsoleComm { + self.guard.as_ref().unwrap() + } + pub(crate) fn send_event(&self, event: &UiFrontendEvent) { - self.comm.ctx.send_event(event); + self.comm().ctx.send_event(event); } pub(crate) fn busy(&self, busy: bool) { diff --git a/crates/ark/src/console/console_repl.rs b/crates/ark/src/console/console_repl.rs index 16ce3d4a2..a13f9154e 100644 --- a/crates/ark/src/console/console_repl.rs +++ b/crates/ark/src/console/console_repl.rs @@ -10,6 +10,10 @@ //! This module contains `impl Console` with methods and functions related to //! ReadConsole, WriteConsole, and R frontend callbacks. +use std::rc::Rc; + +use stdext::DebugRefCell; + use super::*; use crate::data_explorer::r_data_explorer::POSITRON_DATA_EXPLORER_MIME; use crate::r_task::QueuedRTask; @@ -25,6 +29,18 @@ const DEBUG_COMMANDS: &[&str] = &["c", "cont", "f", "help", "n", "s", "where", " // These are not transient evals: they represent deliberate debugger navigation. const DEBUG_COMMANDS_CONTINUE: &[&str] = &["n", "f", "c", "cont", "Q"]; +thread_local! { + /// When `true`, the global panic hook should return early instead of + /// aborting, so that `catch_unwind` can catch the panic in `Console::with`. + static CATCHING_PANICS: Cell = const { Cell::new(false) }; +} + +/// Returns `true` when we are inside a `Console::with` catch boundary. +/// Checked by the global panic hook to decide whether to abort. +pub fn catching_panics() -> bool { + CATCHING_PANICS.get() +} + /// Used to wait for complete R startup in `Console::wait_initialized()` or /// check for it in `Console::is_initialized()`. /// @@ -603,7 +619,7 @@ impl Console { dap: Arc>, session_mode: SessionMode, ) -> Self { - let device_context = DeviceContext::new(iopub_tx.clone()); + let device_context = Rc::new(DeviceContext::new(iopub_tx.clone(), session_mode)); Self { r_request_rx, @@ -616,7 +632,7 @@ impl Console { comm_msg_originator: None, execution_count: 0, autoprint_output: String::new(), - ui_comm_id: None, + ui_comm: DebugRefCell::new(None), last_error: None, help_event_tx: None, help_port: None, @@ -647,10 +663,10 @@ impl Console { read_console_nested_return: Cell::new(false), read_console_threw_error: Cell::new(false), read_console_pending_action: Cell::new(ReadConsolePendingAction::None), - read_console_env_stack: RefCell::new(Vec::new()), + read_console_env_stack: DebugRefCell::new(Vec::new()), read_console_shutdown: Cell::new(false), debug_filter: ConsoleFilter::new(), - comms: HashMap::new(), + comms: DebugRefCell::new(HashMap::new()), device_context, } } @@ -673,6 +689,37 @@ impl Console { Console::get_mut() } + /// Run a closure with `&Console`, catching panics at the boundary. + /// + /// Intended for use in `#[ark::register]` entry points and C callbacks + /// where a panic would unwind through R's C call stack. Panics are + /// caught and converted to `anyhow::Error`, which `harp::register`'s + /// `r_unwrap()` then surfaces as a clean R error. + pub fn with(f: impl FnOnce(&Console) -> anyhow::Result) -> anyhow::Result { + if cfg!(debug_assertions) { + return f(Console::get()); + } + + CATCHING_PANICS.set(true); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(Console::get()))); + CATCHING_PANICS.set(false); + + match result { + Ok(result) => result, + Err(panic) => { + let msg = match panic.downcast_ref::<&str>() { + Some(s) => s.to_string(), + None => match panic.downcast_ref::() { + Some(s) => s.clone(), + None => String::from("(unknown payload)"), + }, + }; + + Err(anyhow!("Panic in Console callback: {msg}")) + }, + } + } + /// Install a minimal stopgap `Console` in the thread-local for unit tests /// that need a `&Console` (e.g. to pass to `CommHandler` methods) but /// don't go through the full `Console::start()` path. @@ -763,6 +810,10 @@ impl Console { &self.device_context } + pub(crate) fn device_context_rc(&self) -> Rc { + Rc::clone(&self.device_context) + } + /// Run a closure while capturing console output. /// Returns the closure's result paired with any captured output. pub(crate) fn with_capture(f: impl FnOnce() -> T) -> (T, String) { @@ -793,17 +844,6 @@ impl Console { self.active_request.as_ref().map(|req| &req.request) } - /// Get the current execution context if an active request exists. - /// Returns (execution_id, code) tuple where execution_id is the Jupyter message ID. - pub(crate) fn get_execution_context(&self) -> Option<(String, String)> { - self.active_request.as_ref().map(|req| { - ( - req.originator.header.msg_id.clone(), - req.request.code.clone(), - ) - }) - } - // Async messages for the Console. Processed at interrupt time. async fn process_console_notifications( mut console_notification_rx: AsyncUnboundedReceiver, @@ -1328,7 +1368,7 @@ impl Console { // Check for pending graphics updates // (Important that this occurs while in the "busy" state of this ExecuteRequest // so that the `parent` message is set correctly in any Jupyter messages) - graphics_device::on_did_execute_request(); + self.graphics_on_did_execute_request(); let (reply, result) = Self::prepare_execute_reply(req.exec_count, value); @@ -1388,7 +1428,7 @@ impl Console { .as_ref() .map(graphics_device::compute_plot_overrides) .unwrap_or((None, None)); - graphics_device::on_execute_request( + self.graphics_on_execute_request( originator.header.msg_id.clone(), exec_req.code.clone(), code_location, diff --git a/crates/ark/src/data_explorer/r_data_explorer.rs b/crates/ark/src/data_explorer/r_data_explorer.rs index 4d267414a..43b208cdf 100644 --- a/crates/ark/src/data_explorer/r_data_explorer.rs +++ b/crates/ark/src/data_explorer/r_data_explorer.rs @@ -481,18 +481,13 @@ impl CommHandler for RDataExplorer { serde_json::json!({ "title": self.title, "inline_only": inline_only }) } - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, _console: &Console) { + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { handle_rpc_request(&ctx.outgoing_tx, DATA_EXPLORER_COMM_NAME, msg, |req| { self.handle_rpc(req, ctx) }); } - fn handle_environment( - &mut self, - event: &EnvironmentChanged, - ctx: &CommHandlerContext, - _console: &Console, - ) { + fn handle_environment(&mut self, event: &EnvironmentChanged, ctx: &CommHandlerContext) { let EnvironmentChanged::Execution { .. } = event else { return; }; diff --git a/crates/ark/src/lib.rs b/crates/ark/src/lib.rs index fe32f8d74..864646687 100644 --- a/crates/ark/src/lib.rs +++ b/crates/ark/src/lib.rs @@ -5,6 +5,10 @@ // // +extern crate self as ark; + +pub use ark_macros::register; + pub mod analysis; pub mod ark_comm; pub mod browser; diff --git a/crates/ark/src/main.rs b/crates/ark/src/main.rs index e87ab1d2d..451da0f62 100644 --- a/crates/ark/src/main.rs +++ b/crates/ark/src/main.rs @@ -13,6 +13,7 @@ use std::env; use amalthea::kernel; use amalthea::kernel_spec::KernelSpec; use anyhow::Context; +use ark::console::catching_panics; use ark::console::SessionMode; use ark::logger; use ark::repos::DefaultRepos; @@ -392,31 +393,32 @@ fn main() -> anyhow::Result<()> { String::from("No location information:") }; - let append_trace = |info: &str| -> String { - // Top-level-exec and try-catch errors already contain a backtrace - // for the R thread so don't repeat it if we see one. Only perform - // this check on the R thread because we do want other threads' - // backtraces if the panic occurred elsewhere. - if ON_R_THREAD.get() && info.contains("\n{R_BACKTRACE_HEADER}\n") { - String::from("") - } else { - format!( - "\n\nBacktrace:\n{}", - std::backtrace::Backtrace::force_capture() - ) - } - }; + let msg: String; + if let Some(s) = info.downcast_ref::<&str>() { + msg = s.to_string(); + } else if let Some(s) = info.downcast_ref::() { + msg = s.clone(); + } else { + msg = String::from("No contextual information."); + } - // Report panic to the frontend - if let Some(info) = info.downcast_ref::<&str>() { - let trace = append_trace(info); - log::error!("Panic! {loc} {info:}{trace}"); - } else if let Some(info) = info.downcast_ref::() { - let trace = append_trace(info); - log::error!("Panic! {loc} {info:}{trace}"); + // Top-level-exec and try-catch errors already contain a backtrace + // for the R thread so don't repeat it if we see one. Only perform + // this check on the R thread because we do want other threads' + // backtraces if the panic occurred elsewhere. + let trace = if ON_R_THREAD.get() && msg.contains("\n{R_BACKTRACE_HEADER}\n") { + String::new() } else { - let trace = format!("Backtrace:\n{}", std::backtrace::Backtrace::force_capture()); - log::error!("Panic! {loc} No contextual information.\n{trace}"); + format!("Backtrace:\n{}", std::backtrace::Backtrace::force_capture()) + }; + + log::error!("Panic! {loc} {msg}\n{trace}"); + + // `Console::with()` catches panics with `catch_unwind` in release + // builds. Return early so the catch handler can convert the panic + // to an `anyhow::Error`. The backtrace is already logged above. + if catching_panics() { + return; } // We don't want the threads managed by a Tokio runtime to `abort()` the diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 97e258a9a..c6e9c6685 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -13,6 +13,7 @@ use std::fmt::Display; use std::fs::File; use std::io::BufReader; use std::io::Read; +use std::rc::Rc; use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::plot_comm::IntrinsicSize; @@ -83,10 +84,11 @@ struct WrappedDeviceCallbacks { } #[derive(Debug, Clone, Hash, PartialEq, Eq)] -struct PlotId(String); +pub(crate) struct PlotId(String); /// Execution context captured when an execute request starts. /// Stored on the graphics device so it can be associated with plots created during execution. +#[derive(Clone, Default)] struct ExecutionContext { execution_id: String, code: String, @@ -112,7 +114,15 @@ struct PlotContext { /// R's callback registration layer. A future refactor could wrap the C-to-Rust /// bridge so that the Rust-facing hook methods receive `&mut self` explicitly, /// containing the `Console::get()` unsoundness in one place. +/// +/// NOTE: Never hold a `RefCell` borrow while calling into R (`RObject::from`, +/// `RFunction::call`, `libr::Rf_*`, etc.). Any R call can in principle re-enter +/// Rust (e.g. via finalizers during GC), so keeping borrows short avoids +/// `RefCell` panics. pub(crate) struct DeviceContext { + /// Whether we are running in Console, Notebook, or Background mode. + session_mode: SessionMode, + /// Channel for sending [IOPubMessage::DisplayData] and /// [IOPubMessage::UpdateDisplayData] to Jupyter frontends when plot events occur iopub_tx: Sender, @@ -188,9 +198,16 @@ pub(crate) struct DeviceContext { pending_origin: RefCell>>, } +impl std::fmt::Debug for DeviceContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DeviceContext").finish_non_exhaustive() + } +} + impl DeviceContext { - pub fn new(iopub_tx: Sender) -> Self { + pub fn new(iopub_tx: Sender, session_mode: SessionMode) -> Self { Self { + session_mode, iopub_tx, has_changes: Cell::new(false), is_new_page: Cell::new(true), @@ -220,7 +237,7 @@ impl DeviceContext { } /// Set the current execution context (called when an execute request starts) - fn set_execution_context( + pub(crate) fn set_execution_context( &self, execution_id: String, code: String, @@ -238,7 +255,7 @@ impl DeviceContext { } /// Clear the current execution context (called when an execute request completes) - fn clear_execution_context(&self) { + pub(crate) fn clear_execution_context(&self) { *self.execution_context.borrow_mut() = None; } @@ -265,7 +282,7 @@ impl DeviceContext { } /// Clear any unconsumed pending origin. - fn clear_pending_origin(&self) { + pub(crate) fn clear_pending_origin(&self) { self.pending_origin.replace(None); } @@ -277,14 +294,13 @@ impl DeviceContext { self.clear_pending_origin(); } - /// Should plot events be sent over [CommSocket]s to the frontend? + /// Should plot events be sent over comm channels to the frontend? /// /// This allows plots to be dynamically resized by their `id`. Only possible if the UI /// comm is connected (i.e. we are connected to Positron) and if we are in /// [SessionMode::Console] mode. - fn should_use_dynamic_plots(&self) -> bool { - let console = Console::get(); - console.ui_comm().is_some() && console.session_mode() == SessionMode::Console + fn should_use_dynamic_plots(&self, console: &Console) -> bool { + self.session_mode == SessionMode::Console && console.ui_comm().is_some() } /// Deactivation hook @@ -312,12 +328,12 @@ impl DeviceContext { /// ggsave("temp.png", p) /// ``` #[tracing::instrument(level = "trace", skip_all)] - fn hook_deactivate(&self) { - self.process_changes(); + fn hook_deactivate(&self, console: &Console) { + self.process_changes(console); } #[tracing::instrument(level = "trace", skip_all, fields(level = %level))] - fn hook_holdflush(&self, level: i32) { + fn hook_holdflush(&self, level: i32, console: &Console) { // Be extra safe and check `level <= 0` rather than just `level == 0` in case // our shadowed device returns a negative `level` let is_released = level <= 0; @@ -325,7 +341,7 @@ impl DeviceContext { // Flush deferred changes on hold→release transition if !was_rendering && is_released { - self.process_changes(); + self.process_changes(console); } } @@ -368,37 +384,13 @@ impl DeviceContext { /// Capture the current execution context for a new plot. /// - /// First checks for context pushed via `on_execute_request()`, then falls back - /// to getting context from Console's active request (for backwards compatibility - /// and edge cases). The fallback path does not include `code_location`. + /// Returns the context pushed via `graphics_on_execute_request()`, or an + /// empty default for plots created outside of an execute request (e.g. + /// during startup). fn capture_execution_context(&self) -> ExecutionContext { - // Check if we have a stored execution context from on_execute_request() - let stored = self.execution_context.borrow(); - if let Some(ctx) = stored.as_ref() { - return ExecutionContext { - execution_id: ctx.execution_id.clone(), - code: ctx.code.clone(), - code_location: ctx.code_location.clone(), - render_settings: ctx.render_settings, - intrinsic_size: ctx.intrinsic_size.clone(), - }; - } - drop(stored); - - // Fall back to getting context from Console (for edge cases). - // This path does not provide code_location or plot overrides. - let (execution_id, code) = Console::get().get_execution_context().unwrap_or_else(|| { - // No active request - might be during startup or from R code - (String::new(), String::new()) - }); - - ExecutionContext { - execution_id, - code, - code_location: None, - render_settings: None, - intrinsic_size: None, - } + // No execution context was pushed. This can happen for plots created + // outside of an execute request (e.g. during startup). + self.execution_context.borrow().clone().unwrap_or_default() } /// Determine the plot origin for a new plot. @@ -538,7 +530,7 @@ impl DeviceContext { let mime_type = Self::get_mime_type(&plot_meta.format); Ok(PlotBackendReply::RenderReply(PlotResult { - data: data.to_string(), + data, mime_type: mime_type.to_string(), settings: Some(settings), })) @@ -560,18 +552,18 @@ impl DeviceContext { // If the currently active plot is closed, advance to a new Positron page // See https://github.com/posit-dev/positron/issues/6702. - if *self.id.borrow() == *id { + if self.id() == *id { self.new_positron_page(); } } - fn get_mime_type(format: &PlotRenderFormat) -> String { + fn get_mime_type(format: &PlotRenderFormat) -> &'static str { match format { - PlotRenderFormat::Png => "image/png".to_string(), - PlotRenderFormat::Svg => "image/svg+xml".to_string(), - PlotRenderFormat::Pdf => "application/pdf".to_string(), - PlotRenderFormat::Jpeg => "image/jpeg".to_string(), - PlotRenderFormat::Tiff => "image/tiff".to_string(), + PlotRenderFormat::Png => "image/png", + PlotRenderFormat::Svg => "image/svg+xml", + PlotRenderFormat::Pdf => "application/pdf", + PlotRenderFormat::Jpeg => "image/jpeg", + PlotRenderFormat::Tiff => "image/tiff", } } @@ -580,7 +572,7 @@ impl DeviceContext { /// Uses execution context stored via `on_execute_request()` or falls back to /// getting context from Console's active request. #[tracing::instrument(level = "trace", skip_all)] - fn process_changes(&self) { + pub(crate) fn process_changes(&self, console: &Console) { let id = self.id(); if !self.has_changes.get() { @@ -616,15 +608,15 @@ impl DeviceContext { self.has_changes.replace(false); if self.is_new_page.replace(false) { - self.process_new_plot(&id); + self.process_new_plot(&id, console); } else { - self.process_update_plot(&id); + self.process_update_plot(&id, console); } } - fn process_new_plot(&self, id: &PlotId) { - if self.should_use_dynamic_plots() { - self.process_new_plot_positron(id); + fn process_new_plot(&self, id: &PlotId, console: &Console) { + if self.should_use_dynamic_plots(console) { + self.process_new_plot_positron(id, console); } else { self.process_new_plot_jupyter_protocol(id); } @@ -644,7 +636,7 @@ impl DeviceContext { } #[tracing::instrument(level = "trace", skip_all, fields(id = %id))] - fn process_new_plot_positron(&self, id: &PlotId) { + fn process_new_plot_positron(&self, id: &PlotId, console: &Console) { log::trace!("Notifying Positron of new plot"); let ctx = self.capture_execution_context(); @@ -661,7 +653,7 @@ impl DeviceContext { let mime_type = Self::get_mime_type(&PlotRenderFormat::Png); let pre_render = PlotResult { - data: pre_render.to_string(), + data: pre_render, mime_type: mime_type.to_string(), settings: Some(settings), }; @@ -677,9 +669,10 @@ impl DeviceContext { let plot_comm = PlotComm { id: id.clone(), open_data, + device_context: console.device_context_rc(), }; - match Console::get_mut().comm_open_backend(PLOT_COMM_NAME, Box::new(plot_comm)) { + match console.comm_open_backend(PLOT_COMM_NAME, Box::new(plot_comm)) { Ok(comm_id) => { self.comm_ids.borrow_mut().insert(id.clone(), comm_id); }, @@ -711,9 +704,11 @@ impl DeviceContext { display_id: id.to_string(), data: None, }; - let transient = serde_json::to_value(transient).unwrap(); + let Some(transient) = serde_json::to_value(transient).log_err() else { + return; + }; - log::info!("Sending display data to IOPub."); + log::trace!("Sending display data to IOPub."); self.iopub_tx .send(IOPubMessage::DisplayData(DisplayData { @@ -744,8 +739,8 @@ impl DeviceContext { }); } - fn process_update_plot(&self, id: &PlotId) { - if self.should_use_dynamic_plots() { + fn process_update_plot(&self, id: &PlotId, console: &Console) { + if self.should_use_dynamic_plots(console) { self.process_update_plot_positron(id); } else { self.process_update_plot_jupyter_protocol(id); @@ -770,7 +765,7 @@ impl DeviceContext { let mime_type = Self::get_mime_type(&settings.format); let pre_render = PlotResult { - data: pre_render.to_string(), + data: pre_render, mime_type: mime_type.to_string(), settings: Some(settings), }; @@ -785,7 +780,10 @@ impl DeviceContext { }, }; - let value = serde_json::to_value(PlotFrontendEvent::Update(update_params)).unwrap(); + let Some(value) = serde_json::to_value(PlotFrontendEvent::Update(update_params)).log_err() + else { + return; + }; let outgoing_tx = CommOutgoingTx::new(comm_id, self.iopub_tx.clone()); outgoing_tx @@ -811,7 +809,7 @@ impl DeviceContext { data: None, }; - log::info!("Sending update display data to IOPub for `id` {id}."); + log::trace!("Sending update display data to IOPub for `id` {id}."); self.iopub_tx .send(IOPubMessage::UpdateDisplayData(UpdateDisplayData { @@ -848,7 +846,7 @@ impl DeviceContext { }); let mut map = serde_json::Map::new(); - map.insert("image/png".to_string(), serde_json::to_value(data).unwrap()); + map.insert("image/png".to_string(), serde_json::to_value(data)?); Ok(serde_json::Value::Object(map)) } @@ -907,6 +905,7 @@ impl DeviceContext { struct PlotComm { id: PlotId, open_data: serde_json::Value, + device_context: Rc, } impl CommHandler for PlotComm { @@ -914,15 +913,14 @@ impl CommHandler for PlotComm { self.open_data.clone() } - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, console: &Console) { - let dc = console.device_context(); + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { handle_rpc_request(&ctx.outgoing_tx, PLOT_COMM_NAME, msg, |req| { - dc.handle_rpc(req, &self.id) + self.device_context.handle_rpc(req, &self.id) }); } - fn handle_close(&mut self, _ctx: &CommHandlerContext, console: &Console) { - console.device_context().on_plot_closed(&self.id); + fn handle_close(&mut self, _ctx: &CommHandlerContext) { + self.device_context.on_plot_closed(&self.id); } } @@ -1092,58 +1090,8 @@ pub(crate) fn compute_plot_overrides( ) } -/// Hook applied when an execute request starts -/// -/// Pushes the execution context (execution_id, code, code_location) to the graphics device -/// so it can be captured when new plots are created. This allows plots to be -/// correctly attributed to the code that generated them. -/// -/// Called from `handle_execute_request()` after setting the active request. -#[tracing::instrument(level = "trace", skip_all)] -pub(crate) fn on_execute_request( - execution_id: String, - code: String, - code_location: Option, - render_settings: Option, - intrinsic_size: Option, -) { - log::trace!("Entering on_execute_request"); - Console::get().device_context().set_execution_context( - execution_id, - code, - code_location, - render_settings, - intrinsic_size, - ); -} - -/// Hook applied after a code chunk has finished executing -/// -/// Not an official graphics device hook, instead we run this manually after -/// completing execution of a chunk of R code. -/// -/// This is particularly useful for recording "partial" states within a single -/// page, for example: -/// -/// ```r -/// # Run this line by line -/// par(mfrow = c(2, 1)) -/// plot(1:10) -/// ``` -/// -/// After `plot(1:10)`, we've only plotted 1 of 2 potential plots on the page, -/// but we can still render this intermediate state and show it to the user until -/// they add more plots or advance to another new page. -#[tracing::instrument(level = "trace", skip_all)] -pub(crate) fn on_did_execute_request() { - log::trace!("Entering on_did_execute_request"); - let dc = Console::get().device_context(); - dc.process_changes(); - dc.clear_execution_context(); - dc.clear_pending_origin(); -} - -/// Activation callback +/// Run a closure with `&Console` and `&DeviceContext`, catching any panic at +/// the FFI boundary. Graphics device callbacks are invoked from R /// /// Only used for logging /// @@ -1167,10 +1115,12 @@ unsafe extern "C-unwind" fn callback_activate(dev: pDevDesc) { unsafe extern "C-unwind" fn callback_deactivate(dev: pDevDesc) { log::trace!("Entering callback_deactivate"); - let dc = Console::get().device_context(); + let console = Console::get(); + let dc = console.device_context(); + // We run our hook first to record before we deactivate the underlying device, // in case device deactivation messes with the display list - dc.hook_deactivate(); + dc.hook_deactivate(console); if let Some(callback) = dc.wrapped_callbacks.deactivate.get() { callback(dev); } @@ -1180,7 +1130,8 @@ unsafe extern "C-unwind" fn callback_deactivate(dev: pDevDesc) { unsafe extern "C-unwind" fn callback_holdflush(dev: pDevDesc, level_delta: i32) -> i32 { log::trace!("Entering callback_holdflush"); - let dc = Console::get().device_context(); + let console = Console::get(); + let dc = console.device_context(); // If our wrapped device has a `holdflush()` method, we rely on it to apply // the `level_delta` (typically `+1` or `-1`) and return the new level. Otherwise // we follow the lead of `devholdflush()` in R and use a resolved `level` of `0`. @@ -1199,7 +1150,7 @@ unsafe extern "C-unwind" fn callback_holdflush(dev: pDevDesc, level_delta: i32) level }, }; - dc.hook_holdflush(level); + dc.hook_holdflush(level, console); level } @@ -1210,7 +1161,8 @@ unsafe extern "C-unwind" fn callback_holdflush(dev: pDevDesc, level_delta: i32) unsafe extern "C-unwind" fn callback_mode(mode: i32, dev: pDevDesc) { log::trace!("Entering callback_mode"); - let dc = Console::get().device_context(); + let console = Console::get(); + let dc = console.device_context(); if let Some(callback) = dc.wrapped_callbacks.mode.get() { callback(mode, dev); } @@ -1304,13 +1256,13 @@ unsafe extern "C-unwind" fn ps_graphics_device() -> anyhow::Result { /// that intermediate plot since we are still on the same plot page with the same plot /// `id`. #[tracing::instrument(level = "trace", skip_all)] -#[harp::register] -unsafe extern "C-unwind" fn ps_graphics_before_plot_new(_name: SEXP) -> anyhow::Result { +#[ark::register] +fn ps_graphics_before_plot_new(console: &Console, _name: SEXP) -> anyhow::Result { log::trace!("Entering ps_graphics_before_plot_new"); // Process changes related to the last plot before opening a new page. // Particularly important if we make multiple plots in a single chunk. - Console::get().device_context().process_changes(); + console.device_context().process_changes(console); Ok(harp::r_null()) } @@ -1320,66 +1272,70 @@ unsafe extern "C-unwind" fn ps_graphics_before_plot_new(_name: SEXP) -> anyhow:: /// Returns a named list with fields: name, kind, execution_id, code, origin_uri. /// Returns NULL if no metadata is found for the given ID. #[tracing::instrument(level = "trace", skip_all)] -#[harp::register] -unsafe extern "C-unwind" fn ps_graphics_get_metadata(id: SEXP) -> anyhow::Result { +#[ark::register] +fn ps_graphics_get_metadata(console: &Console, id: SEXP) -> anyhow::Result { let id_str: String = RObject::view(id).try_into()?; let plot_id = PlotId(id_str); - let contexts = Console::get().device_context().plot_contexts.borrow(); - match contexts.get(&plot_id) { - Some(ctx) => { - let info = &ctx.metadata; - let origin_uri = info.origin.as_ref().map(|o| o.uri.as_str()).unwrap_or(""); - - // Create a list with the metadata values - let values: Vec = vec![ - RObject::from(info.name.as_str()), - RObject::from(info.kind.as_str()), - RObject::from(info.execution_id.as_str()), - RObject::from(info.code.as_str()), - RObject::from(origin_uri), - ]; - let list = RObject::try_from(values)?; - - // Set the names attribute - let names: Vec = vec![ - "name".to_string(), - "kind".to_string(), - "execution_id".to_string(), - "code".to_string(), - "origin_uri".to_string(), - ]; - let names = RObject::from(names); - libr::Rf_setAttrib(list.sexp, libr::R_NamesSymbol, names.sexp); - - Ok(list.sexp) - }, - None => Ok(harp::r_null()), - } + // Clone metadata out of the borrow before calling into R. R allocations + // (`RObject::from()`, `Rf_setAttrib()`, etc.) can trigger finalizers or + // error handlers that re-enter `plot_contexts.borrow_mut()`, which would + // panic if the shared borrow were still held. + let metadata = { + let contexts = console.device_context().plot_contexts.borrow(); + contexts.get(&plot_id).map(|ctx| ctx.metadata.clone()) + }; + + let Some(info) = metadata else { + return Ok(harp::r_null()); + }; + + let origin_uri = info.origin.as_ref().map(|o| o.uri.as_str()).unwrap_or(""); + + let values: Vec = vec![ + RObject::from(info.name.as_str()), + RObject::from(info.kind.as_str()), + RObject::from(info.execution_id.as_str()), + RObject::from(info.code.as_str()), + RObject::from(origin_uri), + ]; + let list = RObject::try_from(values)?; + + let names: Vec = vec![ + "name".to_string(), + "kind".to_string(), + "execution_id".to_string(), + "code".to_string(), + "origin_uri".to_string(), + ]; + let names = RObject::from(names); + libr::Rf_setAttrib(list.sexp, libr::R_NamesSymbol, names.sexp); + + Ok(list.sexp) } /// Return the current plot ID. Used by tests to verify that layout panels /// share the same page (same ID) and that overflow creates a new page. -#[harp::register] -unsafe extern "C-unwind" fn ps_graphics_current_plot_id() -> anyhow::Result { - let id = Console::get().device_context().id(); +#[ark::register] +fn ps_graphics_current_plot_id(console: &Console) -> anyhow::Result { + let id = console.device_context().id(); Ok(RObject::from(&id).sexp) } /// Push a source file URI onto the source context stack. /// Called from the `source()` hook when entering a sourced file. -#[harp::register] -unsafe extern "C-unwind" fn ps_graphics_push_source_context(uri: SEXP) -> anyhow::Result { +#[ark::register] +fn ps_graphics_push_source_context(console: &Console, uri: SEXP) -> anyhow::Result { let uri_str: String = RObject::view(uri).try_into()?; - Console::get().device_context().push_source_context(uri_str); + console.device_context().push_source_context(uri_str); Ok(harp::r_null()) } /// Pop a source file URI from the source context stack. /// Called from the `source()` hook when leaving a sourced file. -#[harp::register] -unsafe extern "C-unwind" fn ps_graphics_pop_source_context() -> anyhow::Result { - Console::get().device_context().pop_source_context(); +#[ark::register] +fn ps_graphics_pop_source_context(console: &Console) -> anyhow::Result { + console.device_context().pop_source_context(); Ok(harp::r_null()) } @@ -1404,3 +1360,58 @@ fn r_option_positive_f64(name: &str) -> Option { _ => None, } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::console::SessionMode; + + fn test_device_context() -> DeviceContext { + let (tx, _rx) = crossbeam::channel::unbounded(); + DeviceContext::new(tx, SessionMode::Console) + } + + #[test] + fn test_capture_execution_context_default_when_empty() { + let dc = test_device_context(); + let ctx = dc.capture_execution_context(); + assert_eq!(ctx.execution_id, ""); + assert_eq!(ctx.code, ""); + assert!(ctx.code_location.is_none()); + assert!(ctx.render_settings.is_none()); + assert!(ctx.intrinsic_size.is_none()); + } + + #[test] + fn test_capture_execution_context_returns_stored() { + let dc = test_device_context(); + dc.set_execution_context( + String::from("msg-123"), + String::from("plot(1:10)"), + None, + None, + None, + ); + + let ctx = dc.capture_execution_context(); + assert_eq!(ctx.execution_id, "msg-123"); + assert_eq!(ctx.code, "plot(1:10)"); + } + + #[test] + fn test_capture_execution_context_after_clear() { + let dc = test_device_context(); + dc.set_execution_context( + String::from("msg-123"), + String::from("plot(1:10)"), + None, + None, + None, + ); + dc.clear_execution_context(); + + let ctx = dc.capture_execution_context(); + assert_eq!(ctx.execution_id, ""); + assert_eq!(ctx.code, ""); + } +} diff --git a/crates/ark/src/r_task.rs b/crates/ark/src/r_task.rs index 3d54bb92a..b5eb3e6c7 100644 --- a/crates/ark/src/r_task.rs +++ b/crates/ark/src/r_task.rs @@ -519,3 +519,20 @@ unsafe extern "C-unwind" fn ps_test_spawn_sleeping_idle_tasks( Ok(libr::R_NilValue) } + +/// Spawn an idle task that evaluates an R expression. Used in integration tests. +#[cfg(debug_assertions)] +#[harp::register] +unsafe extern "C-unwind" fn ps_test_spawn_eval_idle_task(code: SEXP) -> anyhow::Result { + stdext::assert_testing(); + + let code: String = harp::RObject::view(code).try_into()?; + + spawn(RTask::idle(async move |_capture| { + if let Err(err) = harp::parse_eval_global(&code) { + log::error!("Idle task eval failed: {err:?}"); + } + })); + + Ok(libr::R_NilValue) +} diff --git a/crates/ark/src/reticulate.rs b/crates/ark/src/reticulate.rs index 2a5b5d182..b465fc9fe 100644 --- a/crates/ark/src/reticulate.rs +++ b/crates/ark/src/reticulate.rs @@ -66,6 +66,7 @@ impl ReticulateService { "input": input, "reticulate_id": (*RETICULATE_ID).clone(), }), + None, ); service.comm_event_tx.send(event).log_err(); diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 35ab1d6f7..01fcfdb49 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -31,6 +31,7 @@ use async_trait::async_trait; use bus::BusReader; use crossbeam::channel::bounded; use crossbeam::channel::unbounded; +use crossbeam::channel::Receiver; use crossbeam::channel::Sender; use harp::environment::R_ENVS; use harp::line_ending::convert_line_endings; @@ -232,18 +233,20 @@ impl ShellHandler for Shell { /// /// Note that there might be multiple requests during a single session if /// the UI has been disconnected and reconnected. - async fn handle_comm_open( - &self, + fn handle_comm_open( + &mut self, target: Comm, comm: CommSocket, data: serde_json::Value, - ) -> amalthea::Result { + ) -> amalthea::Result<(bool, Option>)> { match target { - Comm::Variables => handle_comm_open_variables(comm), + Comm::Variables => Ok((handle_comm_open_variables(comm)?, None)), Comm::Ui => handle_comm_open_ui(comm, self.kernel_request_tx.clone(), data), - Comm::Help => handle_comm_open_help(comm), - Comm::Other(target_name) if target_name == "ark" => ArkComm::handle_comm_open(comm), - _ => Ok(false), + Comm::Help => Ok((handle_comm_open_help(comm)?, None)), + Comm::Other(target_name) if target_name == "ark" => { + Ok((ArkComm::handle_comm_open(comm)?, None)) + }, + _ => Ok((false, None)), } } @@ -253,18 +256,18 @@ impl ShellHandler for Shell { comm_name: &str, msg: CommMsg, originator: Originator, - ) -> amalthea::Result { + ) -> amalthea::Result<(CommHandled, Option>)> { match comm_name { DATA_EXPLORER_COMM_NAME | PLOT_COMM_NAME | UI_COMM_NAME => { - self.dispatch_kernel_request(|done_tx| KernelRequest::CommMsg { + let done_rx = self.start_kernel_request(|done_tx| KernelRequest::CommMsg { comm_id: comm_id.to_string(), msg, originator: Box::new(originator), done_tx, })?; - Ok(CommHandled::Handled) + Ok((CommHandled::Handled, Some(done_rx))) }, - _ => Ok(CommHandled::NotHandled), + _ => Ok((CommHandled::NotHandled, None)), } } @@ -272,34 +275,33 @@ impl ShellHandler for Shell { &mut self, comm_id: &str, comm_name: &str, - ) -> amalthea::Result { + ) -> amalthea::Result<(CommHandled, Option>)> { match comm_name { DATA_EXPLORER_COMM_NAME | PLOT_COMM_NAME | UI_COMM_NAME => { - self.dispatch_kernel_request(|done_tx| KernelRequest::CommClose { + let done_rx = self.start_kernel_request(|done_tx| KernelRequest::CommClose { comm_id: comm_id.to_string(), done_tx, })?; - Ok(CommHandled::Handled) + Ok((CommHandled::Handled, Some(done_rx))) }, - _ => Ok(CommHandled::NotHandled), + _ => Ok((CommHandled::NotHandled, None)), } } } impl Shell { - /// Send a `KernelRequest` to the R thread and block until it's processed. - fn dispatch_kernel_request( + /// Send a `KernelRequest` to the R thread and return a completion receiver. + /// The caller (Shell) select-loops on this receiver and `comm_event_rx` + /// to drain comm events while the request is processed. + fn start_kernel_request( &self, build: impl FnOnce(Sender<()>) -> KernelRequest, - ) -> amalthea::Result<()> { + ) -> amalthea::Result> { let (done_tx, done_rx) = bounded(0); self.kernel_request_tx .send(build(done_tx)) .map_err(|err| amalthea::Error::SendError(err.to_string()))?; - done_rx - .recv() - .map_err(|err| amalthea::Error::ReceiveError(err.to_string()))?; - Ok(()) + Ok(done_rx) } } @@ -315,7 +317,7 @@ fn handle_comm_open_ui( comm: CommSocket, kernel_request_tx: Sender, data: serde_json::Value, -) -> amalthea::Result { +) -> amalthea::Result<(bool, Option>)> { let handler = UiComm::new(data); let (done_tx, done_rx) = bounded(0); @@ -328,11 +330,8 @@ fn handle_comm_open_ui( done_tx, }) .map_err(|err| amalthea::Error::SendError(err.to_string()))?; - done_rx - .recv() - .map_err(|err| amalthea::Error::ReceiveError(err.to_string()))?; - Ok(true) + Ok((true, Some(done_rx))) } fn handle_comm_open_help(comm: CommSocket) -> amalthea::Result { diff --git a/crates/ark/src/ui/ui_comm.rs b/crates/ark/src/ui/ui_comm.rs index ecd7c9823..e61942fab 100644 --- a/crates/ark/src/ui/ui_comm.rs +++ b/crates/ark/src/ui/ui_comm.rs @@ -51,7 +51,7 @@ pub struct UiComm { } impl CommHandler for UiComm { - fn handle_open(&mut self, ctx: &CommHandlerContext, _console: &Console) { + fn handle_open(&mut self, ctx: &CommHandlerContext) { // Set initial console width from the comm_open data, if provided. if let Some(width) = self.comm_open_data.console_width { if let Err(err) = RFunction::from(".ps.rpc.setConsoleWidth") @@ -70,7 +70,7 @@ impl CommHandler for UiComm { self.refresh(&input_prompt, &continuation_prompt, ctx); } - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext, _console: &Console) { + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { let this = &*self; handle_comm_message( &ctx.outgoing_tx, @@ -81,12 +81,7 @@ impl CommHandler for UiComm { ); } - fn handle_environment( - &mut self, - event: &EnvironmentChanged, - ctx: &CommHandlerContext, - _console: &Console, - ) { + fn handle_environment(&mut self, event: &EnvironmentChanged, ctx: &CommHandlerContext) { let EnvironmentChanged::Execution { input_prompt, continuation_prompt, @@ -323,7 +318,7 @@ mod tests { })) .unwrap(), }; - handler.handle_msg(msg, &ctx, Console::get()); + handler.handle_msg(msg, &ctx); // Assert that the console width changed let new_width: i32 = harp::get_option("width").try_into().unwrap(); @@ -339,7 +334,7 @@ mod tests { })) .unwrap(), }; - handler.handle_msg(msg, &ctx, Console::get()); + handler.handle_msg(msg, &ctx); old_width }); @@ -385,7 +380,7 @@ mod tests { })) .unwrap(), }; - handler.handle_msg(msg, &ctx, Console::get()); + handler.handle_msg(msg, &ctx); }); let response = iopub_rx.recv_comm_msg(); diff --git a/crates/ark/tests/connections.rs b/crates/ark/tests/connections.rs index 87bc0e899..9267a9b5b 100644 --- a/crates/ark/tests/connections.rs +++ b/crates/ark/tests/connections.rs @@ -62,7 +62,7 @@ fn open_dummy_connection() -> (socket::comm::CommSocket, Receiver) .unwrap(); match msg { - CommEvent::Opened(socket, _value) => { + CommEvent::Opened(socket, _value, _) => { assert_eq!(socket.comm_name, "positron.connection"); assert_eq!(socket.comm_id, comm_id); (socket, iopub_rx) diff --git a/crates/ark/tests/data_explorer.rs b/crates/ark/tests/data_explorer.rs index 7c9c28067..69e761975 100644 --- a/crates/ark/tests/data_explorer.rs +++ b/crates/ark/tests/data_explorer.rs @@ -70,7 +70,6 @@ use amalthea::socket::iopub::IOPubMessage; use ark::comm_handler::CommHandler; use ark::comm_handler::CommHandlerContext; use ark::comm_handler::EnvironmentChanged; -use ark::console::Console; use ark::data_explorer::format::format_column; use ark::data_explorer::format::format_string; use ark::data_explorer::r_data_explorer::DataExplorerMode; @@ -176,7 +175,7 @@ impl TestSetup { let inner = &self.inner; r_task(|| { let TestInner(handler, ctx) = &mut *inner.lock().unwrap(); - handler.handle_msg(msg, ctx, Console::get()); + handler.handle_msg(msg, ctx); }); let iopub_msg = self.iopub_rx.recv_timeout(RECV_TIMEOUT).unwrap(); @@ -198,7 +197,6 @@ impl TestSetup { continuation_prompt: String::from("+ "), }, ctx, - Console::get(), ); ctx.is_closed() }); @@ -785,7 +783,7 @@ fn expect_column_profile_results( let inner = &setup.inner; r_task(|| { let TestInner(handler, ctx) = &mut *inner.lock().unwrap(); - handler.handle_msg(msg, ctx, Console::get()); + handler.handle_msg(msg, ctx); }); let msg = setup.iopub_rx.recv_comm_msg(); diff --git a/crates/ark/tests/data_explorer_priority.rs b/crates/ark/tests/data_explorer_integration.rs similarity index 70% rename from crates/ark/tests/data_explorer_priority.rs rename to crates/ark/tests/data_explorer_integration.rs index e48a2cdc9..a0a63237b 100644 --- a/crates/ark/tests/data_explorer_priority.rs +++ b/crates/ark/tests/data_explorer_integration.rs @@ -1,5 +1,5 @@ // -// data_explorer_priority.rs +// data_explorer_integration.rs // // Copyright (C) 2026 Posit Software, PBC. All rights reserved. // @@ -104,3 +104,47 @@ fn test_kernel_request_priority_over_idle_tasks() { are being starved by idle tasks" ); } + +/// The `OpenDataExplorer` RPC calls `comm_open_backend` from inside the +/// handler to open a child explorer. This inserts into the `comms` +/// HashMap while the parent comm has been taken out for dispatch. +/// Without the take pattern, this would panic on a reentrant +/// `borrow_mut()`. +#[test] +fn test_open_child_explorer_during_dispatch() { + let frontend = DummyArkFrontend::lock(); + + frontend.send_execute_request( + "test_df <- data.frame(x = 1:3, y = letters[1:3])", + ExecuteRequestOptions::default(), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + let parent_comm_id = frontend.open_data_explorer("test_df"); + + // Send the OpenDataExplorer RPC to the parent explorer. + let request = DataExplorerBackendRequest::OpenDataExplorer; + let mut data = serde_json::to_value(&request).unwrap(); + data["id"] = serde_json::Value::String(String::from("open-rpc")); + + frontend.send_shell_comm_msg(parent_comm_id.clone(), data); + frontend.recv_iopub_busy(); + + // The handler calls `comm_open_backend`, which opens a new data + // explorer comm. Shell drains comm events during the handler, so + // the child's `comm_open` arrives before the RPC reply. + let child_open = frontend.recv_iopub_comm_open(); + assert_eq!(child_open.target_name, "positron.dataExplorer"); + assert_ne!(child_open.comm_id, parent_comm_id); + + // The RPC reply follows. + let reply_msg = frontend.recv_iopub_comm_msg(); + assert_eq!(reply_msg.comm_id, parent_comm_id); + let reply: DataExplorerBackendReply = serde_json::from_value(reply_msg.data).unwrap(); + assert_eq!(reply, DataExplorerBackendReply::OpenDataExplorerReply()); + + frontend.recv_iopub_idle(); +} diff --git a/crates/ark/tests/kernel-hooks-session.rs b/crates/ark/tests/kernel-hooks-session.rs index 89b73a191..98b2d7e0d 100644 --- a/crates/ark/tests/kernel-hooks-session.rs +++ b/crates/ark/tests/kernel-hooks-session.rs @@ -21,7 +21,7 @@ fn execute(frontend: &DummyArkFrontend, comm_id: &str, code: &str) { /// A "new" session fires session_init hooks with start_type = "new". /// -/// The hook calls `rstudioapi::navigateToFile()`, a fire-and-forget event. +/// The hook calls `.ps.ui.navigateToFile()`, a fire-and-forget event. /// We can't verify the frontend acts on it, but we can verify the /// `open_editor` message arrives on the UI comm. #[test] @@ -37,7 +37,7 @@ fn test_session_init_hook_new() { &frontend, &comm_id, &format!( - "setHook('positron.session_init', function(start_type) rstudioapi::navigateToFile('{path}'))" + "setHook('positron.session_init', function(start_type) .ps.ui.navigateToFile('{path}'))" ), ); diff --git a/crates/ark/tests/plots.rs b/crates/ark/tests/plots.rs index bd690a583..0494be2ca 100644 --- a/crates/ark/tests/plots.rs +++ b/crates/ark/tests/plots.rs @@ -372,22 +372,9 @@ fn test_plot_get_metadata() { // [1] "plot(1:10)" // Verify execution_id matches the msg_id of the execute_request - assert!( - result.contains(&msg_id), - "Metadata should contain execution_id '{msg_id}', got:\n{result}" - ); - - // Verify code matches - assert!( - result.contains(code), - "Metadata should contain code '{code}', got:\n{result}" - ); - - // Verify kind is "plot" for base R plots - assert!( - result.contains("$kind") && result.contains("\"plot\""), - "Metadata should contain kind 'plot', got:\n{result}" - ); + assert!(result.contains(&format!("$execution_id\n[1] \"{msg_id}\""))); + assert!(result.contains(&format!("$code\n[1] \"{code}\""))); + assert!(result.contains("$kind\n[1] \"plot\"")); } /// Test that plot metadata includes origin when code_location is provided. @@ -443,10 +430,7 @@ fn test_plot_get_metadata_with_origin() { frontend.recv_shell_execute_reply(); // Verify origin_uri is present in the metadata - assert!( - result.contains(origin_uri), - "Metadata should contain origin_uri '{origin_uri}', got:\n{result}" - ); + assert!(result.contains(&format!("$origin_uri\n[1] \"{origin_uri}\""))); } /// Test that plots are emitted when created inside source(). @@ -585,16 +569,8 @@ fn test_plot_source_context_stacking() { frontend.recv_shell_execute_reply(); // The origin_uri should point to file B, not file A - assert!( - result_b.contains(&file_b.uri_id), - "Plot from file B should have origin_uri pointing to file B '{}', got:\n{result_b}", - file_b.uri_id, - ); - assert!( - !result_b.contains(&file_a.uri_id), - "Plot from file B should NOT have origin_uri pointing to file A '{}', got:\n{result_b}", - file_a.uri_id, - ); + assert!(result_b.contains(&format!("$origin_uri\n[1] \"{}\"", file_b.uri_id))); + assert!(!result_b.contains(&file_a.uri_id)); // Query metadata for the second plot (created by file A) let query_a = format!(".ps.graphics.get_metadata('{display_id_a}')"); @@ -606,11 +582,7 @@ fn test_plot_source_context_stacking() { frontend.recv_shell_execute_reply(); // The origin_uri should point to file A - assert!( - result_a.contains(&file_a.uri_id), - "Plot from file A should have origin_uri pointing to file A '{}', got:\n{result_a}", - file_a.uri_id, - ); + assert!(result_a.contains(&format!("$origin_uri\n[1] \"{}\"", file_a.uri_id))); } /// Test that plots rendered with fig-width/fig-height metadata produce @@ -700,6 +672,86 @@ fn test_plot_default_size_without_metadata() { frontend.recv_shell_execute_reply(); } +/// Test that a plot created during a `frontend_ready` comm handler works. +/// +/// Previously this deadlocked because Shell blocked on the comm_msg while +/// the R thread blocked on the barrier in `CommEvent::Opened`. Now Shell +/// drains comm events while waiting for the handler to complete. +/// +/// The UI comm remains visible during its own dispatch (via nested +/// `RefCell` on the handler), so the plot goes through the Positron +/// `comm_open` path as usual. +#[test] +fn test_plot_during_frontend_ready() { + let frontend = DummyArkFrontend::lock(); + let comm_id = frontend.open_ui_comm(); + + // Register a session_init hook that creates a plot. + frontend.send_execute_request( + "setHook('positron.session_init', function(start_type) plot(1:10))", + ExecuteRequestOptions::default(), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_ui_prompt_state(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // Trigger the hook via frontend_ready (an event, not an RPC — no `id` field). + let data = serde_json::json!({ + "method": "frontend_ready", + "params": { "start_type": "new" }, + }); + frontend.send_shell_comm_msg(String::from(&comm_id), data); + frontend.recv_iopub_busy(); + + let open = frontend.recv_iopub_comm_open(); + assert_eq!(open.target_name, "positron.plot"); + + frontend.recv_iopub_idle(); +} + +/// Test that a plot created without an active execution context (e.g. from +/// a task callback that fires between execute requests) has empty metadata. +/// +/// This exercises the `capture_execution_context` fallback path where no +/// context was pushed via `graphics_on_execute_request`. +#[test] +fn test_plot_without_execution_context_has_empty_metadata() { + let frontend = DummyArkFrontend::lock(); + + // Spawn an idle task that creates a plot. Idle tasks run during the + // event loop between execute requests, so no execution context is + // active when the plot is produced. + frontend.send_execute_request( + r#"invisible(.Call("ps_test_spawn_eval_idle_task", "plot(1:10)"))"#, + ExecuteRequestOptions::default(), + ); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // After the request completes, R re-enters the idle loop. The test + // thread is blocked here (not sending new requests), so `select` + // picks up the idle task and the display_data arrives promptly. + let display_id = frontend.recv_iopub_display_data_id(); + + // Query metadata using the display_id from the plot + let query_code = format!(".ps.graphics.get_metadata('{display_id}')"); + frontend.send_execute_request(&query_code, ExecuteRequestOptions::default()); + frontend.recv_iopub_busy(); + frontend.recv_iopub_execute_input(); + let result = frontend.recv_iopub_execute_result(); + frontend.recv_iopub_idle(); + frontend.recv_shell_execute_reply(); + + // execution_id and code should be empty since the plot was created + // outside of any execute request + assert!(result.contains("$execution_id\n[1] \"\"")); + assert!(result.contains("$code\n[1] \"\"")); +} + /// Test that `dev.hold()` suppresses intermediate plot output. /// /// Without hold, each `plot()` call emits a separate `display_data`. diff --git a/crates/ark_test/src/dummy_frontend.rs b/crates/ark_test/src/dummy_frontend.rs index 884294ddf..0049a8ab7 100644 --- a/crates/ark_test/src/dummy_frontend.rs +++ b/crates/ark_test/src/dummy_frontend.rs @@ -720,6 +720,32 @@ impl DummyArkFrontend { } } + /// Receive from IOPub and assert DisplayData message, returning the + /// display_id from the transient field. + /// Automatically skips any Stream messages. + #[track_caller] + pub fn recv_iopub_display_data_id(&self) -> String { + let msg = self.recv_iopub_next(); + match msg { + Message::DisplayData(data) => data.content.transient["display_id"] + .as_str() + .expect("display_id should be a string") + .to_string(), + other => panic!("Expected DisplayData, got {:?}", other), + } + } + + /// Receive from IOPub and assert UpdateDisplayData message. + /// Automatically skips any Stream messages. + #[track_caller] + pub fn recv_iopub_update_display_data(&self) { + let msg = self.recv_iopub_next(); + match msg { + Message::UpdateDisplayData(_) => {}, + other => panic!("Expected UpdateDisplayData, got {:?}", other), + } + } + /// Receive from IOPub and assert CommMsg message. /// Automatically skips any Stream messages. #[track_caller] diff --git a/crates/echo/src/shell.rs b/crates/echo/src/shell.rs index 6fe3f0197..921a38d8d 100644 --- a/crates/echo/src/shell.rs +++ b/crates/echo/src/shell.rs @@ -153,14 +153,14 @@ impl ShellHandler for Shell { }) } - async fn handle_comm_open( - &self, + fn handle_comm_open( + &mut self, _target: Comm, _comm: CommSocket, _data: serde_json::Value, - ) -> amalthea::Result { + ) -> amalthea::Result<(bool, Option>)> { // No comms in this toy implementation. - Ok(false) + Ok((false, None)) } } diff --git a/crates/stdext/src/cell.rs b/crates/stdext/src/cell.rs new file mode 100644 index 000000000..0ef0a8aa8 --- /dev/null +++ b/crates/stdext/src/cell.rs @@ -0,0 +1,302 @@ +// +// cell.rs +// +// Copyright (C) 2026 Posit Software, PBC. All rights reserved. +// +// + +//! A `RefCell` wrapper that only enforces borrow rules in debug builds. +//! +//! In debug/test builds, `DebugRefCell` delegates to `RefCell` and panics +//! on borrow violations. In release builds, it tracks borrows via a +//! lightweight counter and logs violations with `log::error!` but does +//! not panic. Callers must still uphold `RefCell`-style aliasing rules; +//! violating them in release builds is undefined behaviour (the same UB +//! that raw `UnsafeCell` access would produce). + +#[cfg(not(debug_assertions))] +use std::cell::Cell; +#[cfg(not(debug_assertions))] +use std::cell::UnsafeCell; +use std::fmt; +use std::ops::Deref; +use std::ops::DerefMut; + +pub struct DebugRefCell { + #[cfg(debug_assertions)] + inner: std::cell::RefCell, + + #[cfg(not(debug_assertions))] + inner: UnsafeCell, + /// Borrow state: 0 = unused, positive = number of shared borrows, + /// -1 = exclusive borrow. + #[cfg(not(debug_assertions))] + borrow_count: Cell, +} + +// --- Construction & owned access (no guards needed) ------------------------- + +impl DebugRefCell { + pub fn new(value: T) -> Self { + Self { + #[cfg(debug_assertions)] + inner: std::cell::RefCell::new(value), + #[cfg(not(debug_assertions))] + inner: UnsafeCell::new(value), + #[cfg(not(debug_assertions))] + borrow_count: Cell::new(0), + } + } + + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl DebugRefCell { + /// Exclusive access when you already have `&mut self`. + /// No runtime check needed in either mode. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } +} + +// --- Shared borrows --------------------------------------------------------- + +impl DebugRefCell { + #[track_caller] + pub fn borrow(&self) -> DebugRef<'_, T> { + #[cfg(debug_assertions)] + { + DebugRef { + inner: self.inner.borrow(), + } + } + #[cfg(not(debug_assertions))] + { + let count = self.borrow_count.get(); + let tracked = if count < 0 { + log::error!( + "INTERNAL ERROR (DebugRefCell): immutable borrow while mutably borrowed (at {})", + std::panic::Location::caller(), + ); + false + } else { + self.borrow_count.set(count + 1); + true + }; + DebugRef { + // SAFETY: Sound only when no `DebugRefMut` is alive for this + // cell. On violation we log but still hand out the reference, + // accepting UB to avoid panicking in production. + value: unsafe { &*self.inner.get() }, + borrow_count: &self.borrow_count, + tracked, + } + } + } + + #[track_caller] + pub fn borrow_mut(&self) -> DebugRefMut<'_, T> { + #[cfg(debug_assertions)] + { + DebugRefMut { + inner: self.inner.borrow_mut(), + } + } + #[cfg(not(debug_assertions))] + { + let count = self.borrow_count.get(); + let tracked = if count != 0 { + let kind = if count > 0 { "immutably" } else { "mutably" }; + log::error!( + "INTERNAL ERROR (DebugRefCell): mutable borrow while already borrowed {kind} (at {})", + std::panic::Location::caller(), + ); + false + } else { + self.borrow_count.set(-1); + true + }; + DebugRefMut { + // SAFETY: Sound only when no other borrow (shared or + // exclusive) is alive for this cell. On violation we log + // but still hand out the reference, accepting UB to avoid + // panicking in production. + value: unsafe { &mut *self.inner.get() }, + borrow_count: &self.borrow_count, + tracked, + } + } + } +} + +// --- Debug ------------------------------------------------------------------ + +impl fmt::Debug for DebugRefCell { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let borrow = self.borrow(); + f.debug_struct("DebugRefCell") + .field("value", &*borrow) + .finish() + } +} + +// --- DebugRef (shared guard) ------------------------------------------------ + +pub struct DebugRef<'a, T: ?Sized> { + #[cfg(debug_assertions)] + inner: std::cell::Ref<'a, T>, + + #[cfg(not(debug_assertions))] + value: &'a T, + #[cfg(not(debug_assertions))] + borrow_count: &'a Cell, + #[cfg(not(debug_assertions))] + tracked: bool, +} + +impl Deref for DebugRef<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + #[cfg(debug_assertions)] + { + &self.inner + } + #[cfg(not(debug_assertions))] + { + self.value + } + } +} + +#[cfg(not(debug_assertions))] +impl Drop for DebugRef<'_, T> { + fn drop(&mut self) { + if self.tracked { + let count = self.borrow_count.get(); + self.borrow_count.set(count - 1); + } + } +} + +impl fmt::Debug for DebugRef<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +// --- DebugRefMut (exclusive guard) ------------------------------------------ + +pub struct DebugRefMut<'a, T: ?Sized> { + #[cfg(debug_assertions)] + inner: std::cell::RefMut<'a, T>, + + #[cfg(not(debug_assertions))] + value: &'a mut T, + #[cfg(not(debug_assertions))] + borrow_count: &'a Cell, + #[cfg(not(debug_assertions))] + tracked: bool, +} + +impl Deref for DebugRefMut<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + #[cfg(debug_assertions)] + { + &self.inner + } + #[cfg(not(debug_assertions))] + { + self.value + } + } +} + +impl DerefMut for DebugRefMut<'_, T> { + fn deref_mut(&mut self) -> &mut T { + #[cfg(debug_assertions)] + { + &mut self.inner + } + #[cfg(not(debug_assertions))] + { + self.value + } + } +} + +#[cfg(not(debug_assertions))] +impl Drop for DebugRefMut<'_, T> { + fn drop(&mut self) { + if self.tracked { + self.borrow_count.set(0); + } + } +} + +impl fmt::Debug for DebugRefMut<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + (**self).fmt(f) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_borrow_and_deref() { + let cell = DebugRefCell::new(42); + let r = cell.borrow(); + assert_eq!(*r, 42); + } + + #[test] + fn test_borrow_mut_and_deref() { + let cell = DebugRefCell::new(vec![1, 2, 3]); + cell.borrow_mut().push(4); + assert_eq!(*cell.borrow(), vec![1, 2, 3, 4]); + } + + #[test] + fn test_multiple_shared_borrows() { + let cell = DebugRefCell::new(String::from("hello")); + let r1 = cell.borrow(); + let r2 = cell.borrow(); + assert_eq!(*r1, *r2); + } + + #[test] + fn test_into_inner() { + let cell = DebugRefCell::new(99); + assert_eq!(cell.into_inner(), 99); + } + + #[test] + fn test_get_mut() { + let mut cell = DebugRefCell::new(10); + *cell.get_mut() = 20; + assert_eq!(cell.into_inner(), 20); + } + + #[test] + fn test_option_pattern() { + let cell = DebugRefCell::new(Some(String::from("value"))); + let guard = cell.borrow(); + assert!(guard.is_some()); + assert_eq!(guard.as_ref().unwrap(), "value"); + } + + #[test] + #[cfg(debug_assertions)] + #[should_panic] + fn test_conflicting_borrows_panics_in_debug() { + let cell = DebugRefCell::new(0); + let _r = cell.borrow(); + let _w = cell.borrow_mut(); + } +} diff --git a/crates/stdext/src/lib.rs b/crates/stdext/src/lib.rs index 959f7fc34..a46598603 100644 --- a/crates/stdext/src/lib.rs +++ b/crates/stdext/src/lib.rs @@ -8,6 +8,7 @@ pub mod all; pub mod any; pub mod case; +pub mod cell; pub mod event; pub mod join; pub mod local; @@ -18,6 +19,7 @@ pub mod spawn; pub mod testing; pub mod unwrap; +pub use crate::cell::DebugRefCell; pub use crate::join::Joined; pub use crate::ok::Ok; pub use crate::push::Push;