From ed80c73b5c1cf0581cd54fbc2e2e7ae70f6eede1 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Sun, 19 Apr 2026 09:25:31 +0200 Subject: [PATCH 1/7] Abstract the DAP server --- crates/ark/src/dap/dap_server.rs | 1091 ++++++++++++++++-------------- 1 file changed, 586 insertions(+), 505 deletions(-) diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index ede5f58bf..6a4eb421e 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -58,264 +58,60 @@ const THREAD_ID: i64 = -1; /// evaluations will run in that frame's environment. const SELECTED_FRAME_EXPRESSION: &str = ".positron_selected_frame"; -// TODO: Handle comm close to shut down the DAP server thread. -// -// The DAP comm is allowed to persist across TCP sessions. This supports session -// switching on the frontend. Ideally the frontend would be allowed to close the -// DAP comm in addition to the DAP TCP connection, which would shut down the DAP -// server. To achive this, the DAP server, once disconnected should wait for both -// the connection becoming ready and a channel event signalling comm close. If -// the latter fires, shut the server down. - -pub fn start_dap( - state: Arc>, - server_start: ServerStartMessage, - server_started_tx: Sender, - r_request_tx: Sender, - comm_tx: CommOutgoingTx, -) { - let ip_address = server_start.ip_address(); - - // Binding to port `0` to allow the OS to allocate a port for us to bind to - let listener = TcpListener::bind(format!("{ip_address}:0",)).unwrap(); - - let address = match listener.local_addr() { - Ok(address) => address, - Err(error) => { - log::error!("DAP: Failed to bind to {ip_address}:0: {error}"); - return; - }, - }; - - // Get the OS allocated port - let port = address.port(); - - log::trace!("DAP: Thread starting at address {ip_address}:{port}."); - - // Send the port back to `Shell` and eventually out to the frontend so it can connect - server_started_tx - .send(ServerStartedMessage::new(port)) - .log_err(); - - loop { - log::trace!("DAP: Waiting for client"); - - let stream = match listener.accept() { - Ok((stream, addr)) => { - log::info!("DAP: Connected to client {addr:?}"); - - let mut state = state.lock().unwrap(); - state.is_connected = true; - - stream - }, - Err(e) => { - log::error!("DAP: Can't get client: {e:?}"); - continue; - }, - }; - - let reader = BufReader::new(&stream); - let writer = BufWriter::new(&stream); - let (responses_tx, responses_rx) = unbounded::(); - - let mut server = DapServer::new( - reader, - writer, - state.clone(), - r_request_tx.clone(), - comm_tx.clone(), - responses_tx, - ); - - let (backend_events_tx, backend_events_rx) = unbounded::(); - let (done_tx, done_rx) = bounded::(0); - let output_clone = server.output.clone(); - - // We need a scope to let the borrow checker know that - // `output_clone` drops before the next iteration (it gets tangled - // to the stack variable `stream` through `server`) - let _ = crossbeam::thread::scope(|scope| { - spawn!(scope, "ark-dap-events", { - move |_| listen_dap_events(output_clone, backend_events_rx, responses_rx, done_rx) - }); - - // Connect the backend to the events thread - state.lock().unwrap().backend_events_tx = Some(backend_events_tx); - - loop { - // If disconnected, break and accept a new connection to create a new server - if !server.serve() { - log::trace!("DAP: Disconnected from client"); - let mut state = state.lock().unwrap(); - state.is_connected = false; - break; - } - } - - // Terminate the events thread - let _ = done_tx.send(true); - }); - } +/// Side effect requested by a DAP handler that the transport layer must +/// deliver in a transport-specific way. +pub enum DapSideEffect { + /// Send a debug step/quit command to R. + DebugCommand(DebugRequest), + /// Interrupt R for a pause. + Interrupt, + /// Request a session restart. + Restart, } -// Thread that listens for events sent by the backend, usually the -// `ReadConsole()` method. These are forwarded to the DAP client. -fn listen_dap_events( - output: Arc>>, - backend_events_rx: Receiver, - responses_rx: Receiver, - done_rx: Receiver, -) { - loop { - select!( - recv(backend_events_rx) -> event => { - let event = match event { - Ok(event) => event, - Err(err) => { - // Channel closed, sender dropped - log::info!("DAP: Event channel closed: {err:?}"); - return; - }, - }; - - log::trace!("DAP: Got event from backend: {:?}", event); - - let event = match event { - DapBackendEvent::Continued => { - Event::Continued(ContinuedEventBody { - thread_id: THREAD_ID, - all_threads_continued: Some(true) - }) - }, - - DapBackendEvent::Stopped => { - Event::Stopped(StoppedEventBody { - reason: StoppedEventReason::Step, - description: None, - thread_id: Some(THREAD_ID), - preserve_focus_hint: Some(false), - text: None, - all_threads_stopped: Some(true), - hit_breakpoint_ids: None, - }) - }, - - DapBackendEvent::Exception(DapExceptionEvent { class, message }) => { - let text = format!("<{class}>\n{message}"); - Event::Stopped(StoppedEventBody { - reason: StoppedEventReason::Exception, - description: Some(message), - thread_id: Some(THREAD_ID), - preserve_focus_hint: Some(false), - text: Some(text), - all_threads_stopped: Some(true), - hit_breakpoint_ids: None, - }) - }, - - DapBackendEvent::Invalidated => { - Event::Invalidated(InvalidatedEventBody { - areas: Some(vec![types::InvalidatedAreas::Variables]), - thread_id: Some(THREAD_ID), - stack_frame_id: None, - }) - }, - - DapBackendEvent::Terminated => { - Event::Terminated(None) - }, - - DapBackendEvent::BreakpointState { id, line, verified, message } => { - Event::Breakpoint(BreakpointEventBody { - reason: BreakpointEventReason::Changed, - breakpoint: dap::types::Breakpoint { - id: Some(id), - line: Some(Breakpoint::to_dap_line(line)), - verified, - message, - ..Default::default() - }, - }) - }, - }; - - let mut output = output.lock().unwrap(); - if let Err(err) = output.send_event(event) { - log::warn!("DAP: Failed to send event, closing: {err:?}"); - return; - } - }, - - recv(responses_rx) -> response => { - let response = match response { - Ok(response) => response, - Err(err) => { - log::info!("DAP: Responses channel closed: {err:?}"); - return; - }, - }; - - log::trace!("DAP: Sending async response: {:?}", response); - - let mut output = output.lock().unwrap(); - if let Err(err) = output.send(Sendable::Response(response)) { - log::warn!("DAP: Failed to send response, closing: {err:?}"); - return; - } - }, +/// The result of handling a single DAP request. The transport layer +/// (TCP or Jupyter) is responsible for delivering the response, events, +/// and side effects through the appropriate channel. +pub struct DapOutput { + pub response: Response, + pub events: Vec, + pub side_effects: Vec, +} - // Break the loop and terminate the thread - recv(done_rx) -> _ => { return; }, - ) +impl DapOutput { + pub fn response(response: Response) -> Self { + Self { + response, + events: vec![], + side_effects: vec![], + } } } -pub struct DapServer { - server: Server, - pub output: Arc>>, - state: Arc>, - r_request_tx: Sender, - comm_tx: Option, - responses_tx: Sender, +/// Transport-agnostic handler for DAP requests. Translates each request +/// into a [`DapOutput`] containing the protocol response, any DAP events, +/// and console events that the transport layer is responsible for delivering. +pub struct DapHandler { + pub(crate) state: Arc>, + pub(crate) r_request_tx: Sender, } -impl DapServer { - pub fn new( - reader: BufReader, - writer: BufWriter, - state: Arc>, - r_request_tx: Sender, - comm_tx: CommOutgoingTx, - responses_tx: Sender, - ) -> Self { - let server = Server::new(reader, writer); - let output = server.output.clone(); +impl DapHandler { + pub fn new(state: Arc>, r_request_tx: Sender) -> Self { Self { - server, - output, state, r_request_tx, - comm_tx: Some(comm_tx), - responses_tx, } } - pub fn serve(&mut self) -> bool { - log::trace!("DAP: Polling"); - let req = match self.server.poll_request() { - Ok(Some(req)) => req, - Ok(None) => return false, - Err(err) => { - log::warn!("DAP: Connection closed: {err:?}"); - return false; - }, - }; - log::trace!("DAP: Got request: {:#?}", req); - + /// Dispatch a parsed DAP request to the appropriate handler. + /// + /// `Evaluate` is intentionally not handled here because it requires + /// transport-specific async response delivery. + pub fn dispatch(&self, req: Request) -> DapOutput { let cmd = req.command.clone(); - let result = match cmd { + match cmd { Command::Initialize(args) => self.handle_initialize(req, args), Command::Attach(args) => self.handle_attach(req, args), Command::Disconnect(args) => self.handle_disconnect(req, args), @@ -329,7 +125,6 @@ impl DapServer { Command::Source(args) => self.handle_source(req, args), Command::Scopes(args) => self.handle_scopes(req, args), Command::Variables(args) => self.handle_variables(req, args), - Command::Evaluate(args) => self.handle_evaluate(req, args), Command::Continue(args) => { let resp = ResponseBody::Continue(ContinueResponse { all_threads_continued: Some(true), @@ -348,34 +143,12 @@ impl DapServer { Command::Pause(args) => self.handle_pause(req, args), _ => { log::warn!("DAP: Unknown request"); - let rsp = req.error("Ark DAP: Unknown request"); - self.respond(rsp) + DapOutput::response(req.error("Ark DAP: Unknown request")) }, - }; - - if let Err(err) = result { - log::warn!("DAP: Handler failed, closing connection: {err:?}"); - return false; } - - true - } - - fn respond(&mut self, rsp: Response) -> Result<(), ServerError> { - log::trace!("DAP: Responding to request: {rsp:#?}"); - self.server.respond(rsp) } - fn send_event(&mut self, event: Event) -> Result<(), ServerError> { - log::trace!("DAP: Sending event: {event:#?}"); - self.server.send_event(event) - } - - fn handle_initialize( - &mut self, - req: Request, - _args: InitializeArguments, - ) -> Result<(), ServerError> { + fn handle_initialize(&self, req: Request, _args: InitializeArguments) -> DapOutput { let rsp = req.success(ResponseBody::Initialize(types::Capabilities { supports_restart_request: Some(true), supports_exception_info_request: Some(false), @@ -411,8 +184,11 @@ impl DapServer { supports_log_points: Some(true), ..Default::default() })); - self.respond(rsp)?; - self.send_event(Event::Initialized) + DapOutput { + response: rsp, + events: vec![Event::Initialized], + side_effects: vec![], + } } // Handle SetBreakpoints requests from the frontend. @@ -430,15 +206,11 @@ impl DapServer { // - When a user unchecks a breakpoint, it appears as a deletion (omitted // from the request). We preserve verified breakpoints as Disabled so we // can restore their state when re-enabled without requiring re-sourcing. - fn handle_set_breakpoints( - &mut self, - req: Request, - args: SetBreakpointsArguments, - ) -> Result<(), ServerError> { + fn handle_set_breakpoints(&self, req: Request, args: SetBreakpointsArguments) -> DapOutput { let Some(path) = args.source.path.as_ref() else { // We don't currently have virtual documents managed via source references log::warn!("Missing a path to set breakpoints for."); - return self.respond(req.error("Missing a path to set breakpoints for")); + return DapOutput::response(req.error("Missing a path to set breakpoints for")); }; // We currently only support "path" URIs as Positron never sends URIs. @@ -451,7 +223,7 @@ impl DapServer { let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { breakpoints: vec![], })); - return self.respond(rsp); + return DapOutput::response(rsp); }, }; @@ -480,7 +252,7 @@ impl DapServer { let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { breakpoints, })); - return self.respond(rsp); + return DapOutput::response(rsp); }, }; @@ -623,194 +395,567 @@ impl DapServer { message, ..Default::default() } - }) - .collect(); + }) + .collect(); + + state.breakpoints.insert(uri, (doc_hash, new_breakpoints)); + + drop(state); + + let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { + breakpoints: response_breakpoints, + })); + + DapOutput::response(rsp) + } + + fn handle_attach(&self, req: Request, _args: AttachRequestArguments) -> DapOutput { + let rsp = req.success(ResponseBody::Attach); + DapOutput { + response: rsp, + events: vec![Event::Thread(ThreadEventBody { + reason: ThreadEventReason::Started, + thread_id: THREAD_ID, + })], + side_effects: vec![], + } + } + + fn handle_disconnect(&self, req: Request, _args: DisconnectArguments) -> DapOutput { + // Only send `Q` if currently in a debugging session. + let is_debugging = { self.state.lock().unwrap().is_debugging }; + let side_effects = if is_debugging { + vec![DapSideEffect::DebugCommand(DebugRequest::Quit)] + } else { + vec![] + }; + + DapOutput { + response: req.success(ResponseBody::Disconnect), + events: vec![], + side_effects, + } + } + + fn handle_restart(&self, req: Request, _args: T) -> DapOutput { + DapOutput { + response: req.success(ResponseBody::Restart), + events: vec![], + side_effects: vec![DapSideEffect::Restart], + } + } + + // All servers must respond to `Threads` requests, possibly with + // a dummy thread as is the case here + fn handle_threads(&self, req: Request) -> DapOutput { + let rsp = req.success(ResponseBody::Threads(ThreadsResponse { + threads: vec![Thread { + id: THREAD_ID, + name: String::from("R console"), + }], + })); + DapOutput::response(rsp) + } + + fn handle_set_exception_breakpoints( + &self, + req: Request, + args: SetExceptionBreakpointsArguments, + ) -> DapOutput { + { + let mut state = self.state.lock().unwrap(); + state.exception_breakpoint_filters = args.filters; + } + let rsp = req.success(ResponseBody::SetExceptionBreakpoints( + SetExceptionBreakpointsResponse { + // This field is only useful for reporting problems with + // individual filters. Since we always accept all filters, + // `None` is fine here. + breakpoints: None, + }, + )); + DapOutput::response(rsp) + } + + fn handle_stacktrace(&self, req: Request, args: StackTraceArguments) -> DapOutput { + let stack = { + let state = self.state.lock().unwrap(); + let fallback_sources = &state.fallback_sources; + match &state.stack { + Some(stack) => stack + .iter() + .map(|frame| into_dap_frame(frame, fallback_sources)) + .collect(), + _ => vec![], + } + }; + + // Slice the stack as requested + let n_usize = stack.len(); + + let start_frame = args.start_frame.unwrap_or(0); + let Ok(start) = usize::try_from(start_frame) else { + let rsp = req.error(&format!("Invalid start_frame: {start_frame}")); + return DapOutput::response(rsp); + }; + let start = std::cmp::min(start, n_usize); + + let end = if let Some(levels) = args.levels { + let Ok(levels) = usize::try_from(levels) else { + let rsp = req.error(&format!("Invalid levels: {levels}")); + return DapOutput::response(rsp); + }; + std::cmp::min(start.saturating_add(levels), n_usize) + } else { + n_usize + }; + + let Ok(total_frames) = i64::try_from(n_usize) else { + let rsp = req.error(&format!("Stack frame count overflows i64: {n_usize}")); + return DapOutput::response(rsp); + }; + let stack = stack[start..end].to_vec(); + + let rsp = req.success(ResponseBody::StackTrace(StackTraceResponse { + stack_frames: stack, + total_frames: Some(total_frames), + })); + + DapOutput::response(rsp) + } + + fn handle_source(&self, req: Request, _args: SourceArguments) -> DapOutput { + let message = "Unsupported `source` request: {req:?}"; + log::error!("{message}"); + DapOutput::response(req.error(message)) + } + + fn handle_scopes(&self, req: Request, args: ScopesArguments) -> DapOutput { + let state = self.state.lock().unwrap(); + let frame_id_to_variables_reference = &state.frame_id_to_variables_reference; + + // Entirely possible that the requested `frame_id` doesn't have any + // variables (like the top most frame where the call was made). We send + // back `0` in those cases, which is an indication of "no variables". + let variables_reference = frame_id_to_variables_reference + .get(&args.frame_id) + .copied() + .unwrap_or(0); + + // Only 1 overarching scope for now + let scopes = vec![Scope { + name: String::from("Locals"), + presentation_hint: Some(ScopePresentationhint::Locals), + variables_reference, + named_variables: None, + indexed_variables: None, + expensive: false, + source: None, + line: None, + column: None, + end_line: None, + end_column: None, + }]; + + let rsp = req.success(ResponseBody::Scopes(ScopesResponse { scopes })); + + drop(state); + DapOutput::response(rsp) + } + + fn handle_variables(&self, req: Request, args: VariablesArguments) -> DapOutput { + let variables_reference = args.variables_reference; + let variables = self.collect_r_variables(variables_reference); + let variables = self.make_variables(variables); + let rsp = req.success(ResponseBody::Variables(VariablesResponse { variables })); + DapOutput::response(rsp) + } + + fn collect_r_variables(&self, variables_reference: i64) -> Vec { + // Wait until we're in the `r_task()` to lock + // See https://github.com/posit-dev/positron/issues/5024 + let state = self.state.clone(); + + let variables = r_task(move || { + let state = state.lock().unwrap(); + let variables_reference_to_r_object = &state.variables_reference_to_r_object; + + let Some(object) = variables_reference_to_r_object.get(&variables_reference) else { + log::error!( + "Failed to locate R object for `variables_reference` {variables_reference}." + ); + return Vec::new(); + }; + + let object = object.get(); + object_variables(object.sexp) + }); + + variables + } + + fn make_variables(&self, variables: Vec) -> Vec { + self.state.lock().unwrap().make_variables(variables) + } + + fn handle_step( + &self, + req: Request, + _args: A, + cmd: DebugRequest, + resp: ResponseBody, + ) -> DapOutput { + DapOutput { + response: req.success(resp), + events: vec![], + side_effects: vec![DapSideEffect::DebugCommand(cmd)], + } + } + + fn handle_pause(&self, req: Request, _args: PauseArguments) -> DapOutput { + self.state.lock().unwrap().is_interrupting_for_debugger = true; + + log::info!("DAP: Received request to pause R, sending interrupt"); + + DapOutput { + response: req.success(ResponseBody::Pause), + events: vec![], + side_effects: vec![DapSideEffect::Interrupt], + } + } +} + +// TODO: Handle comm close to shut down the DAP server thread. +// +// The DAP comm is allowed to persist across TCP sessions. This supports session +// switching on the frontend. Ideally the frontend would be allowed to close the +// DAP comm in addition to the DAP TCP connection, which would shut down the DAP +// server. To achive this, the DAP server, once disconnected should wait for both +// the connection becoming ready and a channel event signalling comm close. If +// the latter fires, shut the server down. + +pub fn start_dap( + state: Arc>, + server_start: ServerStartMessage, + server_started_tx: Sender, + r_request_tx: Sender, + comm_tx: CommOutgoingTx, +) { + let ip_address = server_start.ip_address(); + + // Binding to port `0` to allow the OS to allocate a port for us to bind to + let listener = TcpListener::bind(format!("{ip_address}:0",)).unwrap(); + + let address = match listener.local_addr() { + Ok(address) => address, + Err(error) => { + log::error!("DAP: Failed to bind to {ip_address}:0: {error}"); + return; + }, + }; + + // Get the OS allocated port + let port = address.port(); + + log::trace!("DAP: Thread starting at address {ip_address}:{port}."); + + // Send the port back to `Shell` and eventually out to the frontend so it can connect + server_started_tx + .send(ServerStartedMessage::new(port)) + .log_err(); + + loop { + log::trace!("DAP: Waiting for client"); + + let stream = match listener.accept() { + Ok((stream, addr)) => { + log::info!("DAP: Connected to client {addr:?}"); + + let mut state = state.lock().unwrap(); + state.is_connected = true; + + stream + }, + Err(e) => { + log::error!("DAP: Can't get client: {e:?}"); + continue; + }, + }; + + let reader = BufReader::new(&stream); + let writer = BufWriter::new(&stream); + let (responses_tx, responses_rx) = unbounded::(); + + let mut server = DapServer::new( + reader, + writer, + state.clone(), + r_request_tx.clone(), + comm_tx.clone(), + responses_tx, + ); + + let (backend_events_tx, backend_events_rx) = unbounded::(); + let (done_tx, done_rx) = bounded::(0); + let output_clone = server.output.clone(); + + // We need a scope to let the borrow checker know that + // `output_clone` drops before the next iteration (it gets tangled + // to the stack variable `stream` through `server`) + let _ = crossbeam::thread::scope(|scope| { + spawn!(scope, "ark-dap-events", { + move |_| listen_dap_events(output_clone, backend_events_rx, responses_rx, done_rx) + }); + + // Connect the backend to the events thread + state.lock().unwrap().backend_events_tx = Some(backend_events_tx); + + loop { + // If disconnected, break and accept a new connection to create a new server + if !server.serve() { + log::trace!("DAP: Disconnected from client"); + let mut state = state.lock().unwrap(); + state.is_connected = false; + break; + } + } + + // Terminate the events thread + let _ = done_tx.send(true); + }); + } +} + +// Thread that listens for events sent by the backend, usually the +// `ReadConsole()` method. These are forwarded to the DAP client. +fn listen_dap_events( + output: Arc>>, + backend_events_rx: Receiver, + responses_rx: Receiver, + done_rx: Receiver, +) { + loop { + select!( + recv(backend_events_rx) -> event => { + let event = match event { + Ok(event) => event, + Err(err) => { + // Channel closed, sender dropped + log::info!("DAP: Event channel closed: {err:?}"); + return; + }, + }; + + log::trace!("DAP: Got event from backend: {:?}", event); + + let event = match event { + DapBackendEvent::Continued => { + Event::Continued(ContinuedEventBody { + thread_id: THREAD_ID, + all_threads_continued: Some(true) + }) + }, + + DapBackendEvent::Stopped => { + Event::Stopped(StoppedEventBody { + reason: StoppedEventReason::Step, + description: None, + thread_id: Some(THREAD_ID), + preserve_focus_hint: Some(false), + text: None, + all_threads_stopped: Some(true), + hit_breakpoint_ids: None, + }) + }, + + DapBackendEvent::Exception(DapExceptionEvent { class, message }) => { + let text = format!("<{class}>\n{message}"); + Event::Stopped(StoppedEventBody { + reason: StoppedEventReason::Exception, + description: Some(message), + thread_id: Some(THREAD_ID), + preserve_focus_hint: Some(false), + text: Some(text), + all_threads_stopped: Some(true), + hit_breakpoint_ids: None, + }) + }, + + DapBackendEvent::Invalidated => { + Event::Invalidated(InvalidatedEventBody { + areas: Some(vec![types::InvalidatedAreas::Variables]), + thread_id: Some(THREAD_ID), + stack_frame_id: None, + }) + }, + + DapBackendEvent::Terminated => { + Event::Terminated(None) + }, + + DapBackendEvent::BreakpointState { id, line, verified, message } => { + Event::Breakpoint(BreakpointEventBody { + reason: BreakpointEventReason::Changed, + breakpoint: dap::types::Breakpoint { + id: Some(id), + line: Some(Breakpoint::to_dap_line(line)), + verified, + message, + ..Default::default() + }, + }) + }, + }; + + let mut output = output.lock().unwrap(); + if let Err(err) = output.send_event(event) { + log::warn!("DAP: Failed to send event, closing: {err:?}"); + return; + } + }, - state.breakpoints.insert(uri, (doc_hash, new_breakpoints)); + recv(responses_rx) -> response => { + let response = match response { + Ok(response) => response, + Err(err) => { + log::info!("DAP: Responses channel closed: {err:?}"); + return; + }, + }; - drop(state); + log::trace!("DAP: Sending async response: {:?}", response); - let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { - breakpoints: response_breakpoints, - })); + let mut output = output.lock().unwrap(); + if let Err(err) = output.send(Sendable::Response(response)) { + log::warn!("DAP: Failed to send response, closing: {err:?}"); + return; + } + }, - self.respond(rsp) + // Break the loop and terminate the thread + recv(done_rx) -> _ => { return; }, + ) } +} - fn handle_attach( - &mut self, - req: Request, - _args: AttachRequestArguments, - ) -> Result<(), ServerError> { - let rsp = req.success(ResponseBody::Attach); - self.respond(rsp)?; - - self.send_event(Event::Thread(ThreadEventBody { - reason: ThreadEventReason::Started, - thread_id: THREAD_ID, - })) - } +pub struct DapServer { + server: Server, + pub output: Arc>>, + pub(crate) handler: DapHandler, + comm_tx: Option, + responses_tx: Sender, +} - fn handle_disconnect( - &mut self, - req: Request, - _args: DisconnectArguments, - ) -> Result<(), ServerError> { - // Only send `Q` if currently in a debugging session. - let is_debugging = { self.state.lock().unwrap().is_debugging }; - if is_debugging { - self.send_command(DebugRequest::Quit); +impl DapServer { + pub fn new( + reader: BufReader, + writer: BufWriter, + state: Arc>, + r_request_tx: Sender, + comm_tx: CommOutgoingTx, + responses_tx: Sender, + ) -> Self { + let server = Server::new(reader, writer); + let output = server.output.clone(); + let handler = DapHandler::new(state, r_request_tx); + Self { + server, + output, + handler, + comm_tx: Some(comm_tx), + responses_tx, } - - let rsp = req.success(ResponseBody::Disconnect); - self.respond(rsp) } - fn handle_restart(&mut self, req: Request, _args: T) -> Result<(), ServerError> { - // If connected to Positron, forward the restart command to the - // frontend. Otherwise ignore it. - if let Some(tx) = &self.comm_tx { - let msg = amalthea::comm_rpc_message!("restart"); - tx.send(msg).log_err(); - } + pub fn serve(&mut self) -> bool { + log::trace!("DAP: Polling"); + let req = match self.server.poll_request() { + Ok(Some(req)) => req, + Ok(None) => return false, + Err(err) => { + log::warn!("DAP: Connection closed: {err:?}"); + return false; + }, + }; + log::trace!("DAP: Got request: {:#?}", req); - let rsp = req.success(ResponseBody::Restart); - self.respond(rsp) - } + // Evaluate is async: the response is sent later via `responses_tx`. + // It is the only command that needs transport-specific handling. + if matches!(&req.command, Command::Evaluate(_)) { + let Command::Evaluate(args) = req.command.clone() else { + unreachable!() + }; + if let Err(err) = self.handle_evaluate(req, args) { + log::warn!("DAP: Handler failed, closing connection: {err:?}"); + return false; + } + return true; + } - // All servers must respond to `Threads` requests, possibly with - // a dummy thread as is the case here - fn handle_threads(&mut self, req: Request) -> Result<(), ServerError> { - let rsp = req.success(ResponseBody::Threads(ThreadsResponse { - threads: vec![Thread { - id: THREAD_ID, - name: String::from("R console"), - }], - })); - self.respond(rsp) + let output = self.handler.dispatch(req); + self.deliver(output) } - fn handle_set_exception_breakpoints( - &mut self, - req: Request, - args: SetExceptionBreakpointsArguments, - ) -> Result<(), ServerError> { - { - let mut state = self.state.lock().unwrap(); - state.exception_breakpoint_filters = args.filters; + fn deliver(&mut self, output: DapOutput) -> bool { + if let Err(err) = self.respond(output.response) { + log::warn!("DAP: Failed to send response: {err:?}"); + return false; } - let rsp = req.success(ResponseBody::SetExceptionBreakpoints( - SetExceptionBreakpointsResponse { - // This field is only useful for reporting problems with - // individual filters. Since we always accept all filters, - // `None` is fine here. - breakpoints: None, - }, - )); - self.respond(rsp) - } - fn handle_stacktrace( - &mut self, - req: Request, - args: StackTraceArguments, - ) -> Result<(), ServerError> { - let stack = { - let state = self.state.lock().unwrap(); - let fallback_sources = &state.fallback_sources; - match &state.stack { - Some(stack) => stack - .iter() - .map(|frame| into_dap_frame(frame, fallback_sources)) - .collect(), - _ => vec![], + for event in output.events { + if let Err(err) = self.send_event(event) { + log::warn!("DAP: Failed to send event: {err:?}"); + return false; } - }; - - // Slice the stack as requested - let n_usize = stack.len(); - - let start_frame = args.start_frame.unwrap_or(0); - let Ok(start) = usize::try_from(start_frame) else { - let rsp = req.error(&format!("Invalid start_frame: {start_frame}")); - return self.respond(rsp); - }; - let start = std::cmp::min(start, n_usize); - - let end = if let Some(levels) = args.levels { - let Ok(levels) = usize::try_from(levels) else { - let rsp = req.error(&format!("Invalid levels: {levels}")); - return self.respond(rsp); - }; - std::cmp::min(start.saturating_add(levels), n_usize) - } else { - n_usize - }; - - let Ok(total_frames) = i64::try_from(n_usize) else { - let rsp = req.error(&format!("Stack frame count overflows i64: {n_usize}")); - return self.respond(rsp); - }; - let stack = stack[start..end].to_vec(); + } - let rsp = req.success(ResponseBody::StackTrace(StackTraceResponse { - stack_frames: stack, - total_frames: Some(total_frames), - })); + for effect in output.side_effects { + self.handle_side_effect(effect); + } - self.respond(rsp) + true } - fn handle_source(&mut self, req: Request, _args: SourceArguments) -> Result<(), ServerError> { - let message = "Unsupported `source` request: {req:?}"; - log::error!("{message}"); - let rsp = req.error(message); - self.respond(rsp) + fn handle_side_effect(&mut self, effect: DapSideEffect) { + match effect { + DapSideEffect::DebugCommand(cmd) => { + if let Some(tx) = &self.comm_tx { + // If we have a comm channel (always the case as of this + // writing) we are connected to Positron or similar. Send + // control events so that the IDE can execute these as if they + // were sent by the user. This ensures prompts are updated. + let msg = amalthea::comm_rpc_message!( + "execute", + command = debug_request_command(cmd) + ); + tx.send(msg).log_err(); + } else { + self.handler + .r_request_tx + .send(RRequest::DebugCommand(cmd)) + .log_err(); + } + }, + DapSideEffect::Interrupt => { + crate::sys::control::handle_interrupt_request(); + }, + DapSideEffect::Restart => { + if let Some(tx) = &self.comm_tx { + let msg = amalthea::comm_rpc_message!("restart"); + tx.send(msg).log_err(); + } + }, + } } - fn handle_scopes(&mut self, req: Request, args: ScopesArguments) -> Result<(), ServerError> { - let state = self.state.lock().unwrap(); - let frame_id_to_variables_reference = &state.frame_id_to_variables_reference; - - // Entirely possible that the requested `frame_id` doesn't have any - // variables (like the top most frame where the call was made). We send - // back `0` in those cases, which is an indication of "no variables". - let variables_reference = frame_id_to_variables_reference - .get(&args.frame_id) - .copied() - .unwrap_or(0); - - // Only 1 overarching scope for now - let scopes = vec![Scope { - name: String::from("Locals"), - presentation_hint: Some(ScopePresentationhint::Locals), - variables_reference, - named_variables: None, - indexed_variables: None, - expensive: false, - source: None, - line: None, - column: None, - end_line: None, - end_column: None, - }]; - - let rsp = req.success(ResponseBody::Scopes(ScopesResponse { scopes })); - - drop(state); - self.respond(rsp) + fn respond(&mut self, rsp: Response) -> Result<(), ServerError> { + log::trace!("DAP: Responding to request: {rsp:#?}"); + self.server.respond(rsp) } - fn handle_variables( - &mut self, - req: Request, - args: VariablesArguments, - ) -> Result<(), ServerError> { - let variables_reference = args.variables_reference; - let variables = self.collect_r_variables(variables_reference); - let variables = self.make_variables(variables); - let rsp = req.success(ResponseBody::Variables(VariablesResponse { variables })); - self.respond(rsp) + fn send_event(&mut self, event: Event) -> Result<(), ServerError> { + log::trace!("DAP: Sending event: {event:#?}"); + self.server.send_event(event) } fn handle_evaluate( @@ -820,7 +965,7 @@ impl DapServer { ) -> Result<(), ServerError> { let expression = args.expression; let frame_id = args.frame_id; - let state = self.state.clone(); + let state = self.handler.state.clone(); let responses_tx = self.responses_tx.clone(); log::trace!("DAP: Spawning idle task for evaluate"); @@ -864,70 +1009,6 @@ impl DapServer { Ok(()) } - - fn collect_r_variables(&self, variables_reference: i64) -> Vec { - // Wait until we're in the `r_task()` to lock - // See https://github.com/posit-dev/positron/issues/5024 - let state = self.state.clone(); - - let variables = r_task(move || { - let state = state.lock().unwrap(); - let variables_reference_to_r_object = &state.variables_reference_to_r_object; - - let Some(object) = variables_reference_to_r_object.get(&variables_reference) else { - log::error!( - "Failed to locate R object for `variables_reference` {variables_reference}." - ); - return Vec::new(); - }; - - let object = object.get(); - object_variables(object.sexp) - }); - - variables - } - - fn make_variables(&self, variables: Vec) -> Vec { - self.state.lock().unwrap().make_variables(variables) - } - - fn handle_step( - &mut self, - req: Request, - _args: A, - cmd: DebugRequest, - resp: ResponseBody, - ) -> Result<(), ServerError> { - self.send_command(cmd); - let rsp = req.success(resp); - self.respond(rsp) - } - - fn handle_pause(&mut self, req: Request, _args: PauseArguments) -> Result<(), ServerError> { - self.state.lock().unwrap().is_interrupting_for_debugger = true; - - log::info!("DAP: Received request to pause R, sending interrupt"); - crate::sys::control::handle_interrupt_request(); - - let rsp = req.success(ResponseBody::Pause); - self.respond(rsp) - } - - fn send_command(&mut self, cmd: DebugRequest) { - if let Some(tx) = &self.comm_tx { - // If we have a comm channel (always the case as of this - // writing) we are connected to Positron or similar. Send - // control events so that the IDE can execute these as if they - // were sent by the user. This ensures prompts are updated. - let msg = amalthea::comm_rpc_message!("execute", command = debug_request_command(cmd)); - - tx.send(msg).log_err(); - } else { - // Otherwise, send command to R's `ReadConsole()` frontend method - self.r_request_tx.send(RRequest::DebugCommand(cmd)).unwrap(); - } - } } fn into_dap_frame(frame: &FrameInfo, fallback_sources: &HashMap) -> StackFrame { From abe5e88cf397f01822f7a540308c6fc5f0b32eae Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 23 Apr 2026 08:10:41 +0200 Subject: [PATCH 2/7] Rename `DapSideEffect` to `DapConsoleEvent` --- crates/ark/src/dap/dap_server.rs | 71 ++++++++++++++++---------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index 6a4eb421e..df2fdd49b 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -58,32 +58,33 @@ const THREAD_ID: i64 = -1; /// evaluations will run in that frame's environment. const SELECTED_FRAME_EXPRESSION: &str = ".positron_selected_frame"; -/// Side effect requested by a DAP handler that the transport layer must -/// deliver in a transport-specific way. -pub enum DapSideEffect { - /// Send a debug step/quit command to R. +/// Events for the Console requested by a DAP handler. Either delivered by a +/// round-trip through the frontend (so users see the command) or directly to +/// the Console. +pub enum DapConsoleEvent { + /// Send a debug step/quit command to R via the frontend. DebugCommand(DebugRequest), - /// Interrupt R for a pause. + /// Interrupt R for a pause. No frontend round-trip. Interrupt, - /// Request a session restart. + /// Request a session restart via the frontend. Restart, } -/// The result of handling a single DAP request. The transport layer -/// (TCP or Jupyter) is responsible for delivering the response, events, -/// and side effects through the appropriate channel. +/// The result of handling a single DAP request. The transport layer (TCP or +/// Jupyter) is responsible for delivering the response, DAP events, and console +/// events through the appropriate channel. pub struct DapOutput { pub response: Response, - pub events: Vec, - pub side_effects: Vec, + pub dap_events: Vec, + pub console_events: Vec, } impl DapOutput { pub fn response(response: Response) -> Self { Self { response, - events: vec![], - side_effects: vec![], + dap_events: vec![], + console_events: vec![], } } } @@ -186,8 +187,8 @@ impl DapHandler { })); DapOutput { response: rsp, - events: vec![Event::Initialized], - side_effects: vec![], + dap_events: vec![Event::Initialized], + console_events: vec![], } } @@ -413,35 +414,35 @@ impl DapHandler { let rsp = req.success(ResponseBody::Attach); DapOutput { response: rsp, - events: vec![Event::Thread(ThreadEventBody { + dap_events: vec![Event::Thread(ThreadEventBody { reason: ThreadEventReason::Started, thread_id: THREAD_ID, })], - side_effects: vec![], + console_events: vec![], } } fn handle_disconnect(&self, req: Request, _args: DisconnectArguments) -> DapOutput { // Only send `Q` if currently in a debugging session. let is_debugging = { self.state.lock().unwrap().is_debugging }; - let side_effects = if is_debugging { - vec![DapSideEffect::DebugCommand(DebugRequest::Quit)] + let console_events = if is_debugging { + vec![DapConsoleEvent::DebugCommand(DebugRequest::Quit)] } else { vec![] }; DapOutput { response: req.success(ResponseBody::Disconnect), - events: vec![], - side_effects, + dap_events: vec![], + console_events, } } fn handle_restart(&self, req: Request, _args: T) -> DapOutput { DapOutput { response: req.success(ResponseBody::Restart), - events: vec![], - side_effects: vec![DapSideEffect::Restart], + dap_events: vec![], + console_events: vec![DapConsoleEvent::Restart], } } @@ -607,8 +608,8 @@ impl DapHandler { ) -> DapOutput { DapOutput { response: req.success(resp), - events: vec![], - side_effects: vec![DapSideEffect::DebugCommand(cmd)], + dap_events: vec![], + console_events: vec![DapConsoleEvent::DebugCommand(cmd)], } } @@ -619,8 +620,8 @@ impl DapHandler { DapOutput { response: req.success(ResponseBody::Pause), - events: vec![], - side_effects: vec![DapSideEffect::Interrupt], + dap_events: vec![], + console_events: vec![DapConsoleEvent::Interrupt], } } } @@ -902,23 +903,23 @@ impl DapServer { return false; } - for event in output.events { + for event in output.dap_events { if let Err(err) = self.send_event(event) { log::warn!("DAP: Failed to send event: {err:?}"); return false; } } - for effect in output.side_effects { - self.handle_side_effect(effect); + for event in output.console_events { + self.handle_console_event(event); } true } - fn handle_side_effect(&mut self, effect: DapSideEffect) { - match effect { - DapSideEffect::DebugCommand(cmd) => { + fn handle_console_event(&mut self, event: DapConsoleEvent) { + match event { + DapConsoleEvent::DebugCommand(cmd) => { if let Some(tx) = &self.comm_tx { // If we have a comm channel (always the case as of this // writing) we are connected to Positron or similar. Send @@ -936,10 +937,10 @@ impl DapServer { .log_err(); } }, - DapSideEffect::Interrupt => { + DapConsoleEvent::Interrupt => { crate::sys::control::handle_interrupt_request(); }, - DapSideEffect::Restart => { + DapConsoleEvent::Restart => { if let Some(tx) = &self.comm_tx { let msg = amalthea::comm_rpc_message!("restart"); tx.send(msg).log_err(); From e556e56ae941cf5fd52f6a368ab82e87c0a67a4a Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 23 Apr 2026 08:20:48 +0200 Subject: [PATCH 3/7] Use `log_err()` --- crates/ark/src/dap/dap_server.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index df2fdd49b..9251f286a 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -898,14 +898,12 @@ impl DapServer { } fn deliver(&mut self, output: DapOutput) -> bool { - if let Err(err) = self.respond(output.response) { - log::warn!("DAP: Failed to send response: {err:?}"); + if self.respond(output.response).log_err().is_none() { return false; } for event in output.dap_events { - if let Err(err) = self.send_event(event) { - log::warn!("DAP: Failed to send event: {err:?}"); + if self.send_event(event).log_err().is_none() { return false; } } From 0f33ef3d9a855b793828a38688436c230d1b10c5 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Thu, 23 Apr 2026 08:52:12 +0200 Subject: [PATCH 4/7] Wrap `DapOutput` in `Result` --- crates/ark/src/dap/dap_server.rs | 119 +++++++++++++++++++------------ 1 file changed, 72 insertions(+), 47 deletions(-) diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index 9251f286a..045b3aae3 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -112,7 +112,8 @@ impl DapHandler { pub fn dispatch(&self, req: Request) -> DapOutput { let cmd = req.command.clone(); - match cmd { + let err_req = req.clone(); + let result = match cmd { Command::Initialize(args) => self.handle_initialize(req, args), Command::Attach(args) => self.handle_attach(req, args), Command::Disconnect(args) => self.handle_disconnect(req, args), @@ -144,12 +145,21 @@ impl DapHandler { Command::Pause(args) => self.handle_pause(req, args), _ => { log::warn!("DAP: Unknown request"); - DapOutput::response(req.error("Ark DAP: Unknown request")) + return DapOutput::response(err_req.error("Ark DAP: Unknown request")); }, + }; + + match result { + Ok(output) => output, + Err(err) => DapOutput::response(err_req.error(&format!("{err}"))), } } - fn handle_initialize(&self, req: Request, _args: InitializeArguments) -> DapOutput { + fn handle_initialize( + &self, + req: Request, + _args: InitializeArguments, + ) -> anyhow::Result { let rsp = req.success(ResponseBody::Initialize(types::Capabilities { supports_restart_request: Some(true), supports_exception_info_request: Some(false), @@ -185,11 +195,11 @@ impl DapHandler { supports_log_points: Some(true), ..Default::default() })); - DapOutput { + Ok(DapOutput { response: rsp, dap_events: vec![Event::Initialized], console_events: vec![], - } + }) } // Handle SetBreakpoints requests from the frontend. @@ -207,11 +217,13 @@ impl DapHandler { // - When a user unchecks a breakpoint, it appears as a deletion (omitted // from the request). We preserve verified breakpoints as Disabled so we // can restore their state when re-enabled without requiring re-sourcing. - fn handle_set_breakpoints(&self, req: Request, args: SetBreakpointsArguments) -> DapOutput { + fn handle_set_breakpoints( + &self, + req: Request, + args: SetBreakpointsArguments, + ) -> anyhow::Result { let Some(path) = args.source.path.as_ref() else { - // We don't currently have virtual documents managed via source references - log::warn!("Missing a path to set breakpoints for."); - return DapOutput::response(req.error("Missing a path to set breakpoints for")); + return Err(anyhow::anyhow!("Missing a path to set breakpoints for")); }; // We currently only support "path" URIs as Positron never sends URIs. @@ -224,7 +236,7 @@ impl DapHandler { let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { breakpoints: vec![], })); - return DapOutput::response(rsp); + return Ok(DapOutput::response(rsp)); }, }; @@ -253,7 +265,7 @@ impl DapHandler { let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { breakpoints, })); - return DapOutput::response(rsp); + return Ok(DapOutput::response(rsp)); }, }; @@ -407,22 +419,30 @@ impl DapHandler { breakpoints: response_breakpoints, })); - DapOutput::response(rsp) + Ok(DapOutput::response(rsp)) } - fn handle_attach(&self, req: Request, _args: AttachRequestArguments) -> DapOutput { + fn handle_attach( + &self, + req: Request, + _args: AttachRequestArguments, + ) -> anyhow::Result { let rsp = req.success(ResponseBody::Attach); - DapOutput { + Ok(DapOutput { response: rsp, dap_events: vec![Event::Thread(ThreadEventBody { reason: ThreadEventReason::Started, thread_id: THREAD_ID, })], console_events: vec![], - } + }) } - fn handle_disconnect(&self, req: Request, _args: DisconnectArguments) -> DapOutput { + fn handle_disconnect( + &self, + req: Request, + _args: DisconnectArguments, + ) -> anyhow::Result { // Only send `Q` if currently in a debugging session. let is_debugging = { self.state.lock().unwrap().is_debugging }; let console_events = if is_debugging { @@ -431,38 +451,38 @@ impl DapHandler { vec![] }; - DapOutput { + Ok(DapOutput { response: req.success(ResponseBody::Disconnect), dap_events: vec![], console_events, - } + }) } - fn handle_restart(&self, req: Request, _args: T) -> DapOutput { - DapOutput { + fn handle_restart(&self, req: Request, _args: T) -> anyhow::Result { + Ok(DapOutput { response: req.success(ResponseBody::Restart), dap_events: vec![], console_events: vec![DapConsoleEvent::Restart], - } + }) } // All servers must respond to `Threads` requests, possibly with // a dummy thread as is the case here - fn handle_threads(&self, req: Request) -> DapOutput { + fn handle_threads(&self, req: Request) -> anyhow::Result { let rsp = req.success(ResponseBody::Threads(ThreadsResponse { threads: vec![Thread { id: THREAD_ID, name: String::from("R console"), }], })); - DapOutput::response(rsp) + Ok(DapOutput::response(rsp)) } fn handle_set_exception_breakpoints( &self, req: Request, args: SetExceptionBreakpointsArguments, - ) -> DapOutput { + ) -> anyhow::Result { { let mut state = self.state.lock().unwrap(); state.exception_breakpoint_filters = args.filters; @@ -475,10 +495,14 @@ impl DapHandler { breakpoints: None, }, )); - DapOutput::response(rsp) + Ok(DapOutput::response(rsp)) } - fn handle_stacktrace(&self, req: Request, args: StackTraceArguments) -> DapOutput { + fn handle_stacktrace( + &self, + req: Request, + args: StackTraceArguments, + ) -> anyhow::Result { let stack = { let state = self.state.lock().unwrap(); let fallback_sources = &state.fallback_sources; @@ -496,15 +520,13 @@ impl DapHandler { let start_frame = args.start_frame.unwrap_or(0); let Ok(start) = usize::try_from(start_frame) else { - let rsp = req.error(&format!("Invalid start_frame: {start_frame}")); - return DapOutput::response(rsp); + return Err(anyhow::anyhow!("Invalid start_frame: {start_frame}")); }; let start = std::cmp::min(start, n_usize); let end = if let Some(levels) = args.levels { let Ok(levels) = usize::try_from(levels) else { - let rsp = req.error(&format!("Invalid levels: {levels}")); - return DapOutput::response(rsp); + return Err(anyhow::anyhow!("Invalid levels: {levels}")); }; std::cmp::min(start.saturating_add(levels), n_usize) } else { @@ -512,8 +534,9 @@ impl DapHandler { }; let Ok(total_frames) = i64::try_from(n_usize) else { - let rsp = req.error(&format!("Stack frame count overflows i64: {n_usize}")); - return DapOutput::response(rsp); + return Err(anyhow::anyhow!( + "Stack frame count overflows i64: {n_usize}" + )); }; let stack = stack[start..end].to_vec(); @@ -522,16 +545,14 @@ impl DapHandler { total_frames: Some(total_frames), })); - DapOutput::response(rsp) + Ok(DapOutput::response(rsp)) } - fn handle_source(&self, req: Request, _args: SourceArguments) -> DapOutput { - let message = "Unsupported `source` request: {req:?}"; - log::error!("{message}"); - DapOutput::response(req.error(message)) + fn handle_source(&self, _req: Request, _args: SourceArguments) -> anyhow::Result { + Err(anyhow::anyhow!("Unsupported `source` request")) } - fn handle_scopes(&self, req: Request, args: ScopesArguments) -> DapOutput { + fn handle_scopes(&self, req: Request, args: ScopesArguments) -> anyhow::Result { let state = self.state.lock().unwrap(); let frame_id_to_variables_reference = &state.frame_id_to_variables_reference; @@ -561,15 +582,19 @@ impl DapHandler { let rsp = req.success(ResponseBody::Scopes(ScopesResponse { scopes })); drop(state); - DapOutput::response(rsp) + Ok(DapOutput::response(rsp)) } - fn handle_variables(&self, req: Request, args: VariablesArguments) -> DapOutput { + fn handle_variables( + &self, + req: Request, + args: VariablesArguments, + ) -> anyhow::Result { let variables_reference = args.variables_reference; let variables = self.collect_r_variables(variables_reference); let variables = self.make_variables(variables); let rsp = req.success(ResponseBody::Variables(VariablesResponse { variables })); - DapOutput::response(rsp) + Ok(DapOutput::response(rsp)) } fn collect_r_variables(&self, variables_reference: i64) -> Vec { @@ -605,24 +630,24 @@ impl DapHandler { _args: A, cmd: DebugRequest, resp: ResponseBody, - ) -> DapOutput { - DapOutput { + ) -> anyhow::Result { + Ok(DapOutput { response: req.success(resp), dap_events: vec![], console_events: vec![DapConsoleEvent::DebugCommand(cmd)], - } + }) } - fn handle_pause(&self, req: Request, _args: PauseArguments) -> DapOutput { + fn handle_pause(&self, req: Request, _args: PauseArguments) -> anyhow::Result { self.state.lock().unwrap().is_interrupting_for_debugger = true; log::info!("DAP: Received request to pause R, sending interrupt"); - DapOutput { + Ok(DapOutput { response: req.success(ResponseBody::Pause), dap_events: vec![], console_events: vec![DapConsoleEvent::Interrupt], - } + }) } } From bca51927ad4c648088be05743406cf7297f39f5c Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Sat, 25 Apr 2026 10:14:49 +0200 Subject: [PATCH 5/7] Don't thread `req` through handlers --- crates/ark/src/dap/dap_server.rs | 209 +++++++++++++++---------------- 1 file changed, 103 insertions(+), 106 deletions(-) diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index 045b3aae3..8591e8b85 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -80,13 +80,36 @@ pub struct DapOutput { } impl DapOutput { - pub fn response(response: Response) -> Self { + pub fn new(response: Response) -> Self { Self { response, dap_events: vec![], console_events: vec![], } } + + pub fn error(req: Request, message: &str) -> Self { + Self::new(req.error(message)) + } +} + +/// The result of a handler method. Contains a `ResponseBody` (not a full +/// `Response`) plus any events. The dispatcher wraps the body with +/// `req.success()` or `req.error()` to produce a transport-ready `DapOutput`. +pub(crate) struct HandlerOutput { + pub body: ResponseBody, + pub dap_events: Vec, + pub console_events: Vec, +} + +impl HandlerOutput { + fn new(body: ResponseBody) -> Self { + Self { + body, + dap_events: vec![], + console_events: vec![], + } + } } /// Transport-agnostic handler for DAP requests. Translates each request @@ -112,55 +135,50 @@ impl DapHandler { pub fn dispatch(&self, req: Request) -> DapOutput { let cmd = req.command.clone(); - let err_req = req.clone(); let result = match cmd { - Command::Initialize(args) => self.handle_initialize(req, args), - Command::Attach(args) => self.handle_attach(req, args), - Command::Disconnect(args) => self.handle_disconnect(req, args), - Command::Restart(args) => self.handle_restart(req, args), - Command::Threads => self.handle_threads(req), - Command::SetBreakpoints(args) => self.handle_set_breakpoints(req, args), - Command::SetExceptionBreakpoints(args) => { - self.handle_set_exception_breakpoints(req, args) - }, - Command::StackTrace(args) => self.handle_stacktrace(req, args), - Command::Source(args) => self.handle_source(req, args), - Command::Scopes(args) => self.handle_scopes(req, args), - Command::Variables(args) => self.handle_variables(req, args), + Command::Initialize(args) => self.handle_initialize(args), + Command::Attach(args) => self.handle_attach(args), + Command::Disconnect(args) => self.handle_disconnect(args), + Command::Restart(args) => self.handle_restart(args), + Command::Threads => self.handle_threads(), + Command::SetBreakpoints(args) => self.handle_set_breakpoints(args), + Command::SetExceptionBreakpoints(args) => self.handle_set_exception_breakpoints(args), + Command::StackTrace(args) => self.handle_stacktrace(args), + Command::Source(args) => self.handle_source(args), + Command::Scopes(args) => self.handle_scopes(args), + Command::Variables(args) => self.handle_variables(args), Command::Continue(args) => { let resp = ResponseBody::Continue(ContinueResponse { all_threads_continued: Some(true), }); - self.handle_step(req, args, DebugRequest::Continue, resp) - }, - Command::Next(args) => { - self.handle_step(req, args, DebugRequest::Next, ResponseBody::Next) + self.handle_step(args, DebugRequest::Continue, resp) }, + Command::Next(args) => self.handle_step(args, DebugRequest::Next, ResponseBody::Next), Command::StepIn(args) => { - self.handle_step(req, args, DebugRequest::StepIn, ResponseBody::StepIn) + self.handle_step(args, DebugRequest::StepIn, ResponseBody::StepIn) }, Command::StepOut(args) => { - self.handle_step(req, args, DebugRequest::StepOut, ResponseBody::StepOut) + self.handle_step(args, DebugRequest::StepOut, ResponseBody::StepOut) }, - Command::Pause(args) => self.handle_pause(req, args), + Command::Pause(args) => self.handle_pause(args), _ => { log::warn!("DAP: Unknown request"); - return DapOutput::response(err_req.error("Ark DAP: Unknown request")); + return DapOutput::error(req, "Ark DAP: Unknown request"); }, }; match result { - Ok(output) => output, - Err(err) => DapOutput::response(err_req.error(&format!("{err}"))), + Ok(output) => DapOutput { + response: req.success(output.body), + dap_events: output.dap_events, + console_events: output.console_events, + }, + Err(err) => DapOutput::error(req, &format!("{err}")), } } - fn handle_initialize( - &self, - req: Request, - _args: InitializeArguments, - ) -> anyhow::Result { - let rsp = req.success(ResponseBody::Initialize(types::Capabilities { + fn handle_initialize(&self, _args: InitializeArguments) -> anyhow::Result { + let body = ResponseBody::Initialize(types::Capabilities { supports_restart_request: Some(true), supports_exception_info_request: Some(false), exception_breakpoint_filters: Some(vec![ @@ -194,9 +212,9 @@ impl DapHandler { supports_hit_conditional_breakpoints: Some(true), supports_log_points: Some(true), ..Default::default() - })); - Ok(DapOutput { - response: rsp, + }); + Ok(HandlerOutput { + body, dap_events: vec![Event::Initialized], console_events: vec![], }) @@ -219,9 +237,8 @@ impl DapHandler { // can restore their state when re-enabled without requiring re-sourcing. fn handle_set_breakpoints( &self, - req: Request, args: SetBreakpointsArguments, - ) -> anyhow::Result { + ) -> anyhow::Result { let Some(path) = args.source.path.as_ref() else { return Err(anyhow::anyhow!("Missing a path to set breakpoints for")); }; @@ -233,10 +250,11 @@ impl DapHandler { Ok(uri) => uri, Err(err) => { log::warn!("Can't set breakpoints for non-file path: '{path}': {err}"); - let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { - breakpoints: vec![], - })); - return Ok(DapOutput::response(rsp)); + return Ok(HandlerOutput::new(ResponseBody::SetBreakpoints( + SetBreakpointsResponse { + breakpoints: vec![], + }, + ))); }, }; @@ -262,10 +280,9 @@ impl DapHandler { }) .collect(); - let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { - breakpoints, - })); - return Ok(DapOutput::response(rsp)); + return Ok(HandlerOutput::new(ResponseBody::SetBreakpoints( + SetBreakpointsResponse { breakpoints }, + ))); }, }; @@ -415,21 +432,16 @@ impl DapHandler { drop(state); - let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { - breakpoints: response_breakpoints, - })); - - Ok(DapOutput::response(rsp)) + Ok(HandlerOutput::new(ResponseBody::SetBreakpoints( + SetBreakpointsResponse { + breakpoints: response_breakpoints, + }, + ))) } - fn handle_attach( - &self, - req: Request, - _args: AttachRequestArguments, - ) -> anyhow::Result { - let rsp = req.success(ResponseBody::Attach); - Ok(DapOutput { - response: rsp, + fn handle_attach(&self, _args: AttachRequestArguments) -> anyhow::Result { + Ok(HandlerOutput { + body: ResponseBody::Attach, dap_events: vec![Event::Thread(ThreadEventBody { reason: ThreadEventReason::Started, thread_id: THREAD_ID, @@ -438,11 +450,7 @@ impl DapHandler { }) } - fn handle_disconnect( - &self, - req: Request, - _args: DisconnectArguments, - ) -> anyhow::Result { + fn handle_disconnect(&self, _args: DisconnectArguments) -> anyhow::Result { // Only send `Q` if currently in a debugging session. let is_debugging = { self.state.lock().unwrap().is_debugging }; let console_events = if is_debugging { @@ -451,16 +459,16 @@ impl DapHandler { vec![] }; - Ok(DapOutput { - response: req.success(ResponseBody::Disconnect), + Ok(HandlerOutput { + body: ResponseBody::Disconnect, dap_events: vec![], console_events, }) } - fn handle_restart(&self, req: Request, _args: T) -> anyhow::Result { - Ok(DapOutput { - response: req.success(ResponseBody::Restart), + fn handle_restart(&self, _args: T) -> anyhow::Result { + Ok(HandlerOutput { + body: ResponseBody::Restart, dap_events: vec![], console_events: vec![DapConsoleEvent::Restart], }) @@ -468,41 +476,34 @@ impl DapHandler { // All servers must respond to `Threads` requests, possibly with // a dummy thread as is the case here - fn handle_threads(&self, req: Request) -> anyhow::Result { - let rsp = req.success(ResponseBody::Threads(ThreadsResponse { + fn handle_threads(&self) -> anyhow::Result { + Ok(HandlerOutput::new(ResponseBody::Threads(ThreadsResponse { threads: vec![Thread { id: THREAD_ID, name: String::from("R console"), }], - })); - Ok(DapOutput::response(rsp)) + }))) } fn handle_set_exception_breakpoints( &self, - req: Request, args: SetExceptionBreakpointsArguments, - ) -> anyhow::Result { + ) -> anyhow::Result { { let mut state = self.state.lock().unwrap(); state.exception_breakpoint_filters = args.filters; } - let rsp = req.success(ResponseBody::SetExceptionBreakpoints( + Ok(HandlerOutput::new(ResponseBody::SetExceptionBreakpoints( SetExceptionBreakpointsResponse { // This field is only useful for reporting problems with // individual filters. Since we always accept all filters, // `None` is fine here. breakpoints: None, }, - )); - Ok(DapOutput::response(rsp)) + ))) } - fn handle_stacktrace( - &self, - req: Request, - args: StackTraceArguments, - ) -> anyhow::Result { + fn handle_stacktrace(&self, args: StackTraceArguments) -> anyhow::Result { let stack = { let state = self.state.lock().unwrap(); let fallback_sources = &state.fallback_sources; @@ -540,19 +541,19 @@ impl DapHandler { }; let stack = stack[start..end].to_vec(); - let rsp = req.success(ResponseBody::StackTrace(StackTraceResponse { - stack_frames: stack, - total_frames: Some(total_frames), - })); - - Ok(DapOutput::response(rsp)) + Ok(HandlerOutput::new(ResponseBody::StackTrace( + StackTraceResponse { + stack_frames: stack, + total_frames: Some(total_frames), + }, + ))) } - fn handle_source(&self, _req: Request, _args: SourceArguments) -> anyhow::Result { + fn handle_source(&self, _args: SourceArguments) -> anyhow::Result { Err(anyhow::anyhow!("Unsupported `source` request")) } - fn handle_scopes(&self, req: Request, args: ScopesArguments) -> anyhow::Result { + fn handle_scopes(&self, args: ScopesArguments) -> anyhow::Result { let state = self.state.lock().unwrap(); let frame_id_to_variables_reference = &state.frame_id_to_variables_reference; @@ -579,22 +580,19 @@ impl DapHandler { end_column: None, }]; - let rsp = req.success(ResponseBody::Scopes(ScopesResponse { scopes })); - drop(state); - Ok(DapOutput::response(rsp)) + Ok(HandlerOutput::new(ResponseBody::Scopes(ScopesResponse { + scopes, + }))) } - fn handle_variables( - &self, - req: Request, - args: VariablesArguments, - ) -> anyhow::Result { + fn handle_variables(&self, args: VariablesArguments) -> anyhow::Result { let variables_reference = args.variables_reference; let variables = self.collect_r_variables(variables_reference); let variables = self.make_variables(variables); - let rsp = req.success(ResponseBody::Variables(VariablesResponse { variables })); - Ok(DapOutput::response(rsp)) + Ok(HandlerOutput::new(ResponseBody::Variables( + VariablesResponse { variables }, + ))) } fn collect_r_variables(&self, variables_reference: i64) -> Vec { @@ -626,25 +624,24 @@ impl DapHandler { fn handle_step( &self, - req: Request, _args: A, cmd: DebugRequest, resp: ResponseBody, - ) -> anyhow::Result { - Ok(DapOutput { - response: req.success(resp), + ) -> anyhow::Result { + Ok(HandlerOutput { + body: resp, dap_events: vec![], console_events: vec![DapConsoleEvent::DebugCommand(cmd)], }) } - fn handle_pause(&self, req: Request, _args: PauseArguments) -> anyhow::Result { + fn handle_pause(&self, _args: PauseArguments) -> anyhow::Result { self.state.lock().unwrap().is_interrupting_for_debugger = true; log::info!("DAP: Received request to pause R, sending interrupt"); - Ok(DapOutput { - response: req.success(ResponseBody::Pause), + Ok(HandlerOutput { + body: ResponseBody::Pause, dap_events: vec![], console_events: vec![DapConsoleEvent::Interrupt], }) From 841539f86511c6ae3356000ee4da663d8247e9c2 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Sat, 25 Apr 2026 10:31:54 +0200 Subject: [PATCH 6/7] Rename to `DapHandlerOutput` --- crates/ark/src/dap/dap_server.rs | 80 ++++++++++++++++---------------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index 8591e8b85..0bae99871 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -96,13 +96,13 @@ impl DapOutput { /// The result of a handler method. Contains a `ResponseBody` (not a full /// `Response`) plus any events. The dispatcher wraps the body with /// `req.success()` or `req.error()` to produce a transport-ready `DapOutput`. -pub(crate) struct HandlerOutput { +pub(crate) struct DapHandlerOutput { pub body: ResponseBody, pub dap_events: Vec, pub console_events: Vec, } -impl HandlerOutput { +impl DapHandlerOutput { fn new(body: ResponseBody) -> Self { Self { body, @@ -177,7 +177,7 @@ impl DapHandler { } } - fn handle_initialize(&self, _args: InitializeArguments) -> anyhow::Result { + fn handle_initialize(&self, _args: InitializeArguments) -> anyhow::Result { let body = ResponseBody::Initialize(types::Capabilities { supports_restart_request: Some(true), supports_exception_info_request: Some(false), @@ -213,7 +213,7 @@ impl DapHandler { supports_log_points: Some(true), ..Default::default() }); - Ok(HandlerOutput { + Ok(DapHandlerOutput { body, dap_events: vec![Event::Initialized], console_events: vec![], @@ -238,7 +238,7 @@ impl DapHandler { fn handle_set_breakpoints( &self, args: SetBreakpointsArguments, - ) -> anyhow::Result { + ) -> anyhow::Result { let Some(path) = args.source.path.as_ref() else { return Err(anyhow::anyhow!("Missing a path to set breakpoints for")); }; @@ -250,7 +250,7 @@ impl DapHandler { Ok(uri) => uri, Err(err) => { log::warn!("Can't set breakpoints for non-file path: '{path}': {err}"); - return Ok(HandlerOutput::new(ResponseBody::SetBreakpoints( + return Ok(DapHandlerOutput::new(ResponseBody::SetBreakpoints( SetBreakpointsResponse { breakpoints: vec![], }, @@ -280,7 +280,7 @@ impl DapHandler { }) .collect(); - return Ok(HandlerOutput::new(ResponseBody::SetBreakpoints( + return Ok(DapHandlerOutput::new(ResponseBody::SetBreakpoints( SetBreakpointsResponse { breakpoints }, ))); }, @@ -432,15 +432,15 @@ impl DapHandler { drop(state); - Ok(HandlerOutput::new(ResponseBody::SetBreakpoints( + Ok(DapHandlerOutput::new(ResponseBody::SetBreakpoints( SetBreakpointsResponse { breakpoints: response_breakpoints, }, ))) } - fn handle_attach(&self, _args: AttachRequestArguments) -> anyhow::Result { - Ok(HandlerOutput { + fn handle_attach(&self, _args: AttachRequestArguments) -> anyhow::Result { + Ok(DapHandlerOutput { body: ResponseBody::Attach, dap_events: vec![Event::Thread(ThreadEventBody { reason: ThreadEventReason::Started, @@ -450,7 +450,7 @@ impl DapHandler { }) } - fn handle_disconnect(&self, _args: DisconnectArguments) -> anyhow::Result { + fn handle_disconnect(&self, _args: DisconnectArguments) -> anyhow::Result { // Only send `Q` if currently in a debugging session. let is_debugging = { self.state.lock().unwrap().is_debugging }; let console_events = if is_debugging { @@ -459,15 +459,15 @@ impl DapHandler { vec![] }; - Ok(HandlerOutput { + Ok(DapHandlerOutput { body: ResponseBody::Disconnect, dap_events: vec![], console_events, }) } - fn handle_restart(&self, _args: T) -> anyhow::Result { - Ok(HandlerOutput { + fn handle_restart(&self, _args: T) -> anyhow::Result { + Ok(DapHandlerOutput { body: ResponseBody::Restart, dap_events: vec![], console_events: vec![DapConsoleEvent::Restart], @@ -476,34 +476,36 @@ impl DapHandler { // All servers must respond to `Threads` requests, possibly with // a dummy thread as is the case here - fn handle_threads(&self) -> anyhow::Result { - Ok(HandlerOutput::new(ResponseBody::Threads(ThreadsResponse { - threads: vec![Thread { - id: THREAD_ID, - name: String::from("R console"), - }], - }))) + fn handle_threads(&self) -> anyhow::Result { + Ok(DapHandlerOutput::new(ResponseBody::Threads( + ThreadsResponse { + threads: vec![Thread { + id: THREAD_ID, + name: String::from("R console"), + }], + }, + ))) } fn handle_set_exception_breakpoints( &self, args: SetExceptionBreakpointsArguments, - ) -> anyhow::Result { + ) -> anyhow::Result { { let mut state = self.state.lock().unwrap(); state.exception_breakpoint_filters = args.filters; } - Ok(HandlerOutput::new(ResponseBody::SetExceptionBreakpoints( - SetExceptionBreakpointsResponse { + Ok(DapHandlerOutput::new( + ResponseBody::SetExceptionBreakpoints(SetExceptionBreakpointsResponse { // This field is only useful for reporting problems with // individual filters. Since we always accept all filters, // `None` is fine here. breakpoints: None, - }, - ))) + }), + )) } - fn handle_stacktrace(&self, args: StackTraceArguments) -> anyhow::Result { + fn handle_stacktrace(&self, args: StackTraceArguments) -> anyhow::Result { let stack = { let state = self.state.lock().unwrap(); let fallback_sources = &state.fallback_sources; @@ -541,7 +543,7 @@ impl DapHandler { }; let stack = stack[start..end].to_vec(); - Ok(HandlerOutput::new(ResponseBody::StackTrace( + Ok(DapHandlerOutput::new(ResponseBody::StackTrace( StackTraceResponse { stack_frames: stack, total_frames: Some(total_frames), @@ -549,11 +551,11 @@ impl DapHandler { ))) } - fn handle_source(&self, _args: SourceArguments) -> anyhow::Result { + fn handle_source(&self, _args: SourceArguments) -> anyhow::Result { Err(anyhow::anyhow!("Unsupported `source` request")) } - fn handle_scopes(&self, args: ScopesArguments) -> anyhow::Result { + fn handle_scopes(&self, args: ScopesArguments) -> anyhow::Result { let state = self.state.lock().unwrap(); let frame_id_to_variables_reference = &state.frame_id_to_variables_reference; @@ -581,16 +583,16 @@ impl DapHandler { }]; drop(state); - Ok(HandlerOutput::new(ResponseBody::Scopes(ScopesResponse { - scopes, - }))) + Ok(DapHandlerOutput::new(ResponseBody::Scopes( + ScopesResponse { scopes }, + ))) } - fn handle_variables(&self, args: VariablesArguments) -> anyhow::Result { + fn handle_variables(&self, args: VariablesArguments) -> anyhow::Result { let variables_reference = args.variables_reference; let variables = self.collect_r_variables(variables_reference); let variables = self.make_variables(variables); - Ok(HandlerOutput::new(ResponseBody::Variables( + Ok(DapHandlerOutput::new(ResponseBody::Variables( VariablesResponse { variables }, ))) } @@ -627,20 +629,20 @@ impl DapHandler { _args: A, cmd: DebugRequest, resp: ResponseBody, - ) -> anyhow::Result { - Ok(HandlerOutput { + ) -> anyhow::Result { + Ok(DapHandlerOutput { body: resp, dap_events: vec![], console_events: vec![DapConsoleEvent::DebugCommand(cmd)], }) } - fn handle_pause(&self, _args: PauseArguments) -> anyhow::Result { + fn handle_pause(&self, _args: PauseArguments) -> anyhow::Result { self.state.lock().unwrap().is_interrupting_for_debugger = true; log::info!("DAP: Received request to pause R, sending interrupt"); - Ok(HandlerOutput { + Ok(DapHandlerOutput { body: ResponseBody::Pause, dap_events: vec![], console_events: vec![DapConsoleEvent::Interrupt], From 37d682d796c19a020ed0dae7afb75a3b17e47ed5 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Sat, 25 Apr 2026 10:48:54 +0200 Subject: [PATCH 7/7] Add comment on Evaluate being tied to TCP --- crates/ark/src/dap/dap_server.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index 0bae99871..abec56ae0 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -981,6 +981,8 @@ impl DapServer { self.server.send_event(event) } + // Tied to the TCP transport for now. For Jupyter we need to figure out how + // to do async responses with Jupyter's Control channel. fn handle_evaluate( &mut self, req: Request,