Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0b02cb7
Pass Console refs explicitly to device context methods
lionel- Apr 16, 2026
9873dfa
Store comms in a RefCell
lionel- Apr 16, 2026
deef0d6
Don't pass whole `Console` to handlers after all
lionel- Apr 17, 2026
52d965b
Pass `session_mode` at construction time
lionel- Apr 17, 2026
122f9af
Remove dependency on Console for fallback execution context
lionel- Apr 17, 2026
fa00a32
Take `&self` in `comm_open_frontend()`
lionel- Apr 17, 2026
b36553c
Take `&self` in `comm_handle_msg()`
lionel- Apr 17, 2026
107fb4b
Use safer `with_` pattern
lionel- Apr 17, 2026
acbef24
Drain events during `CommMsg` and `CommClose`
lionel- Apr 17, 2026
3aa5924
Remove `CommEvent::Barrier`
lionel- Apr 17, 2026
5a81b28
Drain events during `CommOpen` too
lionel- Apr 17, 2026
11b7fc1
Reuse event drainer in execute request handler
lionel- Apr 17, 2026
7c7470b
Extract `handle_comm_notification()`
lionel- Apr 17, 2026
36975cd
Fix semantic conflicts after rebase
lionel- Apr 17, 2026
dff9df6
Fix UI comm reentrancy when sending events from R
lionel- Apr 17, 2026
7045131
Don't panic on borrow errors in release builds
lionel- Apr 17, 2026
16c7857
Fix potential deadlock in event draining
lionel- Apr 22, 2026
a726ae6
Drop refcell borrows before calling into R
lionel- Apr 22, 2026
9fb7b2e
Add `ark::register` macro to pass `&Console` and catch panics
lionel- Apr 22, 2026
a2f22e5
Fix unwraps
lionel- Apr 22, 2026
a3d077e
Fix a few style issues
lionel- Apr 22, 2026
a80ab84
Actually test for empty execution context
lionel- Apr 22, 2026
1025ccd
Add buffering receivers for display data
lionel- Apr 22, 2026
6030131
Let panics propagate in debug builds
lionel- Apr 24, 2026
fb41ba3
Log caught panics in release builds
lionel- Apr 24, 2026
9dfef7f
Fix `Console::with` panic catching by preventing abort in panic hook
lionel- Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ resolver = "2"
members = [
"crates/amalthea",
"crates/ark",
"crates/ark/ark_macros",
"crates/echo",
"crates/harp",
"crates/libr",
Expand All @@ -38,6 +39,7 @@ aether_syntax = { git = "https://github.com/posit-dev/air", package = "air_r_syn
amalthea = { path = "crates/amalthea" }
anyhow = "1.0.100"
ark = { path = "crates/ark" }
ark_macros = { path = "crates/ark/ark_macros" }
ark_test = { path = "crates/ark_test" }
assert_matches = "1.5.0"
async-trait = "0.1.66"
Expand Down
14 changes: 6 additions & 8 deletions crates/amalthea/src/comm/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,17 @@ use crate::socket::comm::CommSocket;

/// Comm events sent to the frontend via Shell.
pub enum CommEvent {
/// A new Comm was opened
Opened(CommSocket, Value),
/// A new Comm was opened. The optional `Sender` is a synchronisation
/// barrier: if provided, Shell signals it after processing the open
/// (sending `comm_open` on IOPub). The caller blocks on the paired
/// receiver to guarantee that the `comm_open` message has been sent
/// before any subsequent messages.
Opened(CommSocket, Value, Option<Sender<()>>),

/// A message was received on a Comm; the first value is the comm ID, and the
/// second value is the message.
Message(String, CommMsg),

/// A Comm was closed
Closed(String),

/// Synchronisation barrier. Shell signals the sender after processing all
/// preceding events. The caller blocks on the paired receiver to guarantee
/// that earlier events (e.g. `Opened`) have been fully handled before
/// continuing.
Barrier(Sender<()>),
}
55 changes: 30 additions & 25 deletions crates/amalthea/src/language/shell_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use crate::wire::kernel_info_request::KernelInfoRequest;
use crate::wire::originator::Originator;

/// Result of a `handle_comm_msg` or `handle_comm_close` call on the
/// `ShellHandler`. `Handled` means the kernel processed the message
/// synchronously (blocking Shell until done). `NotHandled` means amalthea
/// should fall back to the historical `incoming_tx` path. This fallback is
/// temporary until all comms are migrated to the blocking path.
/// `ShellHandler`. `Handled` means the kernel dispatched the message
/// (possibly asynchronously via a completion receiver). `NotHandled` means
/// amalthea should fall back to the historical `incoming_tx` path. This
/// fallback is temporary until all comms are migrated to the new path.
pub enum CommHandled {
Handled,
NotHandled,
Expand Down Expand Up @@ -81,26 +81,30 @@ pub trait ShellHandler: Send {
/// Docs: https://jupyter-client.readthedocs.io/en/stable/messaging.html#history
async fn handle_history_request(&self, req: &HistoryRequest) -> crate::Result<HistoryReply>;

/// Handles a request to open a comm.
/// Handle a request to open a comm.
///
/// https://jupyter-client.readthedocs.io/en/stable/messaging.html#opening-a-comm
/// Returns `(true, Some(receiver))` if the comm was opened and the handler
/// was dispatched asynchronously. Shell will select-loop on the receiver
/// and `comm_event_rx` to drain comm events while the handler runs.
///
/// Returns true if the handler handled the request (and opened the comm), false if it did not.
///
/// * `target` - The target name of the comm, such as `positron.variables`
/// * `comm` - The comm channel to use to communicate with the frontend
/// * `data` - The `data` payload from the `comm_open` message
async fn handle_comm_open(
&self,
/// Returns `(true, None)` if the comm was opened synchronously.
/// Returns `(false, None)` if the comm was not handled.
fn handle_comm_open(
&mut self,
target: Comm,
comm: CommSocket,
data: serde_json::Value,
) -> crate::Result<bool>;
) -> crate::Result<(bool, Option<Receiver<()>>)>;

/// Handle an incoming comm message (RPC or data). Return
/// `CommHandled::Handled` if the message was processed, or
/// `CommHandled::NotHandled` to fall back to the existing
/// `incoming_tx` path.
/// Handle an incoming comm message (RPC or data).
///
/// Returns `(CommHandled::Handled, Some(receiver))` if the message was
/// dispatched to the R thread. Shell will select-loop on the receiver
/// and `comm_event_rx` to drain comm events (e.g. barriers from
/// `comm_open_backend`) while the handler runs.
///
/// Returns `(CommHandled::NotHandled, None)` to fall back to the
/// existing `incoming_tx` path.
///
/// * `comm_id` - The comm's unique identifier
/// * `comm_name` - The comm's target name (e.g. `"positron.dataExplorer"`)
Expand All @@ -113,21 +117,22 @@ pub trait ShellHandler: Send {
_comm_name: &str,
_msg: CommMsg,
_originator: Originator,
) -> crate::Result<CommHandled> {
Ok(CommHandled::NotHandled)
) -> crate::Result<(CommHandled, Option<Receiver<()>>)> {
Ok((CommHandled::NotHandled, None))
}

/// Handle a comm close. Return `CommHandled::Handled` if the close
/// was processed, or `CommHandled::NotHandled` to fall back to the
/// existing `incoming_tx` path.
/// Handle a comm close.
///
/// Same pattern as `handle_comm_msg`: returns a completion receiver
/// so Shell can drain comm events while the handler runs.
///
/// * `comm_id` - The comm's unique identifier
/// * `comm_name` - The comm's target name
fn handle_comm_close(
&mut self,
_comm_id: &str,
_comm_name: &str,
) -> crate::Result<CommHandled> {
Ok(CommHandled::NotHandled)
) -> crate::Result<(CommHandled, Option<Receiver<()>>)> {
Ok((CommHandled::NotHandled, None))
}
}
Loading
Loading