From 06bb410fb1c67062328daef3a79d0bc32bd96e19 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 10 Mar 2026 14:37:15 +0100 Subject: [PATCH 1/3] Run UI comm on the R thread --- crates/ark/src/console.rs | 10 +- crates/ark/src/console/console_comm.rs | 33 + crates/ark/src/console/console_integration.rs | 86 +-- crates/ark/src/console/console_repl.rs | 36 +- crates/ark/src/lsp/backend.rs | 4 +- crates/ark/src/request.rs | 14 +- crates/ark/src/shell.rs | 39 +- crates/ark/src/ui/events.rs | 54 +- crates/ark/src/ui/mod.rs | 7 +- crates/ark/src/ui/sender.rs | 103 ---- crates/ark/src/ui/ui.rs | 582 +++++++----------- crates/ark/src/viewer.rs | 9 +- crates/ark/tests/evaluate-code.rs | 9 +- crates/ark_test/src/dummy_frontend.rs | 56 +- 14 files changed, 397 insertions(+), 645 deletions(-) delete mode 100644 crates/ark/src/ui/sender.rs diff --git a/crates/ark/src/console.rs b/crates/ark/src/console.rs index 05efb80f1..09ea82f92 100644 --- a/crates/ark/src/console.rs +++ b/crates/ark/src/console.rs @@ -128,7 +128,6 @@ pub(crate) use console_repl::ConsoleNotification; pub(crate) use console_repl::ConsoleOutputCapture; pub(crate) use console_repl::KernelInfo; use console_repl::PendingInputs; -use console_repl::PromptInfo; use console_repl::ReadConsolePendingAction; pub use console_repl::SessionMode; @@ -166,8 +165,7 @@ use crate::srcref::ns_populate_srcref; use crate::srcref::resource_loaded_namespaces; use crate::startup; use crate::sys::console::console_to_utf8; -use crate::ui::UiCommMessage; -use crate::ui::UiCommSender; +use crate::ui::send_ui_event; use crate::url::UrlId; thread_local! { @@ -224,9 +222,9 @@ pub(crate) struct Console { tasks_idle_any_rx: Receiver, pending_futures: HashMap, RTaskStartInfo, Option)>, - /// Channel to communicate requests and events to the frontend - /// by forwarding them through the UI comm. Optional, and really Positron specific. - ui_comm_tx: Option, + /// The comm ID of the currently connected UI comm, if any. + /// Used for fast lookup when sending events to the frontend. + ui_comm_id: Option, /// Error captured by our global condition handler during the last iteration /// of the REPL. diff --git a/crates/ark/src/console/console_comm.rs b/crates/ark/src/console/console_comm.rs index f10fbf754..5e01a619c 100644 --- a/crates/ark/src/console/console_comm.rs +++ b/crates/ark/src/console/console_comm.rs @@ -8,6 +8,7 @@ use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::event::CommEvent; use amalthea::socket::comm::CommInitiator; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::socket::comm::CommSocket; use stdext::result::ResultExt; use uuid::Uuid; @@ -17,6 +18,7 @@ use crate::comm_handler::CommHandlerContext; use crate::comm_handler::ConsoleComm; use crate::comm_handler::EnvironmentChanged; use crate::console::Console; +use crate::ui::UI_COMM_NAME; impl Console { pub(super) fn comm_handle_msg(&mut self, comm_id: &str, msg: CommMsg) { @@ -29,6 +31,11 @@ impl Console { } pub(super) fn comm_handle_close(&mut self, comm_id: &str) { + // Clear UI comm ID if this is the UI comm being closed + if self.ui_comm_id.as_deref() == Some(comm_id) { + self.ui_comm_id = None; + } + let Some(mut reg) = self.comms.remove(comm_id) else { log::warn!("Received close for unknown registered comm {comm_id}"); return; @@ -67,6 +74,32 @@ impl Console { Ok(comm_id) } + /// Register a frontend-initiated comm on the R thread. + /// + /// Unlike `comm_register` (which is for backend-initiated comms and sends + /// `CommEvent::Opened`), this is called when the frontend opened the comm. + /// The `CommSocket` already exists in amalthea's open_comms list, so we + /// only need to register the handler and call `handle_open`. + pub(super) fn comm_open_frontend( + &mut self, + comm_id: String, + comm_name: &str, + outgoing_tx: CommOutgoingTx, + mut handler: Box, + ) { + let ctx = CommHandlerContext::new(outgoing_tx, self.comm_event_tx.clone()); + handler.handle_open(&ctx); + + if comm_name == UI_COMM_NAME { + if self.ui_comm_id.is_some() { + log::info!("Replacing an existing UI comm."); + } + self.ui_comm_id = Some(comm_id.clone()); + } + + self.comms.insert(comm_id, ConsoleComm { handler, ctx }); + } + pub(super) fn comm_notify_environment_changed(&mut self, event: EnvironmentChanged) { for (_, reg) in self.comms.iter_mut() { reg.handler.handle_environment(event, ®.ctx); diff --git a/crates/ark/src/console/console_integration.rs b/crates/ark/src/console/console_integration.rs index 274357083..3c1b7ac67 100644 --- a/crates/ark/src/console/console_integration.rs +++ b/crates/ark/src/console/console_integration.rs @@ -11,69 +11,24 @@ use super::*; /// UI comm integration. impl Console { - pub(super) fn handle_establish_ui_comm_channel( - &mut self, - ui_comm_tx: Sender, - info: &PromptInfo, - ) { - if self.ui_comm_tx.is_some() { - log::info!("Replacing an existing UI comm channel."); - } - - // Create and store the sender channel - self.ui_comm_tx = Some(UiCommSender::new(ui_comm_tx)); - - // Go ahead and do an initial refresh - self.with_mut_ui_comm_tx(|ui_comm_tx| { - let input_prompt = info.input_prompt.clone(); - let continuation_prompt = info.continuation_prompt.clone(); - - ui_comm_tx.send_refresh(input_prompt, continuation_prompt); - }); - } - pub(crate) fn session_mode(&self) -> SessionMode { self.session_mode } - pub(crate) fn get_ui_comm_tx(&self) -> Option<&UiCommSender> { - self.ui_comm_tx.as_ref() - } - - fn get_mut_ui_comm_tx(&mut self) -> Option<&mut UiCommSender> { - self.ui_comm_tx.as_mut() - } - - pub(super) fn with_ui_comm_tx(&self, f: F) - where - F: FnOnce(&UiCommSender), - { - match self.get_ui_comm_tx() { - Some(ui_comm_tx) => f(ui_comm_tx), - None => { - // Trace level logging, its typically not a bug if the frontend - // isn't connected. Happens in all Jupyter use cases. - log::trace!("UI comm isn't connected, dropping `f`."); - }, - } - } - - pub(super) fn with_mut_ui_comm_tx(&mut self, mut f: F) - where - F: FnMut(&mut UiCommSender), - { - match self.get_mut_ui_comm_tx() { - Some(ui_comm_tx) => f(ui_comm_tx), - None => { - // Trace level logging, its typically not a bug if the frontend - // isn't connected. Happens in all Jupyter use cases. - log::trace!("UI comm isn't connected, dropping `f`."); - }, - } + /// Send a `UiFrontendEvent` to the frontend via the UI comm, if connected. + /// Silently drops the event if no UI comm is open (common in Jupyter). + pub(crate) fn send_ui_event(&self, event: &UiFrontendEvent) { + let Some(reg) = self.comms.get(self.ui_comm_id.as_deref().unwrap_or("")) else { + log::trace!("UI comm isn't connected, dropping event."); + return; + }; + send_ui_event(®.ctx.outgoing_tx, event); } pub(crate) fn is_ui_comm_connected(&self) -> bool { - self.get_ui_comm_tx().is_some() + self.ui_comm_id + .as_deref() + .is_some_and(|id| self.comms.contains_key(id)) } pub(crate) fn call_frontend_method( @@ -82,24 +37,27 @@ impl Console { ) -> anyhow::Result { log::trace!("Calling frontend method {request:?}"); - let ui_comm_tx = self.get_ui_comm_tx().ok_or_else(|| { - anyhow::anyhow!("UI comm is not connected. Can't execute request {request:?}") - })?; + if !self.is_ui_comm_connected() { + return Err(anyhow!( + "UI comm is not connected. Can't execute request {request:?}" + )); + } let (reply_tx, reply_rx) = bounded(1); let Some(req) = &self.active_request else { - return Err(anyhow::anyhow!( + return Err(anyhow!( "No active request. Can't execute request {request:?}" )); }; - // Forward request to UI comm - ui_comm_tx.send_request(UiCommFrontendRequest { + // Forward request directly to the stdin channel + let comm_msg = StdInRequest::Comm(UiCommFrontendRequest { originator: req.originator.clone(), reply_tx, request: request.clone(), }); + self.stdin_request_tx.send(comm_msg)?; // Block for reply let reply = reply_rx.recv().unwrap(); @@ -112,7 +70,7 @@ impl Console { // Deserialize to Rust first to verify the OpenRPC contract. // Errors are propagated to R. if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) { - return Err(anyhow::anyhow!( + return Err(anyhow!( "Can't deserialize RPC reply for {request:?}:\n{err:?}" )); } @@ -123,7 +81,7 @@ impl Console { JsonRpcReply::Error(reply) => { let message = reply.error.message; - return Err(anyhow::anyhow!( + return Err(anyhow!( "While calling frontend method:\n\ {message}", )); diff --git a/crates/ark/src/console/console_repl.rs b/crates/ark/src/console/console_repl.rs index 167da981d..d0015825d 100644 --- a/crates/ark/src/console/console_repl.rs +++ b/crates/ark/src/console/console_repl.rs @@ -608,7 +608,7 @@ impl Console { active_request: None, execution_count: 0, autoprint_output: String::new(), - ui_comm_tx: None, + ui_comm_id: None, last_error: None, help_event_tx: None, help_port: None, @@ -1197,7 +1197,7 @@ impl Console { Some(exception) } - fn handle_active_request(&mut self, info: &PromptInfo, value: ConsoleValue) { + fn handle_active_request(&mut self, _info: &PromptInfo, value: ConsoleValue) { self.reset_global_env_rdebug(); // If we get here we finished evaluating all pending inputs. Check if we @@ -1209,17 +1209,6 @@ impl Console { return; }; - // Perform a refresh of the frontend state (Prompts, working - // directory, etc) - // TODO: Once the UI comm is migrated to the `CommHandler` path, this - // becomes a `handle_environment` impl reacting to `Execution`. - self.with_mut_ui_comm_tx(|ui_comm_tx| { - let input_prompt = info.input_prompt.clone(); - let continuation_prompt = info.continuation_prompt.clone(); - - ui_comm_tx.send_refresh(input_prompt, continuation_prompt); - }); - // Check for pending graphics updates // (Important that this occurs while in the "busy" state of this ExecuteRequest // so that the `parent` message is set correctly in any Jupyter messages) @@ -1835,12 +1824,19 @@ impl Console { } } - fn handle_kernel_request(&mut self, req: KernelRequest, info: &PromptInfo) { + fn handle_kernel_request(&mut self, req: KernelRequest, _info: &PromptInfo) { log::trace!("Received kernel request {req:?}"); match req { - KernelRequest::EstablishUiCommChannel(ref ui_comm_tx) => { - self.handle_establish_ui_comm_channel(ui_comm_tx.clone(), info) + KernelRequest::CommOpen { + comm_id, + comm_name, + outgoing_tx, + handler, + done_tx, + } => { + self.comm_open_frontend(comm_id, &comm_name, outgoing_tx, handler); + done_tx.send(()).log_err(); }, KernelRequest::CommMsg { comm_id, @@ -2182,9 +2178,7 @@ impl Console { let busy = which != 0; // Send updated state to the frontend over the UI comm - self.with_ui_comm_tx(|ui_comm_tx| { - ui_comm_tx.send_event(UiFrontendEvent::Busy(BusyParams { busy })); - }); + self.send_ui_event(&UiFrontendEvent::Busy(BusyParams { busy })); } /// Invoked by R to show a message to the user. @@ -2193,9 +2187,7 @@ impl Console { let message = message.to_str().unwrap().to_string(); // Deliver message to the frontend over the UI comm - self.with_ui_comm_tx(|ui_comm_tx| { - ui_comm_tx.send_event(UiFrontendEvent::ShowMessage(ShowMessageParams { message })) - }); + self.send_ui_event(&UiFrontendEvent::ShowMessage(ShowMessageParams { message })); } /// Invoked by the R event loop diff --git a/crates/ark/src/lsp/backend.rs b/crates/ark/src/lsp/backend.rs index f7b745bc3..0584c709b 100644 --- a/crates/ark/src/lsp/backend.rs +++ b/crates/ark/src/lsp/backend.rs @@ -112,9 +112,7 @@ fn report_crash() { message: String::from(user_message), }); - if let Some(ui_comm_tx) = Console::get().get_ui_comm_tx() { - ui_comm_tx.send_event(event); - } + Console::get().send_ui_event(&event); }); } diff --git a/crates/ark/src/request.rs b/crates/ark/src/request.rs index 0f7390de8..6d3370207 100644 --- a/crates/ark/src/request.rs +++ b/crates/ark/src/request.rs @@ -6,12 +6,13 @@ // use amalthea::comm::comm_channel::CommMsg; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::wire::execute_reply::ExecuteReply; use amalthea::wire::execute_request::ExecuteRequest; use amalthea::wire::originator::Originator; use crossbeam::channel::Sender; -use crate::ui::UiCommMessage; +use crate::comm_handler::CommHandler; /// Represents requests to the primary R execution thread. #[derive(Debug, Clone)] @@ -53,8 +54,15 @@ pub fn debug_request_command(req: DebugRequest) -> String { /// Represents requests to the kernel. #[derive(Debug)] pub enum KernelRequest { - /// Establish a channel to the UI comm which forwards messages to the frontend - EstablishUiCommChannel(Sender), + /// Register a frontend-initiated comm handler on the R thread. + /// The handler is constructed on the Shell thread and sent here for registration. + CommOpen { + comm_id: String, + comm_name: String, + outgoing_tx: CommOutgoingTx, + handler: Box, + done_tx: Sender<()>, + }, /// Deliver an incoming comm message to the R thread CommMsg { diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 3089105e5..ac284ff78 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -55,16 +55,17 @@ use crate::r_task; use crate::request::KernelRequest; use crate::request::RRequest; use crate::ui::UiComm; +use crate::ui::UI_COMM_NAME; use crate::variables::r_variables::RVariables; pub struct Shell { r_request_tx: Sender, - stdin_request_tx: Sender, + _stdin_request_tx: Sender, kernel_request_tx: Sender, kernel_init_rx: BusReader, kernel_info: Option, graphics_device_tx: AsyncUnboundedSender, - console_notification_tx: AsyncUnboundedSender, + _console_notification_tx: AsyncUnboundedSender, } #[derive(Debug)] @@ -84,12 +85,12 @@ impl Shell { ) -> Self { Self { r_request_tx, - stdin_request_tx, + _stdin_request_tx: stdin_request_tx, kernel_request_tx, kernel_init_rx, kernel_info: None, graphics_device_tx, - console_notification_tx, + _console_notification_tx: console_notification_tx, } } @@ -249,10 +250,8 @@ impl ShellHandler for Shell { Comm::Variables => handle_comm_open_variables(comm), Comm::Ui => handle_comm_open_ui( comm, - self.stdin_request_tx.clone(), self.kernel_request_tx.clone(), self.graphics_device_tx.clone(), - self.console_notification_tx.clone(), ), Comm::Help => handle_comm_open_help(comm), Comm::Other(target_name) if target_name == "ark" => ArkComm::handle_comm_open(comm), @@ -267,7 +266,7 @@ impl ShellHandler for Shell { msg: CommMsg, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommMsg { comm_id: comm_id.to_string(), msg, @@ -285,7 +284,7 @@ impl ShellHandler for Shell { comm_name: &str, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommClose { comm_id: comm_id.to_string(), done_tx, @@ -324,20 +323,24 @@ fn handle_comm_open_variables(comm: CommSocket) -> amalthea::Result { fn handle_comm_open_ui( comm: CommSocket, - stdin_request_tx: Sender, kernel_request_tx: Sender, graphics_device_tx: AsyncUnboundedSender, - _console_notification_tx: AsyncUnboundedSender, ) -> amalthea::Result { - // Create a frontend to wrap the comm channel we were just given. This starts - // a thread that proxies messages to the frontend. - let ui_comm_tx = UiComm::start(comm, stdin_request_tx, graphics_device_tx); + let handler = UiComm::new(graphics_device_tx); - // Send the frontend event channel to the execution thread so it can emit - // events to the frontend. - if let Err(err) = kernel_request_tx.send(KernelRequest::EstablishUiCommChannel(ui_comm_tx)) { - log::error!("Could not deliver UI comm channel to execution thread: {err:?}"); - }; + let (done_tx, done_rx) = bounded(0); + kernel_request_tx + .send(KernelRequest::CommOpen { + comm_id: comm.comm_id.clone(), + comm_name: comm.comm_name.clone(), + outgoing_tx: comm.outgoing_tx.clone(), + handler: Box::new(handler), + done_tx, + }) + .map_err(|err| amalthea::Error::SendError(err.to_string()))?; + done_rx + .recv() + .map_err(|err| amalthea::Error::ReceiveError(err.to_string()))?; Ok(true) } diff --git a/crates/ark/src/ui/events.rs b/crates/ark/src/ui/events.rs index 241ac48f5..4c96603bc 100644 --- a/crates/ark/src/ui/events.rs +++ b/crates/ark/src/ui/events.rs @@ -31,10 +31,11 @@ pub unsafe extern "C-unwind" fn ps_ui_show_message(message: SEXP) -> anyhow::Res let event = UiFrontendEvent::ShowMessage(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_show_message"))?; - ui_comm_tx.send_event(event); + let console = Console::get(); + if !console.is_ui_comm_connected() { + return Err(ui_comm_not_connected("ui_show_message")); + } + console.send_ui_event(&event); Ok(R_NilValue) } @@ -51,10 +52,11 @@ pub unsafe extern "C-unwind" fn ps_ui_open_workspace( let event = UiFrontendEvent::OpenWorkspace(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_open_workspace"))?; - ui_comm_tx.send_event(event); + let console = Console::get(); + if !console.is_ui_comm_connected() { + return Err(ui_comm_not_connected("ui_open_workspace")); + } + console.send_ui_event(&event); Ok(R_NilValue) } @@ -79,10 +81,11 @@ pub unsafe extern "C-unwind" fn ps_ui_navigate_to_file( let event = UiFrontendEvent::OpenEditor(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_navigate_to_file"))?; - ui_comm_tx.send_event(event); + let console = Console::get(); + if !console.is_ui_comm_connected() { + return Err(ui_comm_not_connected("ui_navigate_to_file")); + } + console.send_ui_event(&event); Ok(R_NilValue) } @@ -94,10 +97,11 @@ pub unsafe extern "C-unwind" fn ps_ui_set_selection_ranges(ranges: SEXP) -> anyh let event = UiFrontendEvent::SetEditorSelections(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_set_selection_ranges"))?; - ui_comm_tx.send_event(event); + let console = Console::get(); + if !console.is_ui_comm_connected() { + return Err(ui_comm_not_connected("ui_set_selection_ranges")); + } + console.send_ui_event(&event); Ok(R_NilValue) } @@ -109,10 +113,11 @@ pub fn send_show_url_event(url: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::ShowUrl(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("show_url"))?; - ui_comm_tx.send_event(event); + let console = Console::get(); + if !console.is_ui_comm_connected() { + return Err(ui_comm_not_connected("show_url")); + } + console.send_ui_event(&event); Ok(()) } @@ -130,10 +135,11 @@ pub fn send_open_with_system_event(path: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::OpenWithSystem(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("open_with_system"))?; - ui_comm_tx.send_event(event); + let console = Console::get(); + if !console.is_ui_comm_connected() { + return Err(ui_comm_not_connected("open_with_system")); + } + console.send_ui_event(&event); Ok(()) } diff --git a/crates/ark/src/ui/mod.rs b/crates/ark/src/ui/mod.rs index d41652211..f6cafd0f2 100644 --- a/crates/ark/src/ui/mod.rs +++ b/crates/ark/src/ui/mod.rs @@ -8,8 +8,7 @@ pub mod events; pub mod methods; -mod sender; -pub use sender::*; - mod ui; -pub use ui::*; +pub use ui::send_ui_event; +pub use ui::UiComm; +pub use ui::UI_COMM_NAME; diff --git a/crates/ark/src/ui/sender.rs b/crates/ark/src/ui/sender.rs deleted file mode 100644 index fe60e5d91..000000000 --- a/crates/ark/src/ui/sender.rs +++ /dev/null @@ -1,103 +0,0 @@ -// -// sender.rs -// -// Copyright (C) 2024 by Posit Software, PBC -// -// - -use std::path::PathBuf; - -use amalthea::comm::ui_comm::PromptStateParams; -use amalthea::comm::ui_comm::UiFrontendEvent; -use amalthea::comm::ui_comm::WorkingDirectoryParams; -use amalthea::wire::input_request::UiCommFrontendRequest; -use crossbeam::channel::Sender; - -use crate::ui::UiCommMessage; - -/// Wrapper around a `Sender` that communicates -/// messages to the `UiComm` -/// -/// Adds convenience methods for sending `Event`s and `Request`s. -/// -/// Manages a bit of state for performing a state refresh -/// (the `working_directory`). -pub struct UiCommSender { - ui_comm_tx: Sender, - working_directory: PathBuf, -} - -impl UiCommSender { - pub fn new(ui_comm_tx: Sender) -> Self { - // Empty path buf will get updated on first directory refresh - let working_directory = PathBuf::new(); - - Self { - ui_comm_tx, - working_directory, - } - } - - pub fn send_event(&self, event: UiFrontendEvent) { - self.send(UiCommMessage::Event(event)) - } - - pub fn send_request(&self, request: UiCommFrontendRequest) { - self.send(UiCommMessage::Request(request)) - } - - fn send(&self, msg: UiCommMessage) { - log::info!("Sending message to UI comm: {msg:?}"); - - if let Err(err) = self.ui_comm_tx.send(msg) { - log::error!("Error sending message to UI comm: {err:?}"); - - // TODO: Something is wrong with the UI thread, we should - // disconnect to avoid more errors but then we need a mutable self - // self.ui_comm_tx = None; - } - } - - pub fn send_refresh(&mut self, input_prompt: String, continuation_prompt: String) { - self.refresh_prompt_info(input_prompt, continuation_prompt); - - if let Err(err) = self.refresh_working_directory() { - log::error!("Can't refresh working directory: {err:?}"); - } - } - - fn refresh_prompt_info(&self, input_prompt: String, continuation_prompt: String) { - self.send_event(UiFrontendEvent::PromptState(PromptStateParams { - input_prompt, - continuation_prompt, - })); - } - - /// Checks for changes to the working directory, and sends an event to the - /// frontend if the working directory has changed. - fn refresh_working_directory(&mut self) -> anyhow::Result<()> { - // Get the current working directory - let mut new_working_directory = std::env::current_dir()?; - - // If it isn't the same as the last working directory, send an event - if new_working_directory != self.working_directory { - self.working_directory = new_working_directory.clone(); - - // Attempt to alias the directory, if it's within the home directory - if let Some(home_dir) = home::home_dir() { - if let Ok(stripped_dir) = new_working_directory.strip_prefix(home_dir) { - let mut new_path = PathBuf::from("~"); - new_path.push(stripped_dir); - new_working_directory = new_path; - } - } - - // Deliver event to client - self.send_event(UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { - directory: new_working_directory.to_string_lossy().to_string(), - })); - }; - - Ok(()) - } -} diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui.rs index 9fb1a26ac..62126be75 100644 --- a/crates/ark/src/ui/ui.rs +++ b/crates/ark/src/ui/ui.rs @@ -5,154 +5,71 @@ // // +use std::path::PathBuf; + use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::ui_comm::CallMethodParams; use amalthea::comm::ui_comm::DidChangePlotsRenderSettingsParams; use amalthea::comm::ui_comm::EditorContextChangedParams; use amalthea::comm::ui_comm::EvalResult; use amalthea::comm::ui_comm::EvaluateCodeParams; +use amalthea::comm::ui_comm::PromptStateParams; use amalthea::comm::ui_comm::UiBackendReply; use amalthea::comm::ui_comm::UiBackendRequest; use amalthea::comm::ui_comm::UiFrontendEvent; -use amalthea::socket::comm::CommSocket; -use amalthea::socket::stdin::StdInRequest; -use amalthea::wire::input_request::UiCommFrontendRequest; -use crossbeam::channel::Receiver; -use crossbeam::channel::Sender; -use crossbeam::select; +use amalthea::comm::ui_comm::WorkingDirectoryParams; +use amalthea::socket::comm::CommOutgoingTx; use harp::eval::parse_eval_global; use harp::exec::RFunction; use harp::exec::RFunctionExt; use harp::object::RObject; use serde_json::Value; -use stdext::spawn; -use stdext::unwrap; +use stdext::result::ResultExt; use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; +use crate::comm_handler::handle_rpc_request; +use crate::comm_handler::CommHandler; +use crate::comm_handler::CommHandlerContext; +use crate::comm_handler::EnvironmentChanged; use crate::console::Console; use crate::console::ConsoleOutputCapture; use crate::plots::graphics_device::GraphicsDeviceNotification; -use crate::r_task; -#[derive(Debug)] -pub enum UiCommMessage { - Event(UiFrontendEvent), - Request(UiCommFrontendRequest), +pub const UI_COMM_NAME: &str = "positron.ui"; + +/// Send a `UiFrontendEvent` as a `CommMsg::Data` on the given outgoing channel. +pub fn send_ui_event(outgoing_tx: &CommOutgoingTx, event: &UiFrontendEvent) { + let Ok(json) = serde_json::to_value(event) else { + log::error!("Failed to serialize UI event"); + return; + }; + outgoing_tx.send(CommMsg::Data(json)).log_err(); } -/// UiComm is a wrapper around a comm channel whose lifetime matches -/// that of the Positron UI frontend. It is used to perform communication with the -/// frontend that isn't scoped to any particular view. +/// Comm handler for the Positron UI comm. +/// +/// Runs on the R thread via the `CommHandler` trait. Handles incoming RPCs +/// from the frontend (e.g. `callMethod`, `evaluateCode`) and sends events +/// to the frontend (e.g. prompt state, working directory changes). +#[derive(Debug)] pub struct UiComm { - comm: CommSocket, - ui_comm_rx: Receiver, - stdin_request_tx: Sender, graphics_device_tx: AsyncUnboundedSender, + working_directory: PathBuf, } impl UiComm { - pub(crate) fn start( - comm: CommSocket, - stdin_request_tx: Sender, + pub(crate) fn new( graphics_device_tx: AsyncUnboundedSender, - ) -> Sender { - // Create a sender-receiver pair for Positron global events - let (ui_comm_tx, ui_comm_rx) = crossbeam::channel::unbounded::(); - - spawn!("ark-comm-ui", move || { - let frontend = Self { - comm, - ui_comm_rx, - stdin_request_tx, - graphics_device_tx, - }; - frontend.execution_thread(); - }); - - ui_comm_tx - } - - fn execution_thread(&self) { - loop { - // Wait for an event on either the event channel (which forwards - // Positron events to the frontend) or the comm channel (which - // receives requests from the frontend) - select! { - recv(&self.ui_comm_rx) -> msg => { - let msg = unwrap!(msg, Err(err) => { - log::error!( - "Error receiving Positron event; closing event listener: {err:?}" - ); - // Most likely the channel was closed, so we should stop the thread - break; - }); - match msg { - UiCommMessage::Event(event) => self.dispatch_event(&event), - UiCommMessage::Request(request) => self.call_frontend_method(request).unwrap(), - } - }, - - recv(&self.comm.incoming_rx) -> msg => { - match msg { - Ok(msg) => { - if !self.handle_comm_message(msg) { - log::info!("UI comm {} closing by request from frontend.", self.comm.comm_id); - break; - } - }, - Err(err) => { - log::error!("Error receiving message from frontend: {:?}", err); - break; - }, - } - }, - } - } - } - - fn dispatch_event(&self, event: &UiFrontendEvent) { - let json = serde_json::to_value(event).unwrap(); - - // Deliver the event to the frontend over the comm channel - if let Err(err) = self.comm.outgoing_tx.send(CommMsg::Data(json)) { - log::error!("Error sending UI event to frontend: {}", err); - }; - } - - /** - * Handles a comm message from the frontend. - * - * Returns true if the thread should continue, false if it should exit. - */ - fn handle_comm_message(&self, message: CommMsg) -> bool { - if let CommMsg::Close = message { - // The frontend has closed the connection; let the - // thread exit. - return false; - } - - if self - .comm - .handle_request(message.clone(), |req| self.handle_backend_method(req)) - { - return true; + ) -> Self { + Self { + graphics_device_tx, + working_directory: PathBuf::new(), } - - // We don't really expect to receive data messages from the - // frontend; they are events - log::warn!("Unexpected data message from frontend: {message:?}"); - true } - /** - * Handles an RPC request from the frontend. - */ - fn handle_backend_method( - &self, - request: UiBackendRequest, - ) -> anyhow::Result { + fn handle_rpc(&mut self, request: UiBackendRequest) -> anyhow::Result { match request { - UiBackendRequest::CallMethod(request) => self.handle_call_method(request), + UiBackendRequest::CallMethod(params) => self.handle_call_method(params), UiBackendRequest::DidChangePlotsRenderSettings(params) => { self.handle_did_change_plot_render_settings(params) }, @@ -163,10 +80,7 @@ impl UiComm { } } - fn handle_call_method( - &self, - request: CallMethodParams, - ) -> anyhow::Result { + fn handle_call_method(&self, request: CallMethodParams) -> anyhow::Result { log::trace!("Handling '{}' frontend RPC method", request.method); // Today, all RPCs are fulfilled by R directly. Check to see if an R @@ -176,31 +90,25 @@ impl UiComm { // fulfilled here on the Rust side, with only some requests forwarded to // R; Rust methods may wish to establish their own RPC handlers. - // The method name is prefixed with ".ps.rpc.", by convention let method = format!(".ps.rpc.{}", request.method); - // Use the `exists` function to see if the method exists - let exists = r_task(|| unsafe { - let exists = RFunction::from("exists") - .param("x", method.clone()) - .call()?; - RObject::to::(exists) - })?; + let exists_obj = RFunction::from("exists") + .param("x", method.clone()) + .call()?; + let exists = unsafe { RObject::to::(exists_obj) }?; if !exists { - anyhow::bail!("No such method: {}", request.method); + let method = &request.method; + return Err(anyhow::anyhow!("No such method: {method}")); } - // Form an R function call from the request - let result = r_task(|| { - let mut call = RFunction::from(method); - for param in request.params.iter() { - let p = RObject::try_from(param.clone())?; - call.add(p); - } - let result = call.call()?; - Value::try_from(result) - })?; + let mut call = RFunction::from(method); + for param in request.params.iter() { + let p = RObject::try_from(param.clone())?; + call.add(p); + } + let result = call.call()?; + let result = Value::try_from(result)?; Ok(UiBackendReply::CallMethodReply(result)) } @@ -208,8 +116,8 @@ impl UiComm { fn handle_did_change_plot_render_settings( &self, params: DidChangePlotsRenderSettingsParams, - ) -> anyhow::Result { - // The frontend shoudn't send invalid sizes but be defensive. Sometimes + ) -> anyhow::Result { + // The frontend shouldn't send invalid sizes but be defensive. Sometimes // the plot container is in a strange state when it's hidden. if params.settings.size.height <= 0 || params.settings.size.width <= 0 { return Err(anyhow::anyhow!( @@ -222,7 +130,7 @@ impl UiComm { .send(GraphicsDeviceNotification::DidChangePlotRenderSettings( params.settings, )) - .unwrap(); + .map_err(|err| anyhow::anyhow!("Failed to send plot render settings: {err}"))?; Ok(UiBackendReply::DidChangePlotsRenderSettingsReply()) } @@ -230,7 +138,7 @@ impl UiComm { fn handle_editor_context_changed( &self, params: EditorContextChangedParams, - ) -> anyhow::Result { + ) -> anyhow::Result { log::trace!( "Editor context changed: document_uri={}, is_execution_source={}", params.document_uri, @@ -239,38 +147,28 @@ impl UiComm { Ok(UiBackendReply::EditorContextChangedReply()) } - fn handle_evaluate_code( - &self, - params: EvaluateCodeParams, - ) -> anyhow::Result { + fn handle_evaluate_code(&self, params: EvaluateCodeParams) -> anyhow::Result { log::trace!("Evaluating code: {}", params.code); - let result = r_task(|| { - let mut capture = if Console::is_initialized() { - Console::get_mut().start_capture() - } else { - ConsoleOutputCapture::dummy() - }; - - // Evaluate the user's code - let eval_result = parse_eval_global(¶ms.code); - - // Take captured output before dropping the capture guard - let output = capture.take(); - drop(capture); + let mut capture = if Console::is_initialized() { + Console::get_mut().start_capture() + } else { + ConsoleOutputCapture::dummy() + }; - // Now handle the eval result - let evaluated = eval_result?; - let value = Value::try_from(evaluated)?; + let eval_result = parse_eval_global(¶ms.code); - Ok((value, output)) - }); + let output = capture.take(); + drop(capture); - match result { - Ok((value, output)) => Ok(UiBackendReply::EvaluateCodeReply(EvalResult { - result: value, - output, - })), + match eval_result { + Ok(evaluated) => { + let value = Value::try_from(evaluated)?; + Ok(UiBackendReply::EvaluateCodeReply(EvalResult { + result: value, + output, + })) + }, Err(err) => { let message = match err { harp::Error::TryCatchError { message, .. } => message, @@ -283,101 +181,176 @@ impl UiComm { } } - /** - * Send an RPC request to the frontend. - */ - fn call_frontend_method(&self, request: UiCommFrontendRequest) -> anyhow::Result<()> { - let comm_msg = StdInRequest::Comm(request); - self.stdin_request_tx.send(comm_msg)?; + fn send_refresh(&mut self, ctx: &CommHandlerContext) { + self.refresh_prompt_info(ctx); + + if let Err(err) = self.refresh_working_directory(ctx) { + log::error!("Can't refresh working directory: {err:?}"); + } + } + + fn refresh_prompt_info(&self, ctx: &CommHandlerContext) { + let input_prompt = Self::get_r_option("prompt").unwrap_or_else(|| String::from("> ")); + let continuation_prompt = + Self::get_r_option("continue").unwrap_or_else(|| String::from("+ ")); + + send_ui_event( + &ctx.outgoing_tx, + &UiFrontendEvent::PromptState(PromptStateParams { + input_prompt, + continuation_prompt, + }), + ); + } + + /// Checks for changes to the working directory, and sends an event to the + /// frontend if the working directory has changed. + fn refresh_working_directory(&mut self, ctx: &CommHandlerContext) -> anyhow::Result<()> { + let mut new_working_directory = std::env::current_dir()?; + + if new_working_directory != self.working_directory { + self.working_directory = new_working_directory.clone(); + + // Attempt to alias the directory, if it's within the home directory + if let Some(home_dir) = home::home_dir() { + if let Ok(stripped_dir) = new_working_directory.strip_prefix(home_dir) { + let mut new_path = PathBuf::from("~"); + new_path.push(stripped_dir); + new_working_directory = new_path; + } + } + + send_ui_event( + &ctx.outgoing_tx, + &UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { + directory: new_working_directory.to_string_lossy().to_string(), + }), + ); + } Ok(()) } + + fn get_r_option(name: &str) -> Option { + let obj = RFunction::from("getOption").param("x", name).call().ok()?; + let value = Value::try_from(obj).ok()?; + value.as_str().map(String::from) + } +} + +impl CommHandler for UiComm { + fn handle_open(&mut self, ctx: &CommHandlerContext) { + self.send_refresh(ctx); + } + + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { + handle_rpc_request(&ctx.outgoing_tx, UI_COMM_NAME, msg, |req| { + self.handle_rpc(req) + }); + } + + fn handle_environment(&mut self, event: EnvironmentChanged, ctx: &CommHandlerContext) { + let EnvironmentChanged::Execution = event else { + return; + }; + self.send_refresh(ctx); + } } #[cfg(test)] mod tests { use amalthea::comm::base_comm::JsonRpcError; use amalthea::comm::comm_channel::CommMsg; - use amalthea::comm::ui_comm::BusyParams; + use amalthea::comm::event::CommEvent; use amalthea::comm::ui_comm::CallMethodParams; use amalthea::comm::ui_comm::EvalResult; use amalthea::comm::ui_comm::EvaluateCodeParams; use amalthea::comm::ui_comm::UiBackendReply; use amalthea::comm::ui_comm::UiBackendRequest; - use amalthea::comm::ui_comm::UiFrontendEvent; - use amalthea::socket::comm::CommInitiator; - use amalthea::socket::comm::CommSocket; + use amalthea::socket::comm::CommOutgoingTx; use amalthea::socket::iopub::IOPubMessage; - use amalthea::socket::stdin::StdInRequest; use ark_test::dummy_jupyter_header; use ark_test::IOPubReceiverExt; use crossbeam::channel::bounded; - use crossbeam::channel::Sender; use harp::exec::RFunction; use harp::exec::RFunctionExt; use harp::object::RObject; use serde_json::Value; - use crate::plots::graphics_device::GraphicsDeviceNotification; - use crate::r_task::r_task; - use crate::ui::UiComm; - use crate::ui::UiCommMessage; + use super::*; + use crate::comm_handler::CommHandlerContext; + + fn setup_ui_comm( + iopub_tx: crossbeam::channel::Sender, + ) -> (UiComm, CommHandlerContext) { + let comm_id = uuid::Uuid::new_v4().to_string(); + let outgoing_tx = CommOutgoingTx::new(comm_id, iopub_tx); + let (comm_event_tx, _) = bounded::(10); + let ctx = CommHandlerContext::new(outgoing_tx, comm_event_tx); + let (graphics_device_tx, _) = tokio::sync::mpsc::unbounded_channel(); + let handler = UiComm::new(graphics_device_tx); + (handler, ctx) + } #[test] fn test_ui_comm() { - // Create a dummy iopub channel to receive responses. let (iopub_tx, iopub_rx) = bounded::(10); - // Create a sender/receiver pair for the comm channel. - let comm_socket = CommSocket::new( - CommInitiator::FrontEnd, - String::from("test-ui-comm-id"), - String::from("positron.UI"), - iopub_tx, - ); - - // Communication channel between the main thread and the Amalthea - // StdIn socket thread - let (stdin_request_tx, _stdin_request_rx) = bounded::(1); + let old_width = crate::r_task::r_task(move || { + let (mut handler, ctx) = setup_ui_comm(iopub_tx); - let (graphics_device_tx, _graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); + // Get the current console width + let old_width = unsafe { + let width = RFunction::from("getOption") + .param("x", "width") + .call() + .unwrap(); + RObject::to::(width).unwrap() + }; - // Create a frontend instance, get access to the sender channel - let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); + // Send a setConsoleWidth RPC + let msg = CommMsg::Rpc { + id: String::from("test-id-1"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("setConsoleWidth"), + params: vec![Value::from(123)], + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + + // Assert that the console width changed + let new_width = unsafe { + let width = RFunction::from("getOption") + .param("x", "width") + .call() + .unwrap(); + RObject::to::(width).unwrap() + }; + assert_eq!(new_width, 123); - // Get the current console width - let old_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); + // Now try to invoke an RPC that doesn't exist + let msg = CommMsg::Rpc { + id: String::from("test-id-2"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("thisRpcDoesNotExist"), + params: vec![], + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); - // Send a message to the frontend - let id = String::from("test-id-1"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("setConsoleWidth"), - params: vec![Value::from(123)], + old_width }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id, - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - // Wait for the reply; this should be a FrontendRpcResult. + // Check first response (setConsoleWidth) let response = iopub_rx.recv_comm_msg(); match response { CommMsg::Rpc { id, data, .. } => { - println!("Got RPC result: {:?}", data); let result = serde_json::from_value::(data).unwrap(); assert_eq!(id, "test-id-1"); - // This RPC should return the old width assert_eq!( result, UiBackendReply::CallMethodReply(Value::from(old_width)) @@ -386,151 +359,52 @@ mod tests { _ => panic!("Unexpected response: {:?}", response), } - // Get the new console width - let new_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); - - // Assert that the console width changed - assert_eq!(new_width, 123); - - // Now try to invoke an RPC that doesn't exist - let id = String::from("test-id-2"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("thisRpcDoesNotExist"), - params: vec![], - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id, - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - - // Wait for the reply + // Check second response (non-existent method error) let response = iopub_rx.recv_comm_msg(); match response { CommMsg::Rpc { id, data, .. } => { - println!("Got RPC result: {:?}", data); let _reply = serde_json::from_value::(data).unwrap(); - // Ensure that the error code is -32601 (method not found) assert_eq!(id, "test-id-2"); - - // TODO: This should normally throw a `MethodNotFound` but - // that's currently a bit hard because of the nested method - // call. One way to solve this would be for RPC handler - // functions to return a typed JSON-RPC error instead of a - // `anyhow::Result`. Then we could return a `MethodNotFound` from - // `callMethod()`. - // - // assert_eq!(reply.error.code, JsonRpcErrorCode::MethodNotFound); }, _ => panic!("Unexpected response: {:?}", response), } - - // Mark not busy (this prevents the frontend comm from being closed due to - // the Sender being dropped) - ui_comm_tx - .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { - busy: false, - }))) - .unwrap(); } - /// Helper to set up a UiComm and return the pieces needed for testing - fn setup_ui_comm() -> ( - CommSocket, - crossbeam::channel::Receiver, - Sender, - ) { + #[test] + fn test_evaluate_code() { let (iopub_tx, iopub_rx) = bounded::(10); - let comm_socket = CommSocket::new( - CommInitiator::FrontEnd, - String::from("test-eval-comm-id"), - String::from("positron.UI"), - iopub_tx, - ); - let (stdin_request_tx, _stdin_request_rx) = bounded::(1); - let (graphics_device_tx, _graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); - let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); - (comm_socket, iopub_rx, ui_comm_tx) - } - /// Send an evaluate_code RPC and return the reply - fn send_evaluate_code( - comm_socket: &CommSocket, - iopub_rx: &crossbeam::channel::Receiver, - id: &str, - code: &str, - ) -> UiBackendReply { - let request = UiBackendRequest::EvaluateCode(EvaluateCodeParams { - code: String::from(code), - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id: String::from(id), + crate::r_task::r_task(move || { + let (mut handler, ctx) = setup_ui_comm(iopub_tx); + + // Pure result with no output (e.g. 1 + 1) + let msg = CommMsg::Rpc { + id: String::from("eval-1"), parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); + data: serde_json::to_value(UiBackendRequest::EvaluateCode(EvaluateCodeParams { + code: String::from("1 + 1"), + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + }); let response = iopub_rx.recv_comm_msg(); match response { - CommMsg::Rpc { data, .. } => serde_json::from_value::(data).unwrap(), + CommMsg::Rpc { data, .. } => { + let result = serde_json::from_value::(data).unwrap(); + assert_eq!( + result, + UiBackendReply::EvaluateCodeReply(EvalResult { + result: Value::from(2.0), + // Output capture relies on Console::start_capture(), which is + // not available in unit tests (Console is not initialized). + // Output capture is exercised in integration tests instead. + output: String::from(""), + }) + ); + }, _ => panic!("Unexpected response: {:?}", response), } } - - #[test] - fn test_evaluate_code() { - let (comm_socket, iopub_rx, ui_comm_tx) = setup_ui_comm(); - - // Test 1: Pure result with no output (e.g. 1 + 1) - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-1", "1 + 1"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::from(2.0), - output: String::from(""), - }) - ); - - // Test 2: Code that returns a value - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-2", "isTRUE(cat('oatmeal'))"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::from(false), - // Output capture relies on Console::start_capture(), which is - // not available in unit tests (Console is not initialized). - // Output capture is exercised in integration tests instead. - output: String::from(""), - }) - ); - - // Test 3: Code that only prints, with an invisible NULL result - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-3", "cat('hello\\nworld')"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::Null, - output: String::from(""), - }) - ); - - // Keep the comm alive - ui_comm_tx - .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { - busy: false, - }))) - .unwrap(); - } } diff --git a/crates/ark/src/viewer.rs b/crates/ark/src/viewer.rs index e6de45c55..9883701be 100644 --- a/crates/ark/src/viewer.rs +++ b/crates/ark/src/viewer.rs @@ -112,11 +112,10 @@ pub unsafe extern "C-unwind" fn ps_html_viewer( // TODO: What's the right thing to do in `Console` mode when // we aren't connected to Positron? Right now we error. - let ui_comm_tx = console - .get_ui_comm_tx() - .ok_or_else(|| anyhow::anyhow!("UI comm not connected."))?; - - ui_comm_tx.send_event(event); + if !console.is_ui_comm_connected() { + return Err(anyhow::anyhow!("UI comm not connected.")); + } + console.send_ui_event(&event); }, } }, diff --git a/crates/ark/tests/evaluate-code.rs b/crates/ark/tests/evaluate-code.rs index 939a6bc7d..8a9073b17 100644 --- a/crates/ark/tests/evaluate-code.rs +++ b/crates/ark/tests/evaluate-code.rs @@ -22,14 +22,15 @@ fn evaluate_code(frontend: &DummyArkFrontend, comm_id: &str, code: &str) -> UiBa frontend.send_shell_comm_msg(String::from(comm_id), data); - // The shell routes the message to the UI comm thread and goes busy/idle. - // The RPC reply is sent from the UI comm thread and can arrive on IOPub - // either before or after the shell's Idle status. + // The UI comm runs on the R thread via CommHandler, so the RPC reply + // arrives deterministically before Idle. frontend.recv_iopub_busy(); - let reply = frontend.recv_iopub_comm_msg_and_idle(); + let reply = frontend.recv_iopub_comm_msg(); assert_eq!(reply.comm_id, comm_id); + frontend.recv_iopub_idle(); + serde_json::from_value::(reply.data).unwrap() } diff --git a/crates/ark_test/src/dummy_frontend.rs b/crates/ark_test/src/dummy_frontend.rs index 09edb1814..e3ad91e19 100644 --- a/crates/ark_test/src/dummy_frontend.rs +++ b/crates/ark_test/src/dummy_frontend.rs @@ -1094,54 +1094,40 @@ impl DummyArkFrontend { data: serde_json::json!({}), }); - // The comm_open triggers busy/idle on the shell thread and also - // sends an EstablishUiCommChannel kernel request to the console. - // The UI comm then sends events (prompt_state, working_directory) - // as CommMsg on IOPub. These can arrive in any order relative to - // the busy/idle. We wait for the prompt_state CommMsg as evidence - // that the UI comm has been established, draining everything. - let deadline = Instant::now() + RECV_TIMEOUT; - let mut got_prompt_state = false; - let mut idle_count = 0u32; + // The UI comm now runs on the R thread via CommHandler. The + // comm_open blocks Shell while the handler's `handle_open()` runs, + // so events (prompt_state, working_directory) arrive + // deterministically within the Busy/Idle window. + self.recv_iopub_busy(); - // We need to see the prompt_state AND at least one idle - while !got_prompt_state || idle_count == 0 { - let remaining = deadline.saturating_duration_since(Instant::now()); - let Some(msg) = self.recv_iopub_with_timeout(remaining) else { - panic!( - "Timed out waiting for UI comm (got_prompt_state={got_prompt_state}, \ - idle_count={idle_count})" - ); - }; - match &msg { - Message::CommMsg(data) => { - if let Some(method) = data.content.data.get("method").and_then(|v| v.as_str()) { - if method == "prompt_state" { - got_prompt_state = true; - } - } + // Drain the initial events sent by `handle_open()` (prompt_state, + // working_directory). We don't assert on their content here. + let mut comm_msg_count = 0; + loop { + let msg = self.recv_iopub_next(); + match msg { + Message::CommMsg(_) => { + comm_msg_count += 1; }, Message::Status(ref data) if data.content.execution_state == amalthea::wire::status::ExecutionState::Idle => { - idle_count += 1; + self.flush_streams_at_boundary(); + break; }, Message::Stream(ref data) => { self.buffer_stream(&data.content); }, - _ => {}, + other => panic!("Unexpected message during open_ui_comm: {other:?}"), } } - // The UI comm sends events asynchronously. Some may arrive after - // idle. Drain any stragglers with a short timeout. - while let Some(msg) = self.recv_iopub_with_timeout(Duration::from_millis(200)) { - if let Message::Stream(ref data) = msg { - self.buffer_stream(&data.content); - } - // Discard late comm messages and other events - } + // We expect at least the prompt_state event + assert!( + comm_msg_count >= 1, + "Expected at least 1 comm event from UI comm open, got {comm_msg_count}" + ); comm_id } From 2eda5c1c7b3d6c4546c51f3e12e7e2c874cb9788 Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Tue, 10 Mar 2026 14:59:28 +0100 Subject: [PATCH 2/3] Review and simplify sync UI comm --- crates/ark/src/comm_handler.rs | 9 + crates/ark/src/console.rs | 5 +- crates/ark/src/console/console_comm.rs | 22 ++- crates/ark/src/console/console_integration.rs | 165 ++++++++++-------- crates/ark/src/console/console_repl.rs | 38 ++-- .../ark/src/data_explorer/r_data_explorer.rs | 2 +- crates/ark/src/lsp/backend.rs | 12 +- crates/ark/src/plots/graphics_device.rs | 2 +- crates/ark/src/shell.rs | 8 - crates/ark/src/start.rs | 2 - crates/ark/src/ui/events.rs | 47 +---- crates/ark/src/ui/methods.rs | 49 ++++-- crates/ark/src/ui/mod.rs | 1 - crates/ark/src/ui/ui.rs | 133 +++++--------- crates/ark/src/variables/r_variables.rs | 2 +- crates/ark/src/viewer.rs | 5 +- crates/ark/tests/evaluate-code.rs | 4 - crates/ark_test/src/dummy_frontend.rs | 78 ++------- 18 files changed, 248 insertions(+), 336 deletions(-) diff --git a/crates/ark/src/comm_handler.rs b/crates/ark/src/comm_handler.rs index 0a5654246..5b7f1ccaa 100644 --- a/crates/ark/src/comm_handler.rs +++ b/crates/ark/src/comm_handler.rs @@ -47,6 +47,15 @@ impl CommHandlerContext { pub fn is_closed(&self) -> bool { self.closed.get() } + + /// Send a serializable event as `CommMsg::Data` on the outgoing channel. + /// Serialization or send errors are logged and ignored. + pub fn send_event(&self, event: &T) { + let Some(json) = serde_json::to_value(event).log_err() else { + return; + }; + self.outgoing_tx.send(CommMsg::Data(json)).log_err(); + } } /// Trait for comm handlers that run synchronously on the R thread. diff --git a/crates/ark/src/console.rs b/crates/ark/src/console.rs index 09ea82f92..2b291ae4c 100644 --- a/crates/ark/src/console.rs +++ b/crates/ark/src/console.rs @@ -165,7 +165,6 @@ use crate::srcref::ns_populate_srcref; use crate::srcref::resource_loaded_namespaces; use crate::startup; use crate::sys::console::console_to_utf8; -use crate::ui::send_ui_event; use crate::url::UrlId; thread_local! { @@ -222,8 +221,8 @@ pub(crate) struct Console { tasks_idle_any_rx: Receiver, pending_futures: HashMap, RTaskStartInfo, Option)>, - /// The comm ID of the currently connected UI comm, if any. - /// Used for fast lookup when sending events to the frontend. + /// Comm ID of the currently connected UI comm, if any. + /// The handler lives in `self.comms`; this is just an index into it. ui_comm_id: Option, /// Error captured by our global condition handler during the last iteration diff --git a/crates/ark/src/console/console_comm.rs b/crates/ark/src/console/console_comm.rs index 5e01a619c..ef8d1d6fb 100644 --- a/crates/ark/src/console/console_comm.rs +++ b/crates/ark/src/console/console_comm.rs @@ -3,7 +3,6 @@ // // Copyright (C) 2026 Posit Software, PBC. All rights reserved. // -// use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::event::CommEvent; @@ -31,7 +30,6 @@ impl Console { } pub(super) fn comm_handle_close(&mut self, comm_id: &str) { - // Clear UI comm ID if this is the UI comm being closed if self.ui_comm_id.as_deref() == Some(comm_id) { self.ui_comm_id = None; } @@ -47,7 +45,7 @@ impl Console { /// /// Creates the `CommSocket` and `CommHandlerContext`, calls `handle_open`, /// sends `CommEvent::Opened` to amalthea, and returns the comm ID. - pub(crate) fn comm_register( + pub(crate) fn comm_open_backend( &mut self, comm_name: &str, mut handler: Box, @@ -76,10 +74,10 @@ impl Console { /// Register a frontend-initiated comm on the R thread. /// - /// Unlike `comm_register` (which is for backend-initiated comms and sends - /// `CommEvent::Opened`), this is called when the frontend opened the comm. - /// The `CommSocket` already exists in amalthea's open_comms list, so we - /// only need to register the handler and call `handle_open`. + /// Unlike `comm_open_backend` (which is for backend-initiated comms and + /// sends `CommEvent::Opened`), this is called when the frontend opened the + /// comm. The `CommSocket` already exists in amalthea's open_comms list, so + /// we only need to register the handler and call `handle_open`. pub(super) fn comm_open_frontend( &mut self, comm_id: String, @@ -91,8 +89,11 @@ impl Console { handler.handle_open(&ctx); if comm_name == UI_COMM_NAME { - if self.ui_comm_id.is_some() { + if let Some(old_id) = self.ui_comm_id.take() { log::info!("Replacing an existing UI comm."); + if let Some(mut old) = self.comms.remove(&old_id) { + old.handler.handle_close(&old.ctx); + } } self.ui_comm_id = Some(comm_id.clone()); } @@ -117,6 +118,11 @@ impl Console { .collect(); for comm_id in closed_ids { + // We're not expecting the UI comm to close itself but we handle the + // case explicitly to be defensive + if self.ui_comm_id.as_deref() == Some(comm_id.as_str()) { + self.ui_comm_id = None; + } if let Some(reg) = self.comms.remove(&comm_id) { self.comm_notify_closed(&comm_id, ®); } diff --git a/crates/ark/src/console/console_integration.rs b/crates/ark/src/console/console_integration.rs index 3c1b7ac67..52d61e463 100644 --- a/crates/ark/src/console/console_integration.rs +++ b/crates/ark/src/console/console_integration.rs @@ -15,83 +15,18 @@ impl Console { self.session_mode } - /// Send a `UiFrontendEvent` to the frontend via the UI comm, if connected. - /// Silently drops the event if no UI comm is open (common in Jupyter). - pub(crate) fn send_ui_event(&self, event: &UiFrontendEvent) { - let Some(reg) = self.comms.get(self.ui_comm_id.as_deref().unwrap_or("")) else { - log::trace!("UI comm isn't connected, dropping event."); - return; - }; - send_ui_event(®.ctx.outgoing_tx, event); - } - - pub(crate) fn is_ui_comm_connected(&self) -> bool { - self.ui_comm_id - .as_deref() - .is_some_and(|id| self.comms.contains_key(id)) + pub(crate) fn ui_comm(&self) -> Option> { + let comm = self.comms.get(self.ui_comm_id.as_deref()?)?; + Some(UiCommRef { + comm, + originator: self.active_request.as_ref().map(|r| &r.originator), + stdin_request_tx: &self.stdin_request_tx, + }) } - pub(crate) fn call_frontend_method( - &self, - request: UiFrontendRequest, - ) -> anyhow::Result { - log::trace!("Calling frontend method {request:?}"); - - if !self.is_ui_comm_connected() { - return Err(anyhow!( - "UI comm is not connected. Can't execute request {request:?}" - )); - } - - let (reply_tx, reply_rx) = bounded(1); - - let Some(req) = &self.active_request else { - return Err(anyhow!( - "No active request. Can't execute request {request:?}" - )); - }; - - // Forward request directly to the stdin channel - let comm_msg = StdInRequest::Comm(UiCommFrontendRequest { - originator: req.originator.clone(), - reply_tx, - request: request.clone(), - }); - self.stdin_request_tx.send(comm_msg)?; - - // Block for reply - let reply = reply_rx.recv().unwrap(); - - log::trace!("Got reply from frontend method: {reply:?}"); - - match reply { - StdInRpcReply::Reply(reply) => match reply { - JsonRpcReply::Result(reply) => { - // Deserialize to Rust first to verify the OpenRPC contract. - // Errors are propagated to R. - if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) { - return Err(anyhow!( - "Can't deserialize RPC reply for {request:?}:\n{err:?}" - )); - } - - // Now deserialize to an R object - Ok(RObject::try_from(reply.result)?) - }, - JsonRpcReply::Error(reply) => { - let message = reply.error.message; - - return Err(anyhow!( - "While calling frontend method:\n\ - {message}", - )); - }, - }, - // If an interrupt was signalled, return `NULL`. This should not be - // visible to the caller since `r_unwrap()` (called e.g. by - // `harp::register`) will trigger an interrupt jump right away. - StdInRpcReply::Interrupt => Ok(RObject::null()), - } + pub(crate) fn try_ui_comm(&self) -> anyhow::Result> { + self.ui_comm() + .ok_or_else(|| anyhow!("UI comm is not connected")) } } @@ -228,3 +163,83 @@ impl Console { self.lsp_virtual_documents.get(uri).cloned() } } + +/// Reference to the UI comm. Returned by `Console::ui_comm()`. +/// +/// Existence of this value guarantees the comm is connected. +pub(crate) struct UiCommRef<'a> { + comm: &'a ConsoleComm, + originator: Option<&'a Originator>, + stdin_request_tx: &'a Sender, +} + +impl UiCommRef<'_> { + pub(crate) fn send_event(&self, event: &UiFrontendEvent) { + self.comm.ctx.send_event(event); + } + + pub(crate) fn busy(&self, busy: bool) { + self.send_event(&UiFrontendEvent::Busy(BusyParams { busy })); + } + + pub(crate) fn show_message(&self, message: String) { + self.send_event(&UiFrontendEvent::ShowMessage(ShowMessageParams { message })); + } + + pub(crate) fn call_frontend_method( + &self, + request: UiFrontendRequest, + ) -> anyhow::Result { + log::trace!("Calling frontend method {request:?}"); + + let (reply_tx, reply_rx) = bounded(1); + + let Some(originator) = self.originator else { + return Err(anyhow!( + "No active request. Can't execute request {request:?}" + )); + }; + + // Forward request directly to the stdin channel + let comm_msg = StdInRequest::Comm(UiCommFrontendRequest { + originator: originator.clone(), + reply_tx, + request: request.clone(), + }); + self.stdin_request_tx.send(comm_msg)?; + + // Block for reply + let reply = reply_rx.recv()?; + + log::trace!("Got reply from frontend method: {reply:?}"); + + match reply { + StdInRpcReply::Reply(reply) => match reply { + JsonRpcReply::Result(reply) => { + // Deserialize to Rust first to verify the OpenRPC contract. + // Errors are propagated to R. + if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) { + return Err(anyhow!( + "Can't deserialize RPC reply for {request:?}:\n{err:?}" + )); + } + + // Now deserialize to an R object + Ok(RObject::try_from(reply.result)?) + }, + JsonRpcReply::Error(reply) => { + let message = reply.error.message; + + return Err(anyhow!( + "While calling frontend method:\n\ + {message}", + )); + }, + }, + // If an interrupt was signalled, return `NULL`. This should not be + // visible to the caller since `r_unwrap()` (called e.g. by + // `harp::register`) will trigger an interrupt jump right away. + StdInRpcReply::Interrupt => Ok(RObject::null()), + } + } +} diff --git a/crates/ark/src/console/console_repl.rs b/crates/ark/src/console/console_repl.rs index d0015825d..0e249fd78 100644 --- a/crates/ark/src/console/console_repl.rs +++ b/crates/ark/src/console/console_repl.rs @@ -212,15 +212,6 @@ pub(super) struct PromptInfo { /// case of a browser prompt or a readline prompt. pub(super) input_prompt: String, - /// The continuation prompt string when user supplies incomplete - /// inputs. This always corresponds to `getOption("continue")`. We send - /// it to frontends along with `prompt` because some frontends such as - /// Positron do not send incomplete inputs to Ark and take charge of - /// continuation prompts themselves. For frontends that can send - /// incomplete inputs to Ark, like Jupyter Notebooks, we immediately - /// error on them rather than requesting that this be shown. - pub(super) continuation_prompt: String, - /// The kind of prompt we're handling. pub(super) kind: PromptKind, } @@ -881,7 +872,7 @@ impl Console { self.pending_inputs = None; // Reply to active request with error, then fall through to event loop - self.handle_active_request(&info, ConsoleValue::Error(exception)); + self.handle_active_request(ConsoleValue::Error(exception)); } else if matches!(info.kind, PromptKind::InputRequest) { // Request input from the frontend and return it to R return self.handle_input_request(&info, buf, buflen); @@ -892,7 +883,7 @@ impl Console { // Otherwise reply to active request with accumulated result, then // fall through to event loop let result = self.take_result(); - self.handle_active_request(&info, ConsoleValue::Success(result)); + self.handle_active_request(ConsoleValue::Success(result)); } // In the future we'll also send browser information, see @@ -1022,7 +1013,7 @@ impl Console { // We've got a kernel request i if i == kernel_request_index => { let req = oper.recv(&kernel_request_rx).unwrap(); - self.handle_kernel_request(req, &info); + self.handle_kernel_request(req); }, // An interrupt task woke us up @@ -1064,10 +1055,6 @@ impl Console { let prompt_slice = unsafe { CStr::from_ptr(prompt_c) }; let prompt = prompt_slice.to_string_lossy().into_owned(); - // Sent to the frontend after each top-level command so users can - // customise their prompts - let continuation_prompt: String = harp::get_option("continue").try_into().unwrap(); - // Detect browser prompt by matching the prompt string // https://github.com/posit-dev/positron/issues/4742. // There are ways to break this detection, for instance setting @@ -1089,7 +1076,6 @@ impl Console { return PromptInfo { input_prompt: prompt, - continuation_prompt, kind, }; } @@ -1197,7 +1183,7 @@ impl Console { Some(exception) } - fn handle_active_request(&mut self, _info: &PromptInfo, value: ConsoleValue) { + fn handle_active_request(&mut self, value: ConsoleValue) { self.reset_global_env_rdebug(); // If we get here we finished evaluating all pending inputs. Check if we @@ -1337,7 +1323,7 @@ impl Console { } else { // Otherwise we got an empty input, e.g. `""` and there's // nothing to do. Close active request. - self.handle_active_request(info, ConsoleValue::Success(Default::default())); + self.handle_active_request(ConsoleValue::Success(Default::default())); // And return to event loop None @@ -1824,7 +1810,7 @@ impl Console { } } - fn handle_kernel_request(&mut self, req: KernelRequest, _info: &PromptInfo) { + fn handle_kernel_request(&mut self, req: KernelRequest) { log::trace!("Received kernel request {req:?}"); match req { @@ -2178,7 +2164,9 @@ impl Console { let busy = which != 0; // Send updated state to the frontend over the UI comm - self.send_ui_event(&UiFrontendEvent::Busy(BusyParams { busy })); + if let Some(ui) = self.ui_comm() { + ui.busy(busy); + } } /// Invoked by R to show a message to the user. @@ -2186,8 +2174,12 @@ impl Console { let message = unsafe { CStr::from_ptr(buf) }; let message = message.to_str().unwrap().to_string(); - // Deliver message to the frontend over the UI comm - self.send_ui_event(&UiFrontendEvent::ShowMessage(ShowMessageParams { message })); + if let Some(ui) = self.ui_comm() { + ui.show_message(message); + } else { + // Should we emit the message in the Console? + log::info!("`show_message`: {message}") + } } /// Invoked by the R event loop diff --git a/crates/ark/src/data_explorer/r_data_explorer.rs b/crates/ark/src/data_explorer/r_data_explorer.rs index 559261e22..b7570389d 100644 --- a/crates/ark/src/data_explorer/r_data_explorer.rs +++ b/crates/ark/src/data_explorer/r_data_explorer.rs @@ -1213,7 +1213,7 @@ pub unsafe extern "C-unwind" fn ps_view_data_frame( }; let explorer = RDataExplorer::new(title, x, env_info)?; - Console::get_mut().comm_register(DATA_EXPLORER_COMM_NAME, Box::new(explorer))?; + Console::get_mut().comm_open_backend(DATA_EXPLORER_COMM_NAME, Box::new(explorer))?; Ok(R_NilValue) } diff --git a/crates/ark/src/lsp/backend.rs b/crates/ark/src/lsp/backend.rs index 0584c709b..2c26a70cd 100644 --- a/crates/ark/src/lsp/backend.rs +++ b/crates/ark/src/lsp/backend.rs @@ -12,8 +12,6 @@ use std::sync::Arc; use amalthea::comm::server_comm::ServerStartMessage; use amalthea::comm::server_comm::ServerStartedMessage; -use amalthea::comm::ui_comm::ShowMessageParams as UiShowMessageParams; -use amalthea::comm::ui_comm::UiFrontendEvent; use anyhow::Context; use crossbeam::channel::Sender; use serde_json::Value; @@ -107,12 +105,12 @@ fn report_crash() { "with full logs (see https://positron.posit.co/troubleshooting.html#python-and-r-logs)." ); + // NOTE: This is a legit use of interrupt-time task. No R access here, and + // we need to go through Console since it owns the UI comm. r_task(|| { - let event = UiFrontendEvent::ShowMessage(UiShowMessageParams { - message: String::from(user_message), - }); - - Console::get().send_ui_event(&event); + if let Some(ui) = Console::get().ui_comm() { + ui.show_message(String::from(user_message)); + } }); } diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 96b99512a..f2fe7afa6 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -301,7 +301,7 @@ impl DeviceContext { /// [SessionMode::Console] mode. fn should_use_dynamic_plots(&self) -> bool { let console = Console::get(); - console.is_ui_comm_connected() && console.session_mode() == SessionMode::Console + console.ui_comm().is_some() && console.session_mode() == SessionMode::Console } /// Deactivation hook diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index ac284ff78..f55ff8430 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -10,7 +10,6 @@ use amalthea::comm::comm_channel::CommMsg; use amalthea::language::shell_handler::CommHandled; use amalthea::language::shell_handler::ShellHandler; use amalthea::socket::comm::CommSocket; -use amalthea::socket::stdin::StdInRequest; use amalthea::wire::complete_reply::CompleteReply; use amalthea::wire::complete_request::CompleteRequest; use amalthea::wire::execute_reply::ExecuteReply; @@ -45,7 +44,6 @@ use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::ark_comm::ArkComm; use crate::console::Console; -use crate::console::ConsoleNotification; use crate::console::KernelInfo; use crate::data_explorer::r_data_explorer::DATA_EXPLORER_COMM_NAME; use crate::help::r_help::RHelp; @@ -60,12 +58,10 @@ use crate::variables::r_variables::RVariables; pub struct Shell { r_request_tx: Sender, - _stdin_request_tx: Sender, kernel_request_tx: Sender, kernel_init_rx: BusReader, kernel_info: Option, graphics_device_tx: AsyncUnboundedSender, - _console_notification_tx: AsyncUnboundedSender, } #[derive(Debug)] @@ -77,20 +73,16 @@ impl Shell { /// Creates a new instance of the shell message handler. pub(crate) fn new( r_request_tx: Sender, - stdin_request_tx: Sender, kernel_init_rx: BusReader, kernel_request_tx: Sender, graphics_device_tx: AsyncUnboundedSender, - console_notification_tx: AsyncUnboundedSender, ) -> Self { Self { r_request_tx, - _stdin_request_tx: stdin_request_tx, kernel_request_tx, kernel_init_rx, kernel_info: None, graphics_device_tx, - _console_notification_tx: console_notification_tx, } } diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index 2c3cbf5b1..8bce3a448 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -88,11 +88,9 @@ pub fn start_kernel( let kernel_init_rx = kernel_init_tx.add_rx(); let shell = Box::new(Shell::new( r_request_tx.clone(), - stdin_request_tx.clone(), kernel_init_rx, kernel_request_tx, graphics_device_tx, - console_notification_tx, )); // Create the control handler; this is used to handle shutdown/interrupt and diff --git a/crates/ark/src/ui/events.rs b/crates/ark/src/ui/events.rs index 4c96603bc..52828911e 100644 --- a/crates/ark/src/ui/events.rs +++ b/crates/ark/src/ui/events.rs @@ -14,7 +14,6 @@ use amalthea::comm::ui_comm::OpenWorkspaceParams; use amalthea::comm::ui_comm::Position; use amalthea::comm::ui_comm::Range; use amalthea::comm::ui_comm::SetEditorSelectionsParams; -use amalthea::comm::ui_comm::ShowMessageParams; use amalthea::comm::ui_comm::ShowUrlParams; use amalthea::comm::ui_comm::UiFrontendEvent; use harp::object::RObject; @@ -25,17 +24,9 @@ use crate::console::Console; #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_show_message(message: SEXP) -> anyhow::Result { - let params = ShowMessageParams { - message: RObject::view(message).try_into()?, - }; - - let event = UiFrontendEvent::ShowMessage(params); + let message: String = RObject::view(message).try_into()?; - let console = Console::get(); - if !console.is_ui_comm_connected() { - return Err(ui_comm_not_connected("ui_show_message")); - } - console.send_ui_event(&event); + Console::get().try_ui_comm()?.show_message(message); Ok(R_NilValue) } @@ -52,11 +43,7 @@ pub unsafe extern "C-unwind" fn ps_ui_open_workspace( let event = UiFrontendEvent::OpenWorkspace(params); - let console = Console::get(); - if !console.is_ui_comm_connected() { - return Err(ui_comm_not_connected("ui_open_workspace")); - } - console.send_ui_event(&event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -81,11 +68,7 @@ pub unsafe extern "C-unwind" fn ps_ui_navigate_to_file( let event = UiFrontendEvent::OpenEditor(params); - let console = Console::get(); - if !console.is_ui_comm_connected() { - return Err(ui_comm_not_connected("ui_navigate_to_file")); - } - console.send_ui_event(&event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -97,11 +80,7 @@ pub unsafe extern "C-unwind" fn ps_ui_set_selection_ranges(ranges: SEXP) -> anyh let event = UiFrontendEvent::SetEditorSelections(params); - let console = Console::get(); - if !console.is_ui_comm_connected() { - return Err(ui_comm_not_connected("ui_set_selection_ranges")); - } - console.send_ui_event(&event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -113,11 +92,7 @@ pub fn send_show_url_event(url: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::ShowUrl(params); - let console = Console::get(); - if !console.is_ui_comm_connected() { - return Err(ui_comm_not_connected("show_url")); - } - console.send_ui_event(&event); + Console::get().try_ui_comm()?.send_event(&event); Ok(()) } @@ -135,11 +110,7 @@ pub fn send_open_with_system_event(path: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::OpenWithSystem(params); - let console = Console::get(); - if !console.is_ui_comm_connected() { - return Err(ui_comm_not_connected("open_with_system")); - } - console.send_ui_event(&event); + Console::get().try_ui_comm()?.send_event(&event); Ok(()) } @@ -166,7 +137,3 @@ pub fn ps_ui_robj_as_ranges(ranges: SEXP) -> anyhow::Result> { .collect(); Ok(selections) } - -fn ui_comm_not_connected(name: &str) -> anyhow::Error { - anyhow::anyhow!("UI comm not connected, can't run `{name}`.") -} diff --git a/crates/ark/src/ui/methods.rs b/crates/ark/src/ui/methods.rs index 5e9778c24..3036077c5 100644 --- a/crates/ark/src/ui/methods.rs +++ b/crates/ark/src/ui/methods.rs @@ -25,7 +25,9 @@ use crate::ui::events::ps_ui_robj_as_ranges; #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_last_active_editor_context() -> anyhow::Result { - let out = Console::get().call_frontend_method(UiFrontendRequest::LastActiveEditorContext)?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::LastActiveEditorContext)?; Ok(out.sexp) } @@ -43,14 +45,17 @@ pub unsafe extern "C-unwind" fn ps_ui_modify_editor_selections( } let params = ModifyEditorSelectionsParams { selections, values }; - let out = - Console::get().call_frontend_method(UiFrontendRequest::ModifyEditorSelections(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ModifyEditorSelections(params))?; Ok(out.sexp) } #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_workspace_folder() -> anyhow::Result { - let out = Console::get().call_frontend_method(UiFrontendRequest::WorkspaceFolder)?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::WorkspaceFolder)?; Ok(out.sexp) } @@ -64,7 +69,9 @@ pub unsafe extern "C-unwind" fn ps_ui_show_dialog( message: RObject::view(message).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowDialog(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowDialog(params))?; Ok(out.sexp) } @@ -90,7 +97,9 @@ pub unsafe extern "C-unwind" fn ps_ui_show_question( }, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowQuestion(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowQuestion(params))?; Ok(out.sexp) } @@ -120,7 +129,9 @@ pub extern "C-unwind" fn ps_ui_show_prompt( timeout: timeout_secs, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowPrompt(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowPrompt(params))?; Ok(out.sexp) } @@ -130,7 +141,9 @@ pub unsafe extern "C-unwind" fn ps_ui_ask_for_password(prompt: SEXP) -> anyhow:: prompt: RObject::view(prompt).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::AskForPassword(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::AskForPassword(params))?; Ok(out.sexp) } @@ -144,7 +157,9 @@ pub unsafe extern "C-unwind" fn ps_ui_new_document( language_id: RObject::view(language_id).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::NewDocument(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::NewDocument(params))?; Ok(out.sexp) } @@ -154,7 +169,9 @@ pub unsafe extern "C-unwind" fn ps_ui_execute_command(command: SEXP) -> anyhow:: command: RObject::view(command).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ExecuteCommand(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ExecuteCommand(params))?; Ok(out.sexp) } @@ -170,7 +187,9 @@ pub unsafe extern "C-unwind" fn ps_ui_execute_code( allow_incomplete: false, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ExecuteCode(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ExecuteCode(params))?; Ok(out.sexp) } @@ -182,7 +201,9 @@ pub unsafe extern "C-unwind" fn ps_ui_evaluate_when_clause( when_clause: RObject::view(when_clause).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::EvaluateWhenClause(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::EvaluateWhenClause(params))?; Ok(out.sexp) } @@ -192,6 +213,8 @@ pub unsafe extern "C-unwind" fn ps_ui_debug_sleep(ms: SEXP) -> anyhow::Result, working_directory: PathBuf, } +impl CommHandler for UiComm { + fn handle_open(&mut self, ctx: &CommHandlerContext) { + self.refresh(ctx); + } + + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { + handle_rpc_request(&ctx.outgoing_tx, UI_COMM_NAME, msg, |req| { + self.handle_rpc(req) + }); + } + + fn handle_environment(&mut self, event: EnvironmentChanged, ctx: &CommHandlerContext) { + let EnvironmentChanged::Execution = event else { + return; + }; + self.refresh(ctx); + } +} + impl UiComm { pub(crate) fn new( graphics_device_tx: AsyncUnboundedSender, @@ -95,7 +100,7 @@ impl UiComm { let exists_obj = RFunction::from("exists") .param("x", method.clone()) .call()?; - let exists = unsafe { RObject::to::(exists_obj) }?; + let exists: bool = exists_obj.try_into()?; if !exists { let method = &request.method; @@ -156,16 +161,16 @@ impl UiComm { ConsoleOutputCapture::dummy() }; - let eval_result = parse_eval_global(¶ms.code); + let value = parse_eval_global(¶ms.code); let output = capture.take(); drop(capture); - match eval_result { + match value { Ok(evaluated) => { - let value = Value::try_from(evaluated)?; + let result = Value::try_from(evaluated)?; Ok(UiBackendReply::EvaluateCodeReply(EvalResult { - result: value, + result, output, })) }, @@ -181,26 +186,23 @@ impl UiComm { } } - fn send_refresh(&mut self, ctx: &CommHandlerContext) { + fn refresh(&mut self, ctx: &CommHandlerContext) { self.refresh_prompt_info(ctx); - - if let Err(err) = self.refresh_working_directory(ctx) { - log::error!("Can't refresh working directory: {err:?}"); - } + self.refresh_working_directory(ctx).log_err(); } fn refresh_prompt_info(&self, ctx: &CommHandlerContext) { - let input_prompt = Self::get_r_option("prompt").unwrap_or_else(|| String::from("> ")); - let continuation_prompt = - Self::get_r_option("continue").unwrap_or_else(|| String::from("+ ")); - - send_ui_event( - &ctx.outgoing_tx, - &UiFrontendEvent::PromptState(PromptStateParams { - input_prompt, - continuation_prompt, - }), - ); + let input_prompt: String = harp::get_option("prompt") + .try_into() + .unwrap_or_else(|_| String::from("> ")); + let continuation_prompt: String = harp::get_option("continue") + .try_into() + .unwrap_or_else(|_| String::from("+ ")); + + ctx.send_event(&UiFrontendEvent::PromptState(PromptStateParams { + input_prompt, + continuation_prompt, + })); } /// Checks for changes to the working directory, and sends an event to the @@ -220,41 +222,13 @@ impl UiComm { } } - send_ui_event( - &ctx.outgoing_tx, - &UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { - directory: new_working_directory.to_string_lossy().to_string(), - }), - ); + ctx.send_event(&UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { + directory: new_working_directory.to_string_lossy().to_string(), + })); } Ok(()) } - - fn get_r_option(name: &str) -> Option { - let obj = RFunction::from("getOption").param("x", name).call().ok()?; - let value = Value::try_from(obj).ok()?; - value.as_str().map(String::from) - } -} - -impl CommHandler for UiComm { - fn handle_open(&mut self, ctx: &CommHandlerContext) { - self.send_refresh(ctx); - } - - fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { - handle_rpc_request(&ctx.outgoing_tx, UI_COMM_NAME, msg, |req| { - self.handle_rpc(req) - }); - } - - fn handle_environment(&mut self, event: EnvironmentChanged, ctx: &CommHandlerContext) { - let EnvironmentChanged::Execution = event else { - return; - }; - self.send_refresh(ctx); - } } #[cfg(test)] @@ -272,23 +246,24 @@ mod tests { use ark_test::dummy_jupyter_header; use ark_test::IOPubReceiverExt; use crossbeam::channel::bounded; - use harp::exec::RFunction; - use harp::exec::RFunctionExt; - use harp::object::RObject; use serde_json::Value; use super::*; use crate::comm_handler::CommHandlerContext; + use crate::r_task; fn setup_ui_comm( iopub_tx: crossbeam::channel::Sender, ) -> (UiComm, CommHandlerContext) { let comm_id = uuid::Uuid::new_v4().to_string(); + let outgoing_tx = CommOutgoingTx::new(comm_id, iopub_tx); let (comm_event_tx, _) = bounded::(10); let ctx = CommHandlerContext::new(outgoing_tx, comm_event_tx); + let (graphics_device_tx, _) = tokio::sync::mpsc::unbounded_channel(); let handler = UiComm::new(graphics_device_tx); + (handler, ctx) } @@ -296,17 +271,11 @@ mod tests { fn test_ui_comm() { let (iopub_tx, iopub_rx) = bounded::(10); - let old_width = crate::r_task::r_task(move || { + let old_width = r_task(move || { let (mut handler, ctx) = setup_ui_comm(iopub_tx); // Get the current console width - let old_width = unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }; + let old_width: i32 = harp::get_option("width").try_into().unwrap(); // Send a setConsoleWidth RPC let msg = CommMsg::Rpc { @@ -321,13 +290,7 @@ mod tests { handler.handle_msg(msg, &ctx); // Assert that the console width changed - let new_width = unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }; + let new_width: i32 = harp::get_option("width").try_into().unwrap(); assert_eq!(new_width, 123); // Now try to invoke an RPC that doesn't exist @@ -374,7 +337,7 @@ mod tests { fn test_evaluate_code() { let (iopub_tx, iopub_rx) = bounded::(10); - crate::r_task::r_task(move || { + r_task(move || { let (mut handler, ctx) = setup_ui_comm(iopub_tx); // Pure result with no output (e.g. 1 + 1) diff --git a/crates/ark/src/variables/r_variables.rs b/crates/ark/src/variables/r_variables.rs index e3e0f973b..9af576f43 100644 --- a/crates/ark/src/variables/r_variables.rs +++ b/crates/ark/src/variables/r_variables.rs @@ -361,7 +361,7 @@ impl RVariables { let explorer = RDataExplorer::new(name.clone(), obj, Some(binding)) .map_err(|err| harp::Error::Anyhow(err))?; let viewer_id = Console::get_mut() - .comm_register(DATA_EXPLORER_COMM_NAME, Box::new(explorer)) + .comm_open_backend(DATA_EXPLORER_COMM_NAME, Box::new(explorer)) .map_err(|err| harp::Error::Anyhow(err))?; Ok(Some(viewer_id)) }) diff --git a/crates/ark/src/viewer.rs b/crates/ark/src/viewer.rs index 9883701be..351ffa698 100644 --- a/crates/ark/src/viewer.rs +++ b/crates/ark/src/viewer.rs @@ -112,10 +112,7 @@ pub unsafe extern "C-unwind" fn ps_html_viewer( // TODO: What's the right thing to do in `Console` mode when // we aren't connected to Positron? Right now we error. - if !console.is_ui_comm_connected() { - return Err(anyhow::anyhow!("UI comm not connected.")); - } - console.send_ui_event(&event); + console.try_ui_comm()?.send_event(&event); }, } }, diff --git a/crates/ark/tests/evaluate-code.rs b/crates/ark/tests/evaluate-code.rs index 8a9073b17..f3e4a8cb7 100644 --- a/crates/ark/tests/evaluate-code.rs +++ b/crates/ark/tests/evaluate-code.rs @@ -21,14 +21,10 @@ fn evaluate_code(frontend: &DummyArkFrontend, comm_id: &str, code: &str) -> UiBa }); frontend.send_shell_comm_msg(String::from(comm_id), data); - - // The UI comm runs on the R thread via CommHandler, so the RPC reply - // arrives deterministically before Idle. frontend.recv_iopub_busy(); let reply = frontend.recv_iopub_comm_msg(); assert_eq!(reply.comm_id, comm_id); - frontend.recv_iopub_idle(); serde_json::from_value::(reply.data).unwrap() diff --git a/crates/ark_test/src/dummy_frontend.rs b/crates/ark_test/src/dummy_frontend.rs index e3ad91e19..743fd0be0 100644 --- a/crates/ark_test/src/dummy_frontend.rs +++ b/crates/ark_test/src/dummy_frontend.rs @@ -636,36 +636,6 @@ impl DummyArkFrontend { } } - /// Receive a CommMsg and Idle from IOPub in either order. - /// - /// Some comm RPC replies race with the shell's Idle status because the - /// reply is sent from a separate thread (e.g. the UI comm thread). This - /// helper accepts both orderings and returns the CommMsg content. - #[track_caller] - pub fn recv_iopub_comm_msg_and_idle(&self) -> amalthea::wire::comm_msg::CommWireMsg { - let first = self.recv_iopub_next(); - let second = self.recv_iopub_next(); - - let (comm_msg, idle) = match (first, second) { - (Message::CommMsg(comm), Message::Status(status)) => (comm, status), - (Message::Status(status), Message::CommMsg(comm)) => (comm, status), - (a, b) => panic!( - "Expected CommMsg and Idle in either order, got {:?} and {:?}", - a, b - ), - }; - - assert_eq!( - idle.content.execution_state, - amalthea::wire::status::ExecutionState::Idle, - "Expected Idle status" - ); - - self.flush_streams_at_boundary(); - - comm_msg.content - } - /// Receive from IOPub and assert CommOpen message. /// Automatically skips any Stream messages. #[track_caller] @@ -1094,41 +1064,29 @@ impl DummyArkFrontend { data: serde_json::json!({}), }); - // The UI comm now runs on the R thread via CommHandler. The - // comm_open blocks Shell while the handler's `handle_open()` runs, - // so events (prompt_state, working_directory) arrive - // deterministically within the Busy/Idle window. + // The UI comm runs on the R thread via CommHandler. The comm_open + // blocks Shell while the handler's `handle_open()` runs, so events + // arrive deterministically within the Busy/Idle window. self.recv_iopub_busy(); - // Drain the initial events sent by `handle_open()` (prompt_state, - // working_directory). We don't assert on their content here. - let mut comm_msg_count = 0; - loop { - let msg = self.recv_iopub_next(); - match msg { - Message::CommMsg(_) => { - comm_msg_count += 1; - }, - Message::Status(ref data) - if data.content.execution_state == - amalthea::wire::status::ExecutionState::Idle => - { - self.flush_streams_at_boundary(); - break; - }, - Message::Stream(ref data) => { - self.buffer_stream(&data.content); - }, - other => panic!("Unexpected message during open_ui_comm: {other:?}"), - } - } + // `handle_open()` calls `refresh()` which sends prompt_state then + // working_directory. + let prompt_state = self.recv_iopub_comm_msg(); + assert_eq!(prompt_state.comm_id, comm_id); + assert_eq!( + prompt_state.data.get("method").and_then(|v| v.as_str()), + Some("prompt_state") + ); - // We expect at least the prompt_state event - assert!( - comm_msg_count >= 1, - "Expected at least 1 comm event from UI comm open, got {comm_msg_count}" + let working_dir = self.recv_iopub_comm_msg(); + assert_eq!(working_dir.comm_id, comm_id); + assert_eq!( + working_dir.data.get("method").and_then(|v| v.as_str()), + Some("working_directory") ); + self.recv_iopub_idle(); + comm_id } From ef3eb591e8f48551aa60d3f0d79ef0bab5b3143d Mon Sep 17 00:00:00 2001 From: Lionel Henry Date: Wed, 11 Mar 2026 08:08:21 +0100 Subject: [PATCH 3/3] Rename to `ui_comm.rs` to fix clippy --- crates/ark/src/ui/mod.rs | 6 +++--- crates/ark/src/ui/{ui.rs => ui_comm.rs} | 0 2 files changed, 3 insertions(+), 3 deletions(-) rename crates/ark/src/ui/{ui.rs => ui_comm.rs} (100%) diff --git a/crates/ark/src/ui/mod.rs b/crates/ark/src/ui/mod.rs index b36d0c2c0..fcae53fc7 100644 --- a/crates/ark/src/ui/mod.rs +++ b/crates/ark/src/ui/mod.rs @@ -8,6 +8,6 @@ pub mod events; pub mod methods; -mod ui; -pub use ui::UiComm; -pub use ui::UI_COMM_NAME; +mod ui_comm; +pub use ui_comm::UiComm; +pub use ui_comm::UI_COMM_NAME; diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui_comm.rs similarity index 100% rename from crates/ark/src/ui/ui.rs rename to crates/ark/src/ui/ui_comm.rs