diff --git a/crates/ark/src/dap/dap_server.rs b/crates/ark/src/dap/dap_server.rs index ede5f58bf..abec56ae0 100644 --- a/crates/ark/src/dap/dap_server.rs +++ b/crates/ark/src/dap/dap_server.rs @@ -58,325 +58,127 @@ const THREAD_ID: i64 = -1; /// evaluations will run in that frame's environment. const SELECTED_FRAME_EXPRESSION: &str = ".positron_selected_frame"; -// TODO: Handle comm close to shut down the DAP server thread. -// -// The DAP comm is allowed to persist across TCP sessions. This supports session -// switching on the frontend. Ideally the frontend would be allowed to close the -// DAP comm in addition to the DAP TCP connection, which would shut down the DAP -// server. To achive this, the DAP server, once disconnected should wait for both -// the connection becoming ready and a channel event signalling comm close. If -// the latter fires, shut the server down. - -pub fn start_dap( - state: Arc>, - server_start: ServerStartMessage, - server_started_tx: Sender, - r_request_tx: Sender, - comm_tx: CommOutgoingTx, -) { - let ip_address = server_start.ip_address(); - - // Binding to port `0` to allow the OS to allocate a port for us to bind to - let listener = TcpListener::bind(format!("{ip_address}:0",)).unwrap(); - - let address = match listener.local_addr() { - Ok(address) => address, - Err(error) => { - log::error!("DAP: Failed to bind to {ip_address}:0: {error}"); - return; - }, - }; - - // Get the OS allocated port - let port = address.port(); - - log::trace!("DAP: Thread starting at address {ip_address}:{port}."); - - // Send the port back to `Shell` and eventually out to the frontend so it can connect - server_started_tx - .send(ServerStartedMessage::new(port)) - .log_err(); - - loop { - log::trace!("DAP: Waiting for client"); - - let stream = match listener.accept() { - Ok((stream, addr)) => { - log::info!("DAP: Connected to client {addr:?}"); - - let mut state = state.lock().unwrap(); - state.is_connected = true; - - stream - }, - Err(e) => { - log::error!("DAP: Can't get client: {e:?}"); - continue; - }, - }; - - let reader = BufReader::new(&stream); - let writer = BufWriter::new(&stream); - let (responses_tx, responses_rx) = unbounded::(); - - let mut server = DapServer::new( - reader, - writer, - state.clone(), - r_request_tx.clone(), - comm_tx.clone(), - responses_tx, - ); - - let (backend_events_tx, backend_events_rx) = unbounded::(); - let (done_tx, done_rx) = bounded::(0); - let output_clone = server.output.clone(); - - // We need a scope to let the borrow checker know that - // `output_clone` drops before the next iteration (it gets tangled - // to the stack variable `stream` through `server`) - let _ = crossbeam::thread::scope(|scope| { - spawn!(scope, "ark-dap-events", { - move |_| listen_dap_events(output_clone, backend_events_rx, responses_rx, done_rx) - }); +/// Events for the Console requested by a DAP handler. Either delivered by a +/// round-trip through the frontend (so users see the command) or directly to +/// the Console. +pub enum DapConsoleEvent { + /// Send a debug step/quit command to R via the frontend. + DebugCommand(DebugRequest), + /// Interrupt R for a pause. No frontend round-trip. + Interrupt, + /// Request a session restart via the frontend. + Restart, +} - // Connect the backend to the events thread - state.lock().unwrap().backend_events_tx = Some(backend_events_tx); +/// The result of handling a single DAP request. The transport layer (TCP or +/// Jupyter) is responsible for delivering the response, DAP events, and console +/// events through the appropriate channel. +pub struct DapOutput { + pub response: Response, + pub dap_events: Vec, + pub console_events: Vec, +} - loop { - // If disconnected, break and accept a new connection to create a new server - if !server.serve() { - log::trace!("DAP: Disconnected from client"); - let mut state = state.lock().unwrap(); - state.is_connected = false; - break; - } - } +impl DapOutput { + pub fn new(response: Response) -> Self { + Self { + response, + dap_events: vec![], + console_events: vec![], + } + } - // Terminate the events thread - let _ = done_tx.send(true); - }); + pub fn error(req: Request, message: &str) -> Self { + Self::new(req.error(message)) } } -// Thread that listens for events sent by the backend, usually the -// `ReadConsole()` method. These are forwarded to the DAP client. -fn listen_dap_events( - output: Arc>>, - backend_events_rx: Receiver, - responses_rx: Receiver, - done_rx: Receiver, -) { - loop { - select!( - recv(backend_events_rx) -> event => { - let event = match event { - Ok(event) => event, - Err(err) => { - // Channel closed, sender dropped - log::info!("DAP: Event channel closed: {err:?}"); - return; - }, - }; - - log::trace!("DAP: Got event from backend: {:?}", event); - - let event = match event { - DapBackendEvent::Continued => { - Event::Continued(ContinuedEventBody { - thread_id: THREAD_ID, - all_threads_continued: Some(true) - }) - }, - - DapBackendEvent::Stopped => { - Event::Stopped(StoppedEventBody { - reason: StoppedEventReason::Step, - description: None, - thread_id: Some(THREAD_ID), - preserve_focus_hint: Some(false), - text: None, - all_threads_stopped: Some(true), - hit_breakpoint_ids: None, - }) - }, - - DapBackendEvent::Exception(DapExceptionEvent { class, message }) => { - let text = format!("<{class}>\n{message}"); - Event::Stopped(StoppedEventBody { - reason: StoppedEventReason::Exception, - description: Some(message), - thread_id: Some(THREAD_ID), - preserve_focus_hint: Some(false), - text: Some(text), - all_threads_stopped: Some(true), - hit_breakpoint_ids: None, - }) - }, - - DapBackendEvent::Invalidated => { - Event::Invalidated(InvalidatedEventBody { - areas: Some(vec![types::InvalidatedAreas::Variables]), - thread_id: Some(THREAD_ID), - stack_frame_id: None, - }) - }, - - DapBackendEvent::Terminated => { - Event::Terminated(None) - }, - - DapBackendEvent::BreakpointState { id, line, verified, message } => { - Event::Breakpoint(BreakpointEventBody { - reason: BreakpointEventReason::Changed, - breakpoint: dap::types::Breakpoint { - id: Some(id), - line: Some(Breakpoint::to_dap_line(line)), - verified, - message, - ..Default::default() - }, - }) - }, - }; - - let mut output = output.lock().unwrap(); - if let Err(err) = output.send_event(event) { - log::warn!("DAP: Failed to send event, closing: {err:?}"); - return; - } - }, - - recv(responses_rx) -> response => { - let response = match response { - Ok(response) => response, - Err(err) => { - log::info!("DAP: Responses channel closed: {err:?}"); - return; - }, - }; - - log::trace!("DAP: Sending async response: {:?}", response); - - let mut output = output.lock().unwrap(); - if let Err(err) = output.send(Sendable::Response(response)) { - log::warn!("DAP: Failed to send response, closing: {err:?}"); - return; - } - }, +/// The result of a handler method. Contains a `ResponseBody` (not a full +/// `Response`) plus any events. The dispatcher wraps the body with +/// `req.success()` or `req.error()` to produce a transport-ready `DapOutput`. +pub(crate) struct DapHandlerOutput { + pub body: ResponseBody, + pub dap_events: Vec, + pub console_events: Vec, +} - // Break the loop and terminate the thread - recv(done_rx) -> _ => { return; }, - ) +impl DapHandlerOutput { + fn new(body: ResponseBody) -> Self { + Self { + body, + dap_events: vec![], + console_events: vec![], + } } } -pub struct DapServer { - server: Server, - pub output: Arc>>, - state: Arc>, - r_request_tx: Sender, - comm_tx: Option, - responses_tx: Sender, +/// Transport-agnostic handler for DAP requests. Translates each request +/// into a [`DapOutput`] containing the protocol response, any DAP events, +/// and console events that the transport layer is responsible for delivering. +pub struct DapHandler { + pub(crate) state: Arc>, + pub(crate) r_request_tx: Sender, } -impl DapServer { - pub fn new( - reader: BufReader, - writer: BufWriter, - state: Arc>, - r_request_tx: Sender, - comm_tx: CommOutgoingTx, - responses_tx: Sender, - ) -> Self { - let server = Server::new(reader, writer); - let output = server.output.clone(); +impl DapHandler { + pub fn new(state: Arc>, r_request_tx: Sender) -> Self { Self { - server, - output, state, r_request_tx, - comm_tx: Some(comm_tx), - responses_tx, } } - pub fn serve(&mut self) -> bool { - log::trace!("DAP: Polling"); - let req = match self.server.poll_request() { - Ok(Some(req)) => req, - Ok(None) => return false, - Err(err) => { - log::warn!("DAP: Connection closed: {err:?}"); - return false; - }, - }; - log::trace!("DAP: Got request: {:#?}", req); - + /// Dispatch a parsed DAP request to the appropriate handler. + /// + /// `Evaluate` is intentionally not handled here because it requires + /// transport-specific async response delivery. + pub fn dispatch(&self, req: Request) -> DapOutput { let cmd = req.command.clone(); let result = match cmd { - Command::Initialize(args) => self.handle_initialize(req, args), - Command::Attach(args) => self.handle_attach(req, args), - Command::Disconnect(args) => self.handle_disconnect(req, args), - Command::Restart(args) => self.handle_restart(req, args), - Command::Threads => self.handle_threads(req), - Command::SetBreakpoints(args) => self.handle_set_breakpoints(req, args), - Command::SetExceptionBreakpoints(args) => { - self.handle_set_exception_breakpoints(req, args) - }, - Command::StackTrace(args) => self.handle_stacktrace(req, args), - Command::Source(args) => self.handle_source(req, args), - Command::Scopes(args) => self.handle_scopes(req, args), - Command::Variables(args) => self.handle_variables(req, args), - Command::Evaluate(args) => self.handle_evaluate(req, args), + Command::Initialize(args) => self.handle_initialize(args), + Command::Attach(args) => self.handle_attach(args), + Command::Disconnect(args) => self.handle_disconnect(args), + Command::Restart(args) => self.handle_restart(args), + Command::Threads => self.handle_threads(), + Command::SetBreakpoints(args) => self.handle_set_breakpoints(args), + Command::SetExceptionBreakpoints(args) => self.handle_set_exception_breakpoints(args), + Command::StackTrace(args) => self.handle_stacktrace(args), + Command::Source(args) => self.handle_source(args), + Command::Scopes(args) => self.handle_scopes(args), + Command::Variables(args) => self.handle_variables(args), Command::Continue(args) => { let resp = ResponseBody::Continue(ContinueResponse { all_threads_continued: Some(true), }); - self.handle_step(req, args, DebugRequest::Continue, resp) - }, - Command::Next(args) => { - self.handle_step(req, args, DebugRequest::Next, ResponseBody::Next) + self.handle_step(args, DebugRequest::Continue, resp) }, + Command::Next(args) => self.handle_step(args, DebugRequest::Next, ResponseBody::Next), Command::StepIn(args) => { - self.handle_step(req, args, DebugRequest::StepIn, ResponseBody::StepIn) + self.handle_step(args, DebugRequest::StepIn, ResponseBody::StepIn) }, Command::StepOut(args) => { - self.handle_step(req, args, DebugRequest::StepOut, ResponseBody::StepOut) + self.handle_step(args, DebugRequest::StepOut, ResponseBody::StepOut) }, - Command::Pause(args) => self.handle_pause(req, args), + Command::Pause(args) => self.handle_pause(args), _ => { log::warn!("DAP: Unknown request"); - let rsp = req.error("Ark DAP: Unknown request"); - self.respond(rsp) + return DapOutput::error(req, "Ark DAP: Unknown request"); }, }; - if let Err(err) = result { - log::warn!("DAP: Handler failed, closing connection: {err:?}"); - return false; + match result { + Ok(output) => DapOutput { + response: req.success(output.body), + dap_events: output.dap_events, + console_events: output.console_events, + }, + Err(err) => DapOutput::error(req, &format!("{err}")), } - - true - } - - fn respond(&mut self, rsp: Response) -> Result<(), ServerError> { - log::trace!("DAP: Responding to request: {rsp:#?}"); - self.server.respond(rsp) } - fn send_event(&mut self, event: Event) -> Result<(), ServerError> { - log::trace!("DAP: Sending event: {event:#?}"); - self.server.send_event(event) - } - - fn handle_initialize( - &mut self, - req: Request, - _args: InitializeArguments, - ) -> Result<(), ServerError> { - let rsp = req.success(ResponseBody::Initialize(types::Capabilities { + fn handle_initialize(&self, _args: InitializeArguments) -> anyhow::Result { + let body = ResponseBody::Initialize(types::Capabilities { supports_restart_request: Some(true), supports_exception_info_request: Some(false), exception_breakpoint_filters: Some(vec![ @@ -410,9 +212,12 @@ impl DapServer { supports_hit_conditional_breakpoints: Some(true), supports_log_points: Some(true), ..Default::default() - })); - self.respond(rsp)?; - self.send_event(Event::Initialized) + }); + Ok(DapHandlerOutput { + body, + dap_events: vec![Event::Initialized], + console_events: vec![], + }) } // Handle SetBreakpoints requests from the frontend. @@ -431,14 +236,11 @@ impl DapServer { // from the request). We preserve verified breakpoints as Disabled so we // can restore their state when re-enabled without requiring re-sourcing. fn handle_set_breakpoints( - &mut self, - req: Request, + &self, args: SetBreakpointsArguments, - ) -> Result<(), ServerError> { + ) -> anyhow::Result { let Some(path) = args.source.path.as_ref() else { - // We don't currently have virtual documents managed via source references - log::warn!("Missing a path to set breakpoints for."); - return self.respond(req.error("Missing a path to set breakpoints for")); + return Err(anyhow::anyhow!("Missing a path to set breakpoints for")); }; // We currently only support "path" URIs as Positron never sends URIs. @@ -448,10 +250,11 @@ impl DapServer { Ok(uri) => uri, Err(err) => { log::warn!("Can't set breakpoints for non-file path: '{path}': {err}"); - let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { - breakpoints: vec![], - })); - return self.respond(rsp); + return Ok(DapHandlerOutput::new(ResponseBody::SetBreakpoints( + SetBreakpointsResponse { + breakpoints: vec![], + }, + ))); }, }; @@ -477,10 +280,9 @@ impl DapServer { }) .collect(); - let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { - breakpoints, - })); - return self.respond(rsp); + return Ok(DapHandlerOutput::new(ResponseBody::SetBreakpoints( + SetBreakpointsResponse { breakpoints }, + ))); }, }; @@ -608,211 +410,579 @@ impl DapServer { new_breakpoints ); - let response_breakpoints: Vec = new_breakpoints - .iter() - .filter(|bp| !matches!(bp.state, BreakpointState::Disabled)) - .map(|bp| { - let message = match &bp.state { - BreakpointState::Invalid(reason) => Some(reason.message().to_string()), - _ => None, + let response_breakpoints: Vec = new_breakpoints + .iter() + .filter(|bp| !matches!(bp.state, BreakpointState::Disabled)) + .map(|bp| { + let message = match &bp.state { + BreakpointState::Invalid(reason) => Some(reason.message().to_string()), + _ => None, + }; + dap::types::Breakpoint { + id: Some(bp.id), + verified: matches!(bp.state, BreakpointState::Verified), + line: Some(Breakpoint::to_dap_line(bp.line)), + message, + ..Default::default() + } + }) + .collect(); + + state.breakpoints.insert(uri, (doc_hash, new_breakpoints)); + + drop(state); + + Ok(DapHandlerOutput::new(ResponseBody::SetBreakpoints( + SetBreakpointsResponse { + breakpoints: response_breakpoints, + }, + ))) + } + + fn handle_attach(&self, _args: AttachRequestArguments) -> anyhow::Result { + Ok(DapHandlerOutput { + body: ResponseBody::Attach, + dap_events: vec![Event::Thread(ThreadEventBody { + reason: ThreadEventReason::Started, + thread_id: THREAD_ID, + })], + console_events: vec![], + }) + } + + fn handle_disconnect(&self, _args: DisconnectArguments) -> anyhow::Result { + // Only send `Q` if currently in a debugging session. + let is_debugging = { self.state.lock().unwrap().is_debugging }; + let console_events = if is_debugging { + vec![DapConsoleEvent::DebugCommand(DebugRequest::Quit)] + } else { + vec![] + }; + + Ok(DapHandlerOutput { + body: ResponseBody::Disconnect, + dap_events: vec![], + console_events, + }) + } + + fn handle_restart(&self, _args: T) -> anyhow::Result { + Ok(DapHandlerOutput { + body: ResponseBody::Restart, + dap_events: vec![], + console_events: vec![DapConsoleEvent::Restart], + }) + } + + // All servers must respond to `Threads` requests, possibly with + // a dummy thread as is the case here + fn handle_threads(&self) -> anyhow::Result { + Ok(DapHandlerOutput::new(ResponseBody::Threads( + ThreadsResponse { + threads: vec![Thread { + id: THREAD_ID, + name: String::from("R console"), + }], + }, + ))) + } + + fn handle_set_exception_breakpoints( + &self, + args: SetExceptionBreakpointsArguments, + ) -> anyhow::Result { + { + let mut state = self.state.lock().unwrap(); + state.exception_breakpoint_filters = args.filters; + } + Ok(DapHandlerOutput::new( + ResponseBody::SetExceptionBreakpoints(SetExceptionBreakpointsResponse { + // This field is only useful for reporting problems with + // individual filters. Since we always accept all filters, + // `None` is fine here. + breakpoints: None, + }), + )) + } + + fn handle_stacktrace(&self, args: StackTraceArguments) -> anyhow::Result { + let stack = { + let state = self.state.lock().unwrap(); + let fallback_sources = &state.fallback_sources; + match &state.stack { + Some(stack) => stack + .iter() + .map(|frame| into_dap_frame(frame, fallback_sources)) + .collect(), + _ => vec![], + } + }; + + // Slice the stack as requested + let n_usize = stack.len(); + + let start_frame = args.start_frame.unwrap_or(0); + let Ok(start) = usize::try_from(start_frame) else { + return Err(anyhow::anyhow!("Invalid start_frame: {start_frame}")); + }; + let start = std::cmp::min(start, n_usize); + + let end = if let Some(levels) = args.levels { + let Ok(levels) = usize::try_from(levels) else { + return Err(anyhow::anyhow!("Invalid levels: {levels}")); + }; + std::cmp::min(start.saturating_add(levels), n_usize) + } else { + n_usize + }; + + let Ok(total_frames) = i64::try_from(n_usize) else { + return Err(anyhow::anyhow!( + "Stack frame count overflows i64: {n_usize}" + )); + }; + let stack = stack[start..end].to_vec(); + + Ok(DapHandlerOutput::new(ResponseBody::StackTrace( + StackTraceResponse { + stack_frames: stack, + total_frames: Some(total_frames), + }, + ))) + } + + fn handle_source(&self, _args: SourceArguments) -> anyhow::Result { + Err(anyhow::anyhow!("Unsupported `source` request")) + } + + fn handle_scopes(&self, args: ScopesArguments) -> anyhow::Result { + let state = self.state.lock().unwrap(); + let frame_id_to_variables_reference = &state.frame_id_to_variables_reference; + + // Entirely possible that the requested `frame_id` doesn't have any + // variables (like the top most frame where the call was made). We send + // back `0` in those cases, which is an indication of "no variables". + let variables_reference = frame_id_to_variables_reference + .get(&args.frame_id) + .copied() + .unwrap_or(0); + + // Only 1 overarching scope for now + let scopes = vec![Scope { + name: String::from("Locals"), + presentation_hint: Some(ScopePresentationhint::Locals), + variables_reference, + named_variables: None, + indexed_variables: None, + expensive: false, + source: None, + line: None, + column: None, + end_line: None, + end_column: None, + }]; + + drop(state); + Ok(DapHandlerOutput::new(ResponseBody::Scopes( + ScopesResponse { scopes }, + ))) + } + + fn handle_variables(&self, args: VariablesArguments) -> anyhow::Result { + let variables_reference = args.variables_reference; + let variables = self.collect_r_variables(variables_reference); + let variables = self.make_variables(variables); + Ok(DapHandlerOutput::new(ResponseBody::Variables( + VariablesResponse { variables }, + ))) + } + + fn collect_r_variables(&self, variables_reference: i64) -> Vec { + // Wait until we're in the `r_task()` to lock + // See https://github.com/posit-dev/positron/issues/5024 + let state = self.state.clone(); + + let variables = r_task(move || { + let state = state.lock().unwrap(); + let variables_reference_to_r_object = &state.variables_reference_to_r_object; + + let Some(object) = variables_reference_to_r_object.get(&variables_reference) else { + log::error!( + "Failed to locate R object for `variables_reference` {variables_reference}." + ); + return Vec::new(); + }; + + let object = object.get(); + object_variables(object.sexp) + }); + + variables + } + + fn make_variables(&self, variables: Vec) -> Vec { + self.state.lock().unwrap().make_variables(variables) + } + + fn handle_step( + &self, + _args: A, + cmd: DebugRequest, + resp: ResponseBody, + ) -> anyhow::Result { + Ok(DapHandlerOutput { + body: resp, + dap_events: vec![], + console_events: vec![DapConsoleEvent::DebugCommand(cmd)], + }) + } + + fn handle_pause(&self, _args: PauseArguments) -> anyhow::Result { + self.state.lock().unwrap().is_interrupting_for_debugger = true; + + log::info!("DAP: Received request to pause R, sending interrupt"); + + Ok(DapHandlerOutput { + body: ResponseBody::Pause, + dap_events: vec![], + console_events: vec![DapConsoleEvent::Interrupt], + }) + } +} + +// TODO: Handle comm close to shut down the DAP server thread. +// +// The DAP comm is allowed to persist across TCP sessions. This supports session +// switching on the frontend. Ideally the frontend would be allowed to close the +// DAP comm in addition to the DAP TCP connection, which would shut down the DAP +// server. To achive this, the DAP server, once disconnected should wait for both +// the connection becoming ready and a channel event signalling comm close. If +// the latter fires, shut the server down. + +pub fn start_dap( + state: Arc>, + server_start: ServerStartMessage, + server_started_tx: Sender, + r_request_tx: Sender, + comm_tx: CommOutgoingTx, +) { + let ip_address = server_start.ip_address(); + + // Binding to port `0` to allow the OS to allocate a port for us to bind to + let listener = TcpListener::bind(format!("{ip_address}:0",)).unwrap(); + + let address = match listener.local_addr() { + Ok(address) => address, + Err(error) => { + log::error!("DAP: Failed to bind to {ip_address}:0: {error}"); + return; + }, + }; + + // Get the OS allocated port + let port = address.port(); + + log::trace!("DAP: Thread starting at address {ip_address}:{port}."); + + // Send the port back to `Shell` and eventually out to the frontend so it can connect + server_started_tx + .send(ServerStartedMessage::new(port)) + .log_err(); + + loop { + log::trace!("DAP: Waiting for client"); + + let stream = match listener.accept() { + Ok((stream, addr)) => { + log::info!("DAP: Connected to client {addr:?}"); + + let mut state = state.lock().unwrap(); + state.is_connected = true; + + stream + }, + Err(e) => { + log::error!("DAP: Can't get client: {e:?}"); + continue; + }, + }; + + let reader = BufReader::new(&stream); + let writer = BufWriter::new(&stream); + let (responses_tx, responses_rx) = unbounded::(); + + let mut server = DapServer::new( + reader, + writer, + state.clone(), + r_request_tx.clone(), + comm_tx.clone(), + responses_tx, + ); + + let (backend_events_tx, backend_events_rx) = unbounded::(); + let (done_tx, done_rx) = bounded::(0); + let output_clone = server.output.clone(); + + // We need a scope to let the borrow checker know that + // `output_clone` drops before the next iteration (it gets tangled + // to the stack variable `stream` through `server`) + let _ = crossbeam::thread::scope(|scope| { + spawn!(scope, "ark-dap-events", { + move |_| listen_dap_events(output_clone, backend_events_rx, responses_rx, done_rx) + }); + + // Connect the backend to the events thread + state.lock().unwrap().backend_events_tx = Some(backend_events_tx); + + loop { + // If disconnected, break and accept a new connection to create a new server + if !server.serve() { + log::trace!("DAP: Disconnected from client"); + let mut state = state.lock().unwrap(); + state.is_connected = false; + break; + } + } + + // Terminate the events thread + let _ = done_tx.send(true); + }); + } +} + +// Thread that listens for events sent by the backend, usually the +// `ReadConsole()` method. These are forwarded to the DAP client. +fn listen_dap_events( + output: Arc>>, + backend_events_rx: Receiver, + responses_rx: Receiver, + done_rx: Receiver, +) { + loop { + select!( + recv(backend_events_rx) -> event => { + let event = match event { + Ok(event) => event, + Err(err) => { + // Channel closed, sender dropped + log::info!("DAP: Event channel closed: {err:?}"); + return; + }, + }; + + log::trace!("DAP: Got event from backend: {:?}", event); + + let event = match event { + DapBackendEvent::Continued => { + Event::Continued(ContinuedEventBody { + thread_id: THREAD_ID, + all_threads_continued: Some(true) + }) + }, + + DapBackendEvent::Stopped => { + Event::Stopped(StoppedEventBody { + reason: StoppedEventReason::Step, + description: None, + thread_id: Some(THREAD_ID), + preserve_focus_hint: Some(false), + text: None, + all_threads_stopped: Some(true), + hit_breakpoint_ids: None, + }) + }, + + DapBackendEvent::Exception(DapExceptionEvent { class, message }) => { + let text = format!("<{class}>\n{message}"); + Event::Stopped(StoppedEventBody { + reason: StoppedEventReason::Exception, + description: Some(message), + thread_id: Some(THREAD_ID), + preserve_focus_hint: Some(false), + text: Some(text), + all_threads_stopped: Some(true), + hit_breakpoint_ids: None, + }) + }, + + DapBackendEvent::Invalidated => { + Event::Invalidated(InvalidatedEventBody { + areas: Some(vec![types::InvalidatedAreas::Variables]), + thread_id: Some(THREAD_ID), + stack_frame_id: None, + }) + }, + + DapBackendEvent::Terminated => { + Event::Terminated(None) + }, + + DapBackendEvent::BreakpointState { id, line, verified, message } => { + Event::Breakpoint(BreakpointEventBody { + reason: BreakpointEventReason::Changed, + breakpoint: dap::types::Breakpoint { + id: Some(id), + line: Some(Breakpoint::to_dap_line(line)), + verified, + message, + ..Default::default() + }, + }) + }, }; - dap::types::Breakpoint { - id: Some(bp.id), - verified: matches!(bp.state, BreakpointState::Verified), - line: Some(Breakpoint::to_dap_line(bp.line)), - message, - ..Default::default() + + let mut output = output.lock().unwrap(); + if let Err(err) = output.send_event(event) { + log::warn!("DAP: Failed to send event, closing: {err:?}"); + return; } - }) - .collect(); + }, - state.breakpoints.insert(uri, (doc_hash, new_breakpoints)); + recv(responses_rx) -> response => { + let response = match response { + Ok(response) => response, + Err(err) => { + log::info!("DAP: Responses channel closed: {err:?}"); + return; + }, + }; - drop(state); + log::trace!("DAP: Sending async response: {:?}", response); - let rsp = req.success(ResponseBody::SetBreakpoints(SetBreakpointsResponse { - breakpoints: response_breakpoints, - })); + let mut output = output.lock().unwrap(); + if let Err(err) = output.send(Sendable::Response(response)) { + log::warn!("DAP: Failed to send response, closing: {err:?}"); + return; + } + }, - self.respond(rsp) + // Break the loop and terminate the thread + recv(done_rx) -> _ => { return; }, + ) } +} - fn handle_attach( - &mut self, - req: Request, - _args: AttachRequestArguments, - ) -> Result<(), ServerError> { - let rsp = req.success(ResponseBody::Attach); - self.respond(rsp)?; - - self.send_event(Event::Thread(ThreadEventBody { - reason: ThreadEventReason::Started, - thread_id: THREAD_ID, - })) - } +pub struct DapServer { + server: Server, + pub output: Arc>>, + pub(crate) handler: DapHandler, + comm_tx: Option, + responses_tx: Sender, +} - fn handle_disconnect( - &mut self, - req: Request, - _args: DisconnectArguments, - ) -> Result<(), ServerError> { - // Only send `Q` if currently in a debugging session. - let is_debugging = { self.state.lock().unwrap().is_debugging }; - if is_debugging { - self.send_command(DebugRequest::Quit); +impl DapServer { + pub fn new( + reader: BufReader, + writer: BufWriter, + state: Arc>, + r_request_tx: Sender, + comm_tx: CommOutgoingTx, + responses_tx: Sender, + ) -> Self { + let server = Server::new(reader, writer); + let output = server.output.clone(); + let handler = DapHandler::new(state, r_request_tx); + Self { + server, + output, + handler, + comm_tx: Some(comm_tx), + responses_tx, } - - let rsp = req.success(ResponseBody::Disconnect); - self.respond(rsp) } - fn handle_restart(&mut self, req: Request, _args: T) -> Result<(), ServerError> { - // If connected to Positron, forward the restart command to the - // frontend. Otherwise ignore it. - if let Some(tx) = &self.comm_tx { - let msg = amalthea::comm_rpc_message!("restart"); - tx.send(msg).log_err(); - } + pub fn serve(&mut self) -> bool { + log::trace!("DAP: Polling"); + let req = match self.server.poll_request() { + Ok(Some(req)) => req, + Ok(None) => return false, + Err(err) => { + log::warn!("DAP: Connection closed: {err:?}"); + return false; + }, + }; + log::trace!("DAP: Got request: {:#?}", req); - let rsp = req.success(ResponseBody::Restart); - self.respond(rsp) - } + // Evaluate is async: the response is sent later via `responses_tx`. + // It is the only command that needs transport-specific handling. + if matches!(&req.command, Command::Evaluate(_)) { + let Command::Evaluate(args) = req.command.clone() else { + unreachable!() + }; + if let Err(err) = self.handle_evaluate(req, args) { + log::warn!("DAP: Handler failed, closing connection: {err:?}"); + return false; + } + return true; + } - // All servers must respond to `Threads` requests, possibly with - // a dummy thread as is the case here - fn handle_threads(&mut self, req: Request) -> Result<(), ServerError> { - let rsp = req.success(ResponseBody::Threads(ThreadsResponse { - threads: vec![Thread { - id: THREAD_ID, - name: String::from("R console"), - }], - })); - self.respond(rsp) + let output = self.handler.dispatch(req); + self.deliver(output) } - fn handle_set_exception_breakpoints( - &mut self, - req: Request, - args: SetExceptionBreakpointsArguments, - ) -> Result<(), ServerError> { - { - let mut state = self.state.lock().unwrap(); - state.exception_breakpoint_filters = args.filters; + fn deliver(&mut self, output: DapOutput) -> bool { + if self.respond(output.response).log_err().is_none() { + return false; } - let rsp = req.success(ResponseBody::SetExceptionBreakpoints( - SetExceptionBreakpointsResponse { - // This field is only useful for reporting problems with - // individual filters. Since we always accept all filters, - // `None` is fine here. - breakpoints: None, - }, - )); - self.respond(rsp) - } - fn handle_stacktrace( - &mut self, - req: Request, - args: StackTraceArguments, - ) -> Result<(), ServerError> { - let stack = { - let state = self.state.lock().unwrap(); - let fallback_sources = &state.fallback_sources; - match &state.stack { - Some(stack) => stack - .iter() - .map(|frame| into_dap_frame(frame, fallback_sources)) - .collect(), - _ => vec![], + for event in output.dap_events { + if self.send_event(event).log_err().is_none() { + return false; } - }; - - // Slice the stack as requested - let n_usize = stack.len(); - - let start_frame = args.start_frame.unwrap_or(0); - let Ok(start) = usize::try_from(start_frame) else { - let rsp = req.error(&format!("Invalid start_frame: {start_frame}")); - return self.respond(rsp); - }; - let start = std::cmp::min(start, n_usize); - - let end = if let Some(levels) = args.levels { - let Ok(levels) = usize::try_from(levels) else { - let rsp = req.error(&format!("Invalid levels: {levels}")); - return self.respond(rsp); - }; - std::cmp::min(start.saturating_add(levels), n_usize) - } else { - n_usize - }; - - let Ok(total_frames) = i64::try_from(n_usize) else { - let rsp = req.error(&format!("Stack frame count overflows i64: {n_usize}")); - return self.respond(rsp); - }; - let stack = stack[start..end].to_vec(); + } - let rsp = req.success(ResponseBody::StackTrace(StackTraceResponse { - stack_frames: stack, - total_frames: Some(total_frames), - })); + for event in output.console_events { + self.handle_console_event(event); + } - self.respond(rsp) + true } - fn handle_source(&mut self, req: Request, _args: SourceArguments) -> Result<(), ServerError> { - let message = "Unsupported `source` request: {req:?}"; - log::error!("{message}"); - let rsp = req.error(message); - self.respond(rsp) + fn handle_console_event(&mut self, event: DapConsoleEvent) { + match event { + DapConsoleEvent::DebugCommand(cmd) => { + if let Some(tx) = &self.comm_tx { + // If we have a comm channel (always the case as of this + // writing) we are connected to Positron or similar. Send + // control events so that the IDE can execute these as if they + // were sent by the user. This ensures prompts are updated. + let msg = amalthea::comm_rpc_message!( + "execute", + command = debug_request_command(cmd) + ); + tx.send(msg).log_err(); + } else { + self.handler + .r_request_tx + .send(RRequest::DebugCommand(cmd)) + .log_err(); + } + }, + DapConsoleEvent::Interrupt => { + crate::sys::control::handle_interrupt_request(); + }, + DapConsoleEvent::Restart => { + if let Some(tx) = &self.comm_tx { + let msg = amalthea::comm_rpc_message!("restart"); + tx.send(msg).log_err(); + } + }, + } } - fn handle_scopes(&mut self, req: Request, args: ScopesArguments) -> Result<(), ServerError> { - let state = self.state.lock().unwrap(); - let frame_id_to_variables_reference = &state.frame_id_to_variables_reference; - - // Entirely possible that the requested `frame_id` doesn't have any - // variables (like the top most frame where the call was made). We send - // back `0` in those cases, which is an indication of "no variables". - let variables_reference = frame_id_to_variables_reference - .get(&args.frame_id) - .copied() - .unwrap_or(0); - - // Only 1 overarching scope for now - let scopes = vec![Scope { - name: String::from("Locals"), - presentation_hint: Some(ScopePresentationhint::Locals), - variables_reference, - named_variables: None, - indexed_variables: None, - expensive: false, - source: None, - line: None, - column: None, - end_line: None, - end_column: None, - }]; - - let rsp = req.success(ResponseBody::Scopes(ScopesResponse { scopes })); - - drop(state); - self.respond(rsp) + fn respond(&mut self, rsp: Response) -> Result<(), ServerError> { + log::trace!("DAP: Responding to request: {rsp:#?}"); + self.server.respond(rsp) } - fn handle_variables( - &mut self, - req: Request, - args: VariablesArguments, - ) -> Result<(), ServerError> { - let variables_reference = args.variables_reference; - let variables = self.collect_r_variables(variables_reference); - let variables = self.make_variables(variables); - let rsp = req.success(ResponseBody::Variables(VariablesResponse { variables })); - self.respond(rsp) + fn send_event(&mut self, event: Event) -> Result<(), ServerError> { + log::trace!("DAP: Sending event: {event:#?}"); + self.server.send_event(event) } + // Tied to the TCP transport for now. For Jupyter we need to figure out how + // to do async responses with Jupyter's Control channel. fn handle_evaluate( &mut self, req: Request, @@ -820,7 +990,7 @@ impl DapServer { ) -> Result<(), ServerError> { let expression = args.expression; let frame_id = args.frame_id; - let state = self.state.clone(); + let state = self.handler.state.clone(); let responses_tx = self.responses_tx.clone(); log::trace!("DAP: Spawning idle task for evaluate"); @@ -864,70 +1034,6 @@ impl DapServer { Ok(()) } - - fn collect_r_variables(&self, variables_reference: i64) -> Vec { - // Wait until we're in the `r_task()` to lock - // See https://github.com/posit-dev/positron/issues/5024 - let state = self.state.clone(); - - let variables = r_task(move || { - let state = state.lock().unwrap(); - let variables_reference_to_r_object = &state.variables_reference_to_r_object; - - let Some(object) = variables_reference_to_r_object.get(&variables_reference) else { - log::error!( - "Failed to locate R object for `variables_reference` {variables_reference}." - ); - return Vec::new(); - }; - - let object = object.get(); - object_variables(object.sexp) - }); - - variables - } - - fn make_variables(&self, variables: Vec) -> Vec { - self.state.lock().unwrap().make_variables(variables) - } - - fn handle_step( - &mut self, - req: Request, - _args: A, - cmd: DebugRequest, - resp: ResponseBody, - ) -> Result<(), ServerError> { - self.send_command(cmd); - let rsp = req.success(resp); - self.respond(rsp) - } - - fn handle_pause(&mut self, req: Request, _args: PauseArguments) -> Result<(), ServerError> { - self.state.lock().unwrap().is_interrupting_for_debugger = true; - - log::info!("DAP: Received request to pause R, sending interrupt"); - crate::sys::control::handle_interrupt_request(); - - let rsp = req.success(ResponseBody::Pause); - self.respond(rsp) - } - - fn send_command(&mut self, cmd: DebugRequest) { - if let Some(tx) = &self.comm_tx { - // If we have a comm channel (always the case as of this - // writing) we are connected to Positron or similar. Send - // control events so that the IDE can execute these as if they - // were sent by the user. This ensures prompts are updated. - let msg = amalthea::comm_rpc_message!("execute", command = debug_request_command(cmd)); - - tx.send(msg).log_err(); - } else { - // Otherwise, send command to R's `ReadConsole()` frontend method - self.r_request_tx.send(RRequest::DebugCommand(cmd)).unwrap(); - } - } } fn into_dap_frame(frame: &FrameInfo, fallback_sources: &HashMap) -> StackFrame {