diff --git a/crates/ark/src/comm_handler.rs b/crates/ark/src/comm_handler.rs index 0a5654246..5b7f1ccaa 100644 --- a/crates/ark/src/comm_handler.rs +++ b/crates/ark/src/comm_handler.rs @@ -47,6 +47,15 @@ impl CommHandlerContext { pub fn is_closed(&self) -> bool { self.closed.get() } + + /// Send a serializable event as `CommMsg::Data` on the outgoing channel. + /// Serialization or send errors are logged and ignored. + pub fn send_event(&self, event: &T) { + let Some(json) = serde_json::to_value(event).log_err() else { + return; + }; + self.outgoing_tx.send(CommMsg::Data(json)).log_err(); + } } /// Trait for comm handlers that run synchronously on the R thread. diff --git a/crates/ark/src/console.rs b/crates/ark/src/console.rs index 05efb80f1..2b291ae4c 100644 --- a/crates/ark/src/console.rs +++ b/crates/ark/src/console.rs @@ -128,7 +128,6 @@ pub(crate) use console_repl::ConsoleNotification; pub(crate) use console_repl::ConsoleOutputCapture; pub(crate) use console_repl::KernelInfo; use console_repl::PendingInputs; -use console_repl::PromptInfo; use console_repl::ReadConsolePendingAction; pub use console_repl::SessionMode; @@ -166,8 +165,6 @@ use crate::srcref::ns_populate_srcref; use crate::srcref::resource_loaded_namespaces; use crate::startup; use crate::sys::console::console_to_utf8; -use crate::ui::UiCommMessage; -use crate::ui::UiCommSender; use crate::url::UrlId; thread_local! { @@ -224,9 +221,9 @@ pub(crate) struct Console { tasks_idle_any_rx: Receiver, pending_futures: HashMap, RTaskStartInfo, Option)>, - /// Channel to communicate requests and events to the frontend - /// by forwarding them through the UI comm. Optional, and really Positron specific. - ui_comm_tx: Option, + /// Comm ID of the currently connected UI comm, if any. + /// The handler lives in `self.comms`; this is just an index into it. + ui_comm_id: Option, /// Error captured by our global condition handler during the last iteration /// of the REPL. diff --git a/crates/ark/src/console/console_comm.rs b/crates/ark/src/console/console_comm.rs index f10fbf754..ef8d1d6fb 100644 --- a/crates/ark/src/console/console_comm.rs +++ b/crates/ark/src/console/console_comm.rs @@ -3,11 +3,11 @@ // // Copyright (C) 2026 Posit Software, PBC. All rights reserved. // -// use amalthea::comm::comm_channel::CommMsg; use amalthea::comm::event::CommEvent; use amalthea::socket::comm::CommInitiator; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::socket::comm::CommSocket; use stdext::result::ResultExt; use uuid::Uuid; @@ -17,6 +17,7 @@ use crate::comm_handler::CommHandlerContext; use crate::comm_handler::ConsoleComm; use crate::comm_handler::EnvironmentChanged; use crate::console::Console; +use crate::ui::UI_COMM_NAME; impl Console { pub(super) fn comm_handle_msg(&mut self, comm_id: &str, msg: CommMsg) { @@ -29,6 +30,10 @@ impl Console { } pub(super) fn comm_handle_close(&mut self, comm_id: &str) { + if self.ui_comm_id.as_deref() == Some(comm_id) { + self.ui_comm_id = None; + } + let Some(mut reg) = self.comms.remove(comm_id) else { log::warn!("Received close for unknown registered comm {comm_id}"); return; @@ -40,7 +45,7 @@ impl Console { /// /// Creates the `CommSocket` and `CommHandlerContext`, calls `handle_open`, /// sends `CommEvent::Opened` to amalthea, and returns the comm ID. - pub(crate) fn comm_register( + pub(crate) fn comm_open_backend( &mut self, comm_name: &str, mut handler: Box, @@ -67,6 +72,35 @@ impl Console { Ok(comm_id) } + /// Register a frontend-initiated comm on the R thread. + /// + /// Unlike `comm_open_backend` (which is for backend-initiated comms and + /// sends `CommEvent::Opened`), this is called when the frontend opened the + /// comm. The `CommSocket` already exists in amalthea's open_comms list, so + /// we only need to register the handler and call `handle_open`. + pub(super) fn comm_open_frontend( + &mut self, + comm_id: String, + comm_name: &str, + outgoing_tx: CommOutgoingTx, + mut handler: Box, + ) { + let ctx = CommHandlerContext::new(outgoing_tx, self.comm_event_tx.clone()); + handler.handle_open(&ctx); + + if comm_name == UI_COMM_NAME { + if let Some(old_id) = self.ui_comm_id.take() { + log::info!("Replacing an existing UI comm."); + if let Some(mut old) = self.comms.remove(&old_id) { + old.handler.handle_close(&old.ctx); + } + } + self.ui_comm_id = Some(comm_id.clone()); + } + + self.comms.insert(comm_id, ConsoleComm { handler, ctx }); + } + pub(super) fn comm_notify_environment_changed(&mut self, event: EnvironmentChanged) { for (_, reg) in self.comms.iter_mut() { reg.handler.handle_environment(event, ®.ctx); @@ -84,6 +118,11 @@ impl Console { .collect(); for comm_id in closed_ids { + // We're not expecting the UI comm to close itself but we handle the + // case explicitly to be defensive + if self.ui_comm_id.as_deref() == Some(comm_id.as_str()) { + self.ui_comm_id = None; + } if let Some(reg) = self.comms.remove(&comm_id) { self.comm_notify_closed(&comm_id, ®); } diff --git a/crates/ark/src/console/console_integration.rs b/crates/ark/src/console/console_integration.rs index 274357083..52d61e463 100644 --- a/crates/ark/src/console/console_integration.rs +++ b/crates/ark/src/console/console_integration.rs @@ -11,129 +11,22 @@ use super::*; /// UI comm integration. impl Console { - pub(super) fn handle_establish_ui_comm_channel( - &mut self, - ui_comm_tx: Sender, - info: &PromptInfo, - ) { - if self.ui_comm_tx.is_some() { - log::info!("Replacing an existing UI comm channel."); - } - - // Create and store the sender channel - self.ui_comm_tx = Some(UiCommSender::new(ui_comm_tx)); - - // Go ahead and do an initial refresh - self.with_mut_ui_comm_tx(|ui_comm_tx| { - let input_prompt = info.input_prompt.clone(); - let continuation_prompt = info.continuation_prompt.clone(); - - ui_comm_tx.send_refresh(input_prompt, continuation_prompt); - }); - } - pub(crate) fn session_mode(&self) -> SessionMode { self.session_mode } - pub(crate) fn get_ui_comm_tx(&self) -> Option<&UiCommSender> { - self.ui_comm_tx.as_ref() - } - - fn get_mut_ui_comm_tx(&mut self) -> Option<&mut UiCommSender> { - self.ui_comm_tx.as_mut() - } - - pub(super) fn with_ui_comm_tx(&self, f: F) - where - F: FnOnce(&UiCommSender), - { - match self.get_ui_comm_tx() { - Some(ui_comm_tx) => f(ui_comm_tx), - None => { - // Trace level logging, its typically not a bug if the frontend - // isn't connected. Happens in all Jupyter use cases. - log::trace!("UI comm isn't connected, dropping `f`."); - }, - } + pub(crate) fn ui_comm(&self) -> Option> { + let comm = self.comms.get(self.ui_comm_id.as_deref()?)?; + Some(UiCommRef { + comm, + originator: self.active_request.as_ref().map(|r| &r.originator), + stdin_request_tx: &self.stdin_request_tx, + }) } - pub(super) fn with_mut_ui_comm_tx(&mut self, mut f: F) - where - F: FnMut(&mut UiCommSender), - { - match self.get_mut_ui_comm_tx() { - Some(ui_comm_tx) => f(ui_comm_tx), - None => { - // Trace level logging, its typically not a bug if the frontend - // isn't connected. Happens in all Jupyter use cases. - log::trace!("UI comm isn't connected, dropping `f`."); - }, - } - } - - pub(crate) fn is_ui_comm_connected(&self) -> bool { - self.get_ui_comm_tx().is_some() - } - - pub(crate) fn call_frontend_method( - &self, - request: UiFrontendRequest, - ) -> anyhow::Result { - log::trace!("Calling frontend method {request:?}"); - - let ui_comm_tx = self.get_ui_comm_tx().ok_or_else(|| { - anyhow::anyhow!("UI comm is not connected. Can't execute request {request:?}") - })?; - - let (reply_tx, reply_rx) = bounded(1); - - let Some(req) = &self.active_request else { - return Err(anyhow::anyhow!( - "No active request. Can't execute request {request:?}" - )); - }; - - // Forward request to UI comm - ui_comm_tx.send_request(UiCommFrontendRequest { - originator: req.originator.clone(), - reply_tx, - request: request.clone(), - }); - - // Block for reply - let reply = reply_rx.recv().unwrap(); - - log::trace!("Got reply from frontend method: {reply:?}"); - - match reply { - StdInRpcReply::Reply(reply) => match reply { - JsonRpcReply::Result(reply) => { - // Deserialize to Rust first to verify the OpenRPC contract. - // Errors are propagated to R. - if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) { - return Err(anyhow::anyhow!( - "Can't deserialize RPC reply for {request:?}:\n{err:?}" - )); - } - - // Now deserialize to an R object - Ok(RObject::try_from(reply.result)?) - }, - JsonRpcReply::Error(reply) => { - let message = reply.error.message; - - return Err(anyhow::anyhow!( - "While calling frontend method:\n\ - {message}", - )); - }, - }, - // If an interrupt was signalled, return `NULL`. This should not be - // visible to the caller since `r_unwrap()` (called e.g. by - // `harp::register`) will trigger an interrupt jump right away. - StdInRpcReply::Interrupt => Ok(RObject::null()), - } + pub(crate) fn try_ui_comm(&self) -> anyhow::Result> { + self.ui_comm() + .ok_or_else(|| anyhow!("UI comm is not connected")) } } @@ -270,3 +163,83 @@ impl Console { self.lsp_virtual_documents.get(uri).cloned() } } + +/// Reference to the UI comm. Returned by `Console::ui_comm()`. +/// +/// Existence of this value guarantees the comm is connected. +pub(crate) struct UiCommRef<'a> { + comm: &'a ConsoleComm, + originator: Option<&'a Originator>, + stdin_request_tx: &'a Sender, +} + +impl UiCommRef<'_> { + pub(crate) fn send_event(&self, event: &UiFrontendEvent) { + self.comm.ctx.send_event(event); + } + + pub(crate) fn busy(&self, busy: bool) { + self.send_event(&UiFrontendEvent::Busy(BusyParams { busy })); + } + + pub(crate) fn show_message(&self, message: String) { + self.send_event(&UiFrontendEvent::ShowMessage(ShowMessageParams { message })); + } + + pub(crate) fn call_frontend_method( + &self, + request: UiFrontendRequest, + ) -> anyhow::Result { + log::trace!("Calling frontend method {request:?}"); + + let (reply_tx, reply_rx) = bounded(1); + + let Some(originator) = self.originator else { + return Err(anyhow!( + "No active request. Can't execute request {request:?}" + )); + }; + + // Forward request directly to the stdin channel + let comm_msg = StdInRequest::Comm(UiCommFrontendRequest { + originator: originator.clone(), + reply_tx, + request: request.clone(), + }); + self.stdin_request_tx.send(comm_msg)?; + + // Block for reply + let reply = reply_rx.recv()?; + + log::trace!("Got reply from frontend method: {reply:?}"); + + match reply { + StdInRpcReply::Reply(reply) => match reply { + JsonRpcReply::Result(reply) => { + // Deserialize to Rust first to verify the OpenRPC contract. + // Errors are propagated to R. + if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) { + return Err(anyhow!( + "Can't deserialize RPC reply for {request:?}:\n{err:?}" + )); + } + + // Now deserialize to an R object + Ok(RObject::try_from(reply.result)?) + }, + JsonRpcReply::Error(reply) => { + let message = reply.error.message; + + return Err(anyhow!( + "While calling frontend method:\n\ + {message}", + )); + }, + }, + // If an interrupt was signalled, return `NULL`. This should not be + // visible to the caller since `r_unwrap()` (called e.g. by + // `harp::register`) will trigger an interrupt jump right away. + StdInRpcReply::Interrupt => Ok(RObject::null()), + } + } +} diff --git a/crates/ark/src/console/console_repl.rs b/crates/ark/src/console/console_repl.rs index 167da981d..0e249fd78 100644 --- a/crates/ark/src/console/console_repl.rs +++ b/crates/ark/src/console/console_repl.rs @@ -212,15 +212,6 @@ pub(super) struct PromptInfo { /// case of a browser prompt or a readline prompt. pub(super) input_prompt: String, - /// The continuation prompt string when user supplies incomplete - /// inputs. This always corresponds to `getOption("continue")`. We send - /// it to frontends along with `prompt` because some frontends such as - /// Positron do not send incomplete inputs to Ark and take charge of - /// continuation prompts themselves. For frontends that can send - /// incomplete inputs to Ark, like Jupyter Notebooks, we immediately - /// error on them rather than requesting that this be shown. - pub(super) continuation_prompt: String, - /// The kind of prompt we're handling. pub(super) kind: PromptKind, } @@ -608,7 +599,7 @@ impl Console { active_request: None, execution_count: 0, autoprint_output: String::new(), - ui_comm_tx: None, + ui_comm_id: None, last_error: None, help_event_tx: None, help_port: None, @@ -881,7 +872,7 @@ impl Console { self.pending_inputs = None; // Reply to active request with error, then fall through to event loop - self.handle_active_request(&info, ConsoleValue::Error(exception)); + self.handle_active_request(ConsoleValue::Error(exception)); } else if matches!(info.kind, PromptKind::InputRequest) { // Request input from the frontend and return it to R return self.handle_input_request(&info, buf, buflen); @@ -892,7 +883,7 @@ impl Console { // Otherwise reply to active request with accumulated result, then // fall through to event loop let result = self.take_result(); - self.handle_active_request(&info, ConsoleValue::Success(result)); + self.handle_active_request(ConsoleValue::Success(result)); } // In the future we'll also send browser information, see @@ -1022,7 +1013,7 @@ impl Console { // We've got a kernel request i if i == kernel_request_index => { let req = oper.recv(&kernel_request_rx).unwrap(); - self.handle_kernel_request(req, &info); + self.handle_kernel_request(req); }, // An interrupt task woke us up @@ -1064,10 +1055,6 @@ impl Console { let prompt_slice = unsafe { CStr::from_ptr(prompt_c) }; let prompt = prompt_slice.to_string_lossy().into_owned(); - // Sent to the frontend after each top-level command so users can - // customise their prompts - let continuation_prompt: String = harp::get_option("continue").try_into().unwrap(); - // Detect browser prompt by matching the prompt string // https://github.com/posit-dev/positron/issues/4742. // There are ways to break this detection, for instance setting @@ -1089,7 +1076,6 @@ impl Console { return PromptInfo { input_prompt: prompt, - continuation_prompt, kind, }; } @@ -1197,7 +1183,7 @@ impl Console { Some(exception) } - fn handle_active_request(&mut self, info: &PromptInfo, value: ConsoleValue) { + fn handle_active_request(&mut self, value: ConsoleValue) { self.reset_global_env_rdebug(); // If we get here we finished evaluating all pending inputs. Check if we @@ -1209,17 +1195,6 @@ impl Console { return; }; - // Perform a refresh of the frontend state (Prompts, working - // directory, etc) - // TODO: Once the UI comm is migrated to the `CommHandler` path, this - // becomes a `handle_environment` impl reacting to `Execution`. - self.with_mut_ui_comm_tx(|ui_comm_tx| { - let input_prompt = info.input_prompt.clone(); - let continuation_prompt = info.continuation_prompt.clone(); - - ui_comm_tx.send_refresh(input_prompt, continuation_prompt); - }); - // Check for pending graphics updates // (Important that this occurs while in the "busy" state of this ExecuteRequest // so that the `parent` message is set correctly in any Jupyter messages) @@ -1348,7 +1323,7 @@ impl Console { } else { // Otherwise we got an empty input, e.g. `""` and there's // nothing to do. Close active request. - self.handle_active_request(info, ConsoleValue::Success(Default::default())); + self.handle_active_request(ConsoleValue::Success(Default::default())); // And return to event loop None @@ -1835,12 +1810,19 @@ impl Console { } } - fn handle_kernel_request(&mut self, req: KernelRequest, info: &PromptInfo) { + fn handle_kernel_request(&mut self, req: KernelRequest) { log::trace!("Received kernel request {req:?}"); match req { - KernelRequest::EstablishUiCommChannel(ref ui_comm_tx) => { - self.handle_establish_ui_comm_channel(ui_comm_tx.clone(), info) + KernelRequest::CommOpen { + comm_id, + comm_name, + outgoing_tx, + handler, + done_tx, + } => { + self.comm_open_frontend(comm_id, &comm_name, outgoing_tx, handler); + done_tx.send(()).log_err(); }, KernelRequest::CommMsg { comm_id, @@ -2182,9 +2164,9 @@ impl Console { let busy = which != 0; // Send updated state to the frontend over the UI comm - self.with_ui_comm_tx(|ui_comm_tx| { - ui_comm_tx.send_event(UiFrontendEvent::Busy(BusyParams { busy })); - }); + if let Some(ui) = self.ui_comm() { + ui.busy(busy); + } } /// Invoked by R to show a message to the user. @@ -2192,10 +2174,12 @@ impl Console { let message = unsafe { CStr::from_ptr(buf) }; let message = message.to_str().unwrap().to_string(); - // Deliver message to the frontend over the UI comm - self.with_ui_comm_tx(|ui_comm_tx| { - ui_comm_tx.send_event(UiFrontendEvent::ShowMessage(ShowMessageParams { message })) - }); + if let Some(ui) = self.ui_comm() { + ui.show_message(message); + } else { + // Should we emit the message in the Console? + log::info!("`show_message`: {message}") + } } /// Invoked by the R event loop diff --git a/crates/ark/src/data_explorer/r_data_explorer.rs b/crates/ark/src/data_explorer/r_data_explorer.rs index 559261e22..b7570389d 100644 --- a/crates/ark/src/data_explorer/r_data_explorer.rs +++ b/crates/ark/src/data_explorer/r_data_explorer.rs @@ -1213,7 +1213,7 @@ pub unsafe extern "C-unwind" fn ps_view_data_frame( }; let explorer = RDataExplorer::new(title, x, env_info)?; - Console::get_mut().comm_register(DATA_EXPLORER_COMM_NAME, Box::new(explorer))?; + Console::get_mut().comm_open_backend(DATA_EXPLORER_COMM_NAME, Box::new(explorer))?; Ok(R_NilValue) } diff --git a/crates/ark/src/lsp/backend.rs b/crates/ark/src/lsp/backend.rs index f7b745bc3..2c26a70cd 100644 --- a/crates/ark/src/lsp/backend.rs +++ b/crates/ark/src/lsp/backend.rs @@ -12,8 +12,6 @@ use std::sync::Arc; use amalthea::comm::server_comm::ServerStartMessage; use amalthea::comm::server_comm::ServerStartedMessage; -use amalthea::comm::ui_comm::ShowMessageParams as UiShowMessageParams; -use amalthea::comm::ui_comm::UiFrontendEvent; use anyhow::Context; use crossbeam::channel::Sender; use serde_json::Value; @@ -107,13 +105,11 @@ fn report_crash() { "with full logs (see https://positron.posit.co/troubleshooting.html#python-and-r-logs)." ); + // NOTE: This is a legit use of interrupt-time task. No R access here, and + // we need to go through Console since it owns the UI comm. r_task(|| { - let event = UiFrontendEvent::ShowMessage(UiShowMessageParams { - message: String::from(user_message), - }); - - if let Some(ui_comm_tx) = Console::get().get_ui_comm_tx() { - ui_comm_tx.send_event(event); + if let Some(ui) = Console::get().ui_comm() { + ui.show_message(String::from(user_message)); } }); } diff --git a/crates/ark/src/plots/graphics_device.rs b/crates/ark/src/plots/graphics_device.rs index 96b99512a..f2fe7afa6 100644 --- a/crates/ark/src/plots/graphics_device.rs +++ b/crates/ark/src/plots/graphics_device.rs @@ -301,7 +301,7 @@ impl DeviceContext { /// [SessionMode::Console] mode. fn should_use_dynamic_plots(&self) -> bool { let console = Console::get(); - console.is_ui_comm_connected() && console.session_mode() == SessionMode::Console + console.ui_comm().is_some() && console.session_mode() == SessionMode::Console } /// Deactivation hook diff --git a/crates/ark/src/request.rs b/crates/ark/src/request.rs index 0f7390de8..6d3370207 100644 --- a/crates/ark/src/request.rs +++ b/crates/ark/src/request.rs @@ -6,12 +6,13 @@ // use amalthea::comm::comm_channel::CommMsg; +use amalthea::socket::comm::CommOutgoingTx; use amalthea::wire::execute_reply::ExecuteReply; use amalthea::wire::execute_request::ExecuteRequest; use amalthea::wire::originator::Originator; use crossbeam::channel::Sender; -use crate::ui::UiCommMessage; +use crate::comm_handler::CommHandler; /// Represents requests to the primary R execution thread. #[derive(Debug, Clone)] @@ -53,8 +54,15 @@ pub fn debug_request_command(req: DebugRequest) -> String { /// Represents requests to the kernel. #[derive(Debug)] pub enum KernelRequest { - /// Establish a channel to the UI comm which forwards messages to the frontend - EstablishUiCommChannel(Sender), + /// Register a frontend-initiated comm handler on the R thread. + /// The handler is constructed on the Shell thread and sent here for registration. + CommOpen { + comm_id: String, + comm_name: String, + outgoing_tx: CommOutgoingTx, + handler: Box, + done_tx: Sender<()>, + }, /// Deliver an incoming comm message to the R thread CommMsg { diff --git a/crates/ark/src/shell.rs b/crates/ark/src/shell.rs index 3089105e5..f55ff8430 100644 --- a/crates/ark/src/shell.rs +++ b/crates/ark/src/shell.rs @@ -10,7 +10,6 @@ use amalthea::comm::comm_channel::CommMsg; use amalthea::language::shell_handler::CommHandled; use amalthea::language::shell_handler::ShellHandler; use amalthea::socket::comm::CommSocket; -use amalthea::socket::stdin::StdInRequest; use amalthea::wire::complete_reply::CompleteReply; use amalthea::wire::complete_request::CompleteRequest; use amalthea::wire::execute_reply::ExecuteReply; @@ -45,7 +44,6 @@ use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; use crate::ark_comm::ArkComm; use crate::console::Console; -use crate::console::ConsoleNotification; use crate::console::KernelInfo; use crate::data_explorer::r_data_explorer::DATA_EXPLORER_COMM_NAME; use crate::help::r_help::RHelp; @@ -55,16 +53,15 @@ use crate::r_task; use crate::request::KernelRequest; use crate::request::RRequest; use crate::ui::UiComm; +use crate::ui::UI_COMM_NAME; use crate::variables::r_variables::RVariables; pub struct Shell { r_request_tx: Sender, - stdin_request_tx: Sender, kernel_request_tx: Sender, kernel_init_rx: BusReader, kernel_info: Option, graphics_device_tx: AsyncUnboundedSender, - console_notification_tx: AsyncUnboundedSender, } #[derive(Debug)] @@ -76,20 +73,16 @@ impl Shell { /// Creates a new instance of the shell message handler. pub(crate) fn new( r_request_tx: Sender, - stdin_request_tx: Sender, kernel_init_rx: BusReader, kernel_request_tx: Sender, graphics_device_tx: AsyncUnboundedSender, - console_notification_tx: AsyncUnboundedSender, ) -> Self { Self { r_request_tx, - stdin_request_tx, kernel_request_tx, kernel_init_rx, kernel_info: None, graphics_device_tx, - console_notification_tx, } } @@ -249,10 +242,8 @@ impl ShellHandler for Shell { Comm::Variables => handle_comm_open_variables(comm), Comm::Ui => handle_comm_open_ui( comm, - self.stdin_request_tx.clone(), self.kernel_request_tx.clone(), self.graphics_device_tx.clone(), - self.console_notification_tx.clone(), ), Comm::Help => handle_comm_open_help(comm), Comm::Other(target_name) if target_name == "ark" => ArkComm::handle_comm_open(comm), @@ -267,7 +258,7 @@ impl ShellHandler for Shell { msg: CommMsg, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommMsg { comm_id: comm_id.to_string(), msg, @@ -285,7 +276,7 @@ impl ShellHandler for Shell { comm_name: &str, ) -> amalthea::Result { match comm_name { - DATA_EXPLORER_COMM_NAME => { + DATA_EXPLORER_COMM_NAME | UI_COMM_NAME => { self.dispatch_kernel_request(|done_tx| KernelRequest::CommClose { comm_id: comm_id.to_string(), done_tx, @@ -324,20 +315,24 @@ fn handle_comm_open_variables(comm: CommSocket) -> amalthea::Result { fn handle_comm_open_ui( comm: CommSocket, - stdin_request_tx: Sender, kernel_request_tx: Sender, graphics_device_tx: AsyncUnboundedSender, - _console_notification_tx: AsyncUnboundedSender, ) -> amalthea::Result { - // Create a frontend to wrap the comm channel we were just given. This starts - // a thread that proxies messages to the frontend. - let ui_comm_tx = UiComm::start(comm, stdin_request_tx, graphics_device_tx); + let handler = UiComm::new(graphics_device_tx); - // Send the frontend event channel to the execution thread so it can emit - // events to the frontend. - if let Err(err) = kernel_request_tx.send(KernelRequest::EstablishUiCommChannel(ui_comm_tx)) { - log::error!("Could not deliver UI comm channel to execution thread: {err:?}"); - }; + let (done_tx, done_rx) = bounded(0); + kernel_request_tx + .send(KernelRequest::CommOpen { + comm_id: comm.comm_id.clone(), + comm_name: comm.comm_name.clone(), + outgoing_tx: comm.outgoing_tx.clone(), + handler: Box::new(handler), + done_tx, + }) + .map_err(|err| amalthea::Error::SendError(err.to_string()))?; + done_rx + .recv() + .map_err(|err| amalthea::Error::ReceiveError(err.to_string()))?; Ok(true) } diff --git a/crates/ark/src/start.rs b/crates/ark/src/start.rs index 2c3cbf5b1..8bce3a448 100644 --- a/crates/ark/src/start.rs +++ b/crates/ark/src/start.rs @@ -88,11 +88,9 @@ pub fn start_kernel( let kernel_init_rx = kernel_init_tx.add_rx(); let shell = Box::new(Shell::new( r_request_tx.clone(), - stdin_request_tx.clone(), kernel_init_rx, kernel_request_tx, graphics_device_tx, - console_notification_tx, )); // Create the control handler; this is used to handle shutdown/interrupt and diff --git a/crates/ark/src/ui/events.rs b/crates/ark/src/ui/events.rs index 241ac48f5..52828911e 100644 --- a/crates/ark/src/ui/events.rs +++ b/crates/ark/src/ui/events.rs @@ -14,7 +14,6 @@ use amalthea::comm::ui_comm::OpenWorkspaceParams; use amalthea::comm::ui_comm::Position; use amalthea::comm::ui_comm::Range; use amalthea::comm::ui_comm::SetEditorSelectionsParams; -use amalthea::comm::ui_comm::ShowMessageParams; use amalthea::comm::ui_comm::ShowUrlParams; use amalthea::comm::ui_comm::UiFrontendEvent; use harp::object::RObject; @@ -25,16 +24,9 @@ use crate::console::Console; #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_show_message(message: SEXP) -> anyhow::Result { - let params = ShowMessageParams { - message: RObject::view(message).try_into()?, - }; - - let event = UiFrontendEvent::ShowMessage(params); + let message: String = RObject::view(message).try_into()?; - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_show_message"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.show_message(message); Ok(R_NilValue) } @@ -51,10 +43,7 @@ pub unsafe extern "C-unwind" fn ps_ui_open_workspace( let event = UiFrontendEvent::OpenWorkspace(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_open_workspace"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -79,10 +68,7 @@ pub unsafe extern "C-unwind" fn ps_ui_navigate_to_file( let event = UiFrontendEvent::OpenEditor(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_navigate_to_file"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -94,10 +80,7 @@ pub unsafe extern "C-unwind" fn ps_ui_set_selection_ranges(ranges: SEXP) -> anyh let event = UiFrontendEvent::SetEditorSelections(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("ui_set_selection_ranges"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(R_NilValue) } @@ -109,10 +92,7 @@ pub fn send_show_url_event(url: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::ShowUrl(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("show_url"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(()) } @@ -130,10 +110,7 @@ pub fn send_open_with_system_event(path: &str) -> anyhow::Result<()> { }; let event = UiFrontendEvent::OpenWithSystem(params); - let ui_comm_tx = Console::get() - .get_ui_comm_tx() - .ok_or_else(|| ui_comm_not_connected("open_with_system"))?; - ui_comm_tx.send_event(event); + Console::get().try_ui_comm()?.send_event(&event); Ok(()) } @@ -160,7 +137,3 @@ pub fn ps_ui_robj_as_ranges(ranges: SEXP) -> anyhow::Result> { .collect(); Ok(selections) } - -fn ui_comm_not_connected(name: &str) -> anyhow::Error { - anyhow::anyhow!("UI comm not connected, can't run `{name}`.") -} diff --git a/crates/ark/src/ui/methods.rs b/crates/ark/src/ui/methods.rs index 5e9778c24..3036077c5 100644 --- a/crates/ark/src/ui/methods.rs +++ b/crates/ark/src/ui/methods.rs @@ -25,7 +25,9 @@ use crate::ui::events::ps_ui_robj_as_ranges; #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_last_active_editor_context() -> anyhow::Result { - let out = Console::get().call_frontend_method(UiFrontendRequest::LastActiveEditorContext)?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::LastActiveEditorContext)?; Ok(out.sexp) } @@ -43,14 +45,17 @@ pub unsafe extern "C-unwind" fn ps_ui_modify_editor_selections( } let params = ModifyEditorSelectionsParams { selections, values }; - let out = - Console::get().call_frontend_method(UiFrontendRequest::ModifyEditorSelections(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ModifyEditorSelections(params))?; Ok(out.sexp) } #[harp::register] pub unsafe extern "C-unwind" fn ps_ui_workspace_folder() -> anyhow::Result { - let out = Console::get().call_frontend_method(UiFrontendRequest::WorkspaceFolder)?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::WorkspaceFolder)?; Ok(out.sexp) } @@ -64,7 +69,9 @@ pub unsafe extern "C-unwind" fn ps_ui_show_dialog( message: RObject::view(message).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowDialog(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowDialog(params))?; Ok(out.sexp) } @@ -90,7 +97,9 @@ pub unsafe extern "C-unwind" fn ps_ui_show_question( }, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowQuestion(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowQuestion(params))?; Ok(out.sexp) } @@ -120,7 +129,9 @@ pub extern "C-unwind" fn ps_ui_show_prompt( timeout: timeout_secs, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ShowPrompt(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ShowPrompt(params))?; Ok(out.sexp) } @@ -130,7 +141,9 @@ pub unsafe extern "C-unwind" fn ps_ui_ask_for_password(prompt: SEXP) -> anyhow:: prompt: RObject::view(prompt).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::AskForPassword(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::AskForPassword(params))?; Ok(out.sexp) } @@ -144,7 +157,9 @@ pub unsafe extern "C-unwind" fn ps_ui_new_document( language_id: RObject::view(language_id).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::NewDocument(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::NewDocument(params))?; Ok(out.sexp) } @@ -154,7 +169,9 @@ pub unsafe extern "C-unwind" fn ps_ui_execute_command(command: SEXP) -> anyhow:: command: RObject::view(command).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ExecuteCommand(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ExecuteCommand(params))?; Ok(out.sexp) } @@ -170,7 +187,9 @@ pub unsafe extern "C-unwind" fn ps_ui_execute_code( allow_incomplete: false, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::ExecuteCode(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::ExecuteCode(params))?; Ok(out.sexp) } @@ -182,7 +201,9 @@ pub unsafe extern "C-unwind" fn ps_ui_evaluate_when_clause( when_clause: RObject::view(when_clause).try_into()?, }; - let out = Console::get().call_frontend_method(UiFrontendRequest::EvaluateWhenClause(params))?; + let out = Console::get() + .try_ui_comm()? + .call_frontend_method(UiFrontendRequest::EvaluateWhenClause(params))?; Ok(out.sexp) } @@ -192,6 +213,8 @@ pub unsafe extern "C-unwind" fn ps_ui_debug_sleep(ms: SEXP) -> anyhow::Result` that communicates -/// messages to the `UiComm` -/// -/// Adds convenience methods for sending `Event`s and `Request`s. -/// -/// Manages a bit of state for performing a state refresh -/// (the `working_directory`). -pub struct UiCommSender { - ui_comm_tx: Sender, - working_directory: PathBuf, -} - -impl UiCommSender { - pub fn new(ui_comm_tx: Sender) -> Self { - // Empty path buf will get updated on first directory refresh - let working_directory = PathBuf::new(); - - Self { - ui_comm_tx, - working_directory, - } - } - - pub fn send_event(&self, event: UiFrontendEvent) { - self.send(UiCommMessage::Event(event)) - } - - pub fn send_request(&self, request: UiCommFrontendRequest) { - self.send(UiCommMessage::Request(request)) - } - - fn send(&self, msg: UiCommMessage) { - log::info!("Sending message to UI comm: {msg:?}"); - - if let Err(err) = self.ui_comm_tx.send(msg) { - log::error!("Error sending message to UI comm: {err:?}"); - - // TODO: Something is wrong with the UI thread, we should - // disconnect to avoid more errors but then we need a mutable self - // self.ui_comm_tx = None; - } - } - - pub fn send_refresh(&mut self, input_prompt: String, continuation_prompt: String) { - self.refresh_prompt_info(input_prompt, continuation_prompt); - - if let Err(err) = self.refresh_working_directory() { - log::error!("Can't refresh working directory: {err:?}"); - } - } - - fn refresh_prompt_info(&self, input_prompt: String, continuation_prompt: String) { - self.send_event(UiFrontendEvent::PromptState(PromptStateParams { - input_prompt, - continuation_prompt, - })); - } - - /// Checks for changes to the working directory, and sends an event to the - /// frontend if the working directory has changed. - fn refresh_working_directory(&mut self) -> anyhow::Result<()> { - // Get the current working directory - let mut new_working_directory = std::env::current_dir()?; - - // If it isn't the same as the last working directory, send an event - if new_working_directory != self.working_directory { - self.working_directory = new_working_directory.clone(); - - // Attempt to alias the directory, if it's within the home directory - if let Some(home_dir) = home::home_dir() { - if let Ok(stripped_dir) = new_working_directory.strip_prefix(home_dir) { - let mut new_path = PathBuf::from("~"); - new_path.push(stripped_dir); - new_working_directory = new_path; - } - } - - // Deliver event to client - self.send_event(UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { - directory: new_working_directory.to_string_lossy().to_string(), - })); - }; - - Ok(()) - } -} diff --git a/crates/ark/src/ui/ui.rs b/crates/ark/src/ui/ui.rs deleted file mode 100644 index 9fb1a26ac..000000000 --- a/crates/ark/src/ui/ui.rs +++ /dev/null @@ -1,536 +0,0 @@ -// -// ui.rs -// -// Copyright (C) 2023-2026 by Posit Software, PBC -// -// - -use amalthea::comm::comm_channel::CommMsg; -use amalthea::comm::ui_comm::CallMethodParams; -use amalthea::comm::ui_comm::DidChangePlotsRenderSettingsParams; -use amalthea::comm::ui_comm::EditorContextChangedParams; -use amalthea::comm::ui_comm::EvalResult; -use amalthea::comm::ui_comm::EvaluateCodeParams; -use amalthea::comm::ui_comm::UiBackendReply; -use amalthea::comm::ui_comm::UiBackendRequest; -use amalthea::comm::ui_comm::UiFrontendEvent; -use amalthea::socket::comm::CommSocket; -use amalthea::socket::stdin::StdInRequest; -use amalthea::wire::input_request::UiCommFrontendRequest; -use crossbeam::channel::Receiver; -use crossbeam::channel::Sender; -use crossbeam::select; -use harp::eval::parse_eval_global; -use harp::exec::RFunction; -use harp::exec::RFunctionExt; -use harp::object::RObject; -use serde_json::Value; -use stdext::spawn; -use stdext::unwrap; -use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; - -use crate::console::Console; -use crate::console::ConsoleOutputCapture; -use crate::plots::graphics_device::GraphicsDeviceNotification; -use crate::r_task; - -#[derive(Debug)] -pub enum UiCommMessage { - Event(UiFrontendEvent), - Request(UiCommFrontendRequest), -} - -/// UiComm is a wrapper around a comm channel whose lifetime matches -/// that of the Positron UI frontend. It is used to perform communication with the -/// frontend that isn't scoped to any particular view. -pub struct UiComm { - comm: CommSocket, - ui_comm_rx: Receiver, - stdin_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, -} - -impl UiComm { - pub(crate) fn start( - comm: CommSocket, - stdin_request_tx: Sender, - graphics_device_tx: AsyncUnboundedSender, - ) -> Sender { - // Create a sender-receiver pair for Positron global events - let (ui_comm_tx, ui_comm_rx) = crossbeam::channel::unbounded::(); - - spawn!("ark-comm-ui", move || { - let frontend = Self { - comm, - ui_comm_rx, - stdin_request_tx, - graphics_device_tx, - }; - frontend.execution_thread(); - }); - - ui_comm_tx - } - - fn execution_thread(&self) { - loop { - // Wait for an event on either the event channel (which forwards - // Positron events to the frontend) or the comm channel (which - // receives requests from the frontend) - select! { - recv(&self.ui_comm_rx) -> msg => { - let msg = unwrap!(msg, Err(err) => { - log::error!( - "Error receiving Positron event; closing event listener: {err:?}" - ); - // Most likely the channel was closed, so we should stop the thread - break; - }); - match msg { - UiCommMessage::Event(event) => self.dispatch_event(&event), - UiCommMessage::Request(request) => self.call_frontend_method(request).unwrap(), - } - }, - - recv(&self.comm.incoming_rx) -> msg => { - match msg { - Ok(msg) => { - if !self.handle_comm_message(msg) { - log::info!("UI comm {} closing by request from frontend.", self.comm.comm_id); - break; - } - }, - Err(err) => { - log::error!("Error receiving message from frontend: {:?}", err); - break; - }, - } - }, - } - } - } - - fn dispatch_event(&self, event: &UiFrontendEvent) { - let json = serde_json::to_value(event).unwrap(); - - // Deliver the event to the frontend over the comm channel - if let Err(err) = self.comm.outgoing_tx.send(CommMsg::Data(json)) { - log::error!("Error sending UI event to frontend: {}", err); - }; - } - - /** - * Handles a comm message from the frontend. - * - * Returns true if the thread should continue, false if it should exit. - */ - fn handle_comm_message(&self, message: CommMsg) -> bool { - if let CommMsg::Close = message { - // The frontend has closed the connection; let the - // thread exit. - return false; - } - - if self - .comm - .handle_request(message.clone(), |req| self.handle_backend_method(req)) - { - return true; - } - - // We don't really expect to receive data messages from the - // frontend; they are events - log::warn!("Unexpected data message from frontend: {message:?}"); - true - } - - /** - * Handles an RPC request from the frontend. - */ - fn handle_backend_method( - &self, - request: UiBackendRequest, - ) -> anyhow::Result { - match request { - UiBackendRequest::CallMethod(request) => self.handle_call_method(request), - UiBackendRequest::DidChangePlotsRenderSettings(params) => { - self.handle_did_change_plot_render_settings(params) - }, - UiBackendRequest::EditorContextChanged(params) => { - self.handle_editor_context_changed(params) - }, - UiBackendRequest::EvaluateCode(params) => self.handle_evaluate_code(params), - } - } - - fn handle_call_method( - &self, - request: CallMethodParams, - ) -> anyhow::Result { - log::trace!("Handling '{}' frontend RPC method", request.method); - - // Today, all RPCs are fulfilled by R directly. Check to see if an R - // method of the appropriate name is defined. - // - // Consider: In the future, we may want to allow requests to be - // fulfilled here on the Rust side, with only some requests forwarded to - // R; Rust methods may wish to establish their own RPC handlers. - - // The method name is prefixed with ".ps.rpc.", by convention - let method = format!(".ps.rpc.{}", request.method); - - // Use the `exists` function to see if the method exists - let exists = r_task(|| unsafe { - let exists = RFunction::from("exists") - .param("x", method.clone()) - .call()?; - RObject::to::(exists) - })?; - - if !exists { - anyhow::bail!("No such method: {}", request.method); - } - - // Form an R function call from the request - let result = r_task(|| { - let mut call = RFunction::from(method); - for param in request.params.iter() { - let p = RObject::try_from(param.clone())?; - call.add(p); - } - let result = call.call()?; - Value::try_from(result) - })?; - - Ok(UiBackendReply::CallMethodReply(result)) - } - - fn handle_did_change_plot_render_settings( - &self, - params: DidChangePlotsRenderSettingsParams, - ) -> anyhow::Result { - // The frontend shoudn't send invalid sizes but be defensive. Sometimes - // the plot container is in a strange state when it's hidden. - if params.settings.size.height <= 0 || params.settings.size.width <= 0 { - return Err(anyhow::anyhow!( - "Got invalid plot render size: {size:?}", - size = params.settings.size, - )); - } - - self.graphics_device_tx - .send(GraphicsDeviceNotification::DidChangePlotRenderSettings( - params.settings, - )) - .unwrap(); - - Ok(UiBackendReply::DidChangePlotsRenderSettingsReply()) - } - - fn handle_editor_context_changed( - &self, - params: EditorContextChangedParams, - ) -> anyhow::Result { - log::trace!( - "Editor context changed: document_uri={}, is_execution_source={}", - params.document_uri, - params.is_execution_source - ); - Ok(UiBackendReply::EditorContextChangedReply()) - } - - fn handle_evaluate_code( - &self, - params: EvaluateCodeParams, - ) -> anyhow::Result { - log::trace!("Evaluating code: {}", params.code); - - let result = r_task(|| { - let mut capture = if Console::is_initialized() { - Console::get_mut().start_capture() - } else { - ConsoleOutputCapture::dummy() - }; - - // Evaluate the user's code - let eval_result = parse_eval_global(¶ms.code); - - // Take captured output before dropping the capture guard - let output = capture.take(); - drop(capture); - - // Now handle the eval result - let evaluated = eval_result?; - let value = Value::try_from(evaluated)?; - - Ok((value, output)) - }); - - match result { - Ok((value, output)) => Ok(UiBackendReply::EvaluateCodeReply(EvalResult { - result: value, - output, - })), - Err(err) => { - let message = match err { - harp::Error::TryCatchError { message, .. } => message, - harp::Error::ParseError { message, .. } => message, - harp::Error::ParseSyntaxError { message } => message, - _ => format!("{err}"), - }; - Err(anyhow::anyhow!("{message}")) - }, - } - } - - /** - * Send an RPC request to the frontend. - */ - fn call_frontend_method(&self, request: UiCommFrontendRequest) -> anyhow::Result<()> { - let comm_msg = StdInRequest::Comm(request); - self.stdin_request_tx.send(comm_msg)?; - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use amalthea::comm::base_comm::JsonRpcError; - use amalthea::comm::comm_channel::CommMsg; - use amalthea::comm::ui_comm::BusyParams; - use amalthea::comm::ui_comm::CallMethodParams; - use amalthea::comm::ui_comm::EvalResult; - use amalthea::comm::ui_comm::EvaluateCodeParams; - use amalthea::comm::ui_comm::UiBackendReply; - use amalthea::comm::ui_comm::UiBackendRequest; - use amalthea::comm::ui_comm::UiFrontendEvent; - use amalthea::socket::comm::CommInitiator; - use amalthea::socket::comm::CommSocket; - use amalthea::socket::iopub::IOPubMessage; - use amalthea::socket::stdin::StdInRequest; - use ark_test::dummy_jupyter_header; - use ark_test::IOPubReceiverExt; - use crossbeam::channel::bounded; - use crossbeam::channel::Sender; - use harp::exec::RFunction; - use harp::exec::RFunctionExt; - use harp::object::RObject; - use serde_json::Value; - - use crate::plots::graphics_device::GraphicsDeviceNotification; - use crate::r_task::r_task; - use crate::ui::UiComm; - use crate::ui::UiCommMessage; - - #[test] - fn test_ui_comm() { - // Create a dummy iopub channel to receive responses. - let (iopub_tx, iopub_rx) = bounded::(10); - - // Create a sender/receiver pair for the comm channel. - let comm_socket = CommSocket::new( - CommInitiator::FrontEnd, - String::from("test-ui-comm-id"), - String::from("positron.UI"), - iopub_tx, - ); - - // Communication channel between the main thread and the Amalthea - // StdIn socket thread - let (stdin_request_tx, _stdin_request_rx) = bounded::(1); - - let (graphics_device_tx, _graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); - - // Create a frontend instance, get access to the sender channel - let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); - - // Get the current console width - let old_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); - - // Send a message to the frontend - let id = String::from("test-id-1"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("setConsoleWidth"), - params: vec![Value::from(123)], - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id, - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - - // Wait for the reply; this should be a FrontendRpcResult. - let response = iopub_rx.recv_comm_msg(); - match response { - CommMsg::Rpc { id, data, .. } => { - println!("Got RPC result: {:?}", data); - let result = serde_json::from_value::(data).unwrap(); - assert_eq!(id, "test-id-1"); - // This RPC should return the old width - assert_eq!( - result, - UiBackendReply::CallMethodReply(Value::from(old_width)) - ); - }, - _ => panic!("Unexpected response: {:?}", response), - } - - // Get the new console width - let new_width = r_task(|| unsafe { - let width = RFunction::from("getOption") - .param("x", "width") - .call() - .unwrap(); - RObject::to::(width).unwrap() - }); - - // Assert that the console width changed - assert_eq!(new_width, 123); - - // Now try to invoke an RPC that doesn't exist - let id = String::from("test-id-2"); - let request = UiBackendRequest::CallMethod(CallMethodParams { - method: String::from("thisRpcDoesNotExist"), - params: vec![], - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id, - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - - // Wait for the reply - let response = iopub_rx.recv_comm_msg(); - match response { - CommMsg::Rpc { id, data, .. } => { - println!("Got RPC result: {:?}", data); - let _reply = serde_json::from_value::(data).unwrap(); - // Ensure that the error code is -32601 (method not found) - assert_eq!(id, "test-id-2"); - - // TODO: This should normally throw a `MethodNotFound` but - // that's currently a bit hard because of the nested method - // call. One way to solve this would be for RPC handler - // functions to return a typed JSON-RPC error instead of a - // `anyhow::Result`. Then we could return a `MethodNotFound` from - // `callMethod()`. - // - // assert_eq!(reply.error.code, JsonRpcErrorCode::MethodNotFound); - }, - _ => panic!("Unexpected response: {:?}", response), - } - - // Mark not busy (this prevents the frontend comm from being closed due to - // the Sender being dropped) - ui_comm_tx - .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { - busy: false, - }))) - .unwrap(); - } - - /// Helper to set up a UiComm and return the pieces needed for testing - fn setup_ui_comm() -> ( - CommSocket, - crossbeam::channel::Receiver, - Sender, - ) { - let (iopub_tx, iopub_rx) = bounded::(10); - let comm_socket = CommSocket::new( - CommInitiator::FrontEnd, - String::from("test-eval-comm-id"), - String::from("positron.UI"), - iopub_tx, - ); - let (stdin_request_tx, _stdin_request_rx) = bounded::(1); - let (graphics_device_tx, _graphics_device_rx) = - tokio::sync::mpsc::unbounded_channel::(); - let ui_comm_tx = UiComm::start(comm_socket.clone(), stdin_request_tx, graphics_device_tx); - (comm_socket, iopub_rx, ui_comm_tx) - } - - /// Send an evaluate_code RPC and return the reply - fn send_evaluate_code( - comm_socket: &CommSocket, - iopub_rx: &crossbeam::channel::Receiver, - id: &str, - code: &str, - ) -> UiBackendReply { - let request = UiBackendRequest::EvaluateCode(EvaluateCodeParams { - code: String::from(code), - }); - comm_socket - .incoming_tx - .send(CommMsg::Rpc { - id: String::from(id), - parent_header: dummy_jupyter_header(), - data: serde_json::to_value(request).unwrap(), - }) - .unwrap(); - - let response = iopub_rx.recv_comm_msg(); - match response { - CommMsg::Rpc { data, .. } => serde_json::from_value::(data).unwrap(), - _ => panic!("Unexpected response: {:?}", response), - } - } - - #[test] - fn test_evaluate_code() { - let (comm_socket, iopub_rx, ui_comm_tx) = setup_ui_comm(); - - // Test 1: Pure result with no output (e.g. 1 + 1) - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-1", "1 + 1"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::from(2.0), - output: String::from(""), - }) - ); - - // Test 2: Code that returns a value - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-2", "isTRUE(cat('oatmeal'))"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::from(false), - // Output capture relies on Console::start_capture(), which is - // not available in unit tests (Console is not initialized). - // Output capture is exercised in integration tests instead. - output: String::from(""), - }) - ); - - // Test 3: Code that only prints, with an invisible NULL result - let reply = send_evaluate_code(&comm_socket, &iopub_rx, "eval-3", "cat('hello\\nworld')"); - assert_eq!( - reply, - UiBackendReply::EvaluateCodeReply(EvalResult { - result: Value::Null, - output: String::from(""), - }) - ); - - // Keep the comm alive - ui_comm_tx - .send(UiCommMessage::Event(UiFrontendEvent::Busy(BusyParams { - busy: false, - }))) - .unwrap(); - } -} diff --git a/crates/ark/src/ui/ui_comm.rs b/crates/ark/src/ui/ui_comm.rs new file mode 100644 index 000000000..78689d200 --- /dev/null +++ b/crates/ark/src/ui/ui_comm.rs @@ -0,0 +1,373 @@ +// +// ui.rs +// +// Copyright (C) 2023-2026 by Posit Software, PBC +// +// + +use std::path::PathBuf; + +use amalthea::comm::comm_channel::CommMsg; +use amalthea::comm::ui_comm::CallMethodParams; +use amalthea::comm::ui_comm::DidChangePlotsRenderSettingsParams; +use amalthea::comm::ui_comm::EditorContextChangedParams; +use amalthea::comm::ui_comm::EvalResult; +use amalthea::comm::ui_comm::EvaluateCodeParams; +use amalthea::comm::ui_comm::PromptStateParams; +use amalthea::comm::ui_comm::UiBackendReply; +use amalthea::comm::ui_comm::UiBackendRequest; +use amalthea::comm::ui_comm::UiFrontendEvent; +use amalthea::comm::ui_comm::WorkingDirectoryParams; +use harp::eval::parse_eval_global; +use harp::exec::RFunction; +use harp::exec::RFunctionExt; +use harp::object::RObject; +use serde_json::Value; +use stdext::result::ResultExt; +use tokio::sync::mpsc::UnboundedSender as AsyncUnboundedSender; + +use crate::comm_handler::handle_rpc_request; +use crate::comm_handler::CommHandler; +use crate::comm_handler::CommHandlerContext; +use crate::comm_handler::EnvironmentChanged; +use crate::console::Console; +use crate::console::ConsoleOutputCapture; +use crate::plots::graphics_device::GraphicsDeviceNotification; + +pub const UI_COMM_NAME: &str = "positron.ui"; + +/// Comm handler for the Positron UI comm. +#[derive(Debug)] +pub struct UiComm { + graphics_device_tx: AsyncUnboundedSender, + working_directory: PathBuf, +} + +impl CommHandler for UiComm { + fn handle_open(&mut self, ctx: &CommHandlerContext) { + self.refresh(ctx); + } + + fn handle_msg(&mut self, msg: CommMsg, ctx: &CommHandlerContext) { + handle_rpc_request(&ctx.outgoing_tx, UI_COMM_NAME, msg, |req| { + self.handle_rpc(req) + }); + } + + fn handle_environment(&mut self, event: EnvironmentChanged, ctx: &CommHandlerContext) { + let EnvironmentChanged::Execution = event else { + return; + }; + self.refresh(ctx); + } +} + +impl UiComm { + pub(crate) fn new( + graphics_device_tx: AsyncUnboundedSender, + ) -> Self { + Self { + graphics_device_tx, + working_directory: PathBuf::new(), + } + } + + fn handle_rpc(&mut self, request: UiBackendRequest) -> anyhow::Result { + match request { + UiBackendRequest::CallMethod(params) => self.handle_call_method(params), + UiBackendRequest::DidChangePlotsRenderSettings(params) => { + self.handle_did_change_plot_render_settings(params) + }, + UiBackendRequest::EditorContextChanged(params) => { + self.handle_editor_context_changed(params) + }, + UiBackendRequest::EvaluateCode(params) => self.handle_evaluate_code(params), + } + } + + fn handle_call_method(&self, request: CallMethodParams) -> anyhow::Result { + log::trace!("Handling '{}' frontend RPC method", request.method); + + // Today, all RPCs are fulfilled by R directly. Check to see if an R + // method of the appropriate name is defined. + // + // Consider: In the future, we may want to allow requests to be + // fulfilled here on the Rust side, with only some requests forwarded to + // R; Rust methods may wish to establish their own RPC handlers. + + let method = format!(".ps.rpc.{}", request.method); + + let exists_obj = RFunction::from("exists") + .param("x", method.clone()) + .call()?; + let exists: bool = exists_obj.try_into()?; + + if !exists { + let method = &request.method; + return Err(anyhow::anyhow!("No such method: {method}")); + } + + let mut call = RFunction::from(method); + for param in request.params.iter() { + let p = RObject::try_from(param.clone())?; + call.add(p); + } + let result = call.call()?; + let result = Value::try_from(result)?; + + Ok(UiBackendReply::CallMethodReply(result)) + } + + fn handle_did_change_plot_render_settings( + &self, + params: DidChangePlotsRenderSettingsParams, + ) -> anyhow::Result { + // The frontend shouldn't send invalid sizes but be defensive. Sometimes + // the plot container is in a strange state when it's hidden. + if params.settings.size.height <= 0 || params.settings.size.width <= 0 { + return Err(anyhow::anyhow!( + "Got invalid plot render size: {size:?}", + size = params.settings.size, + )); + } + + self.graphics_device_tx + .send(GraphicsDeviceNotification::DidChangePlotRenderSettings( + params.settings, + )) + .map_err(|err| anyhow::anyhow!("Failed to send plot render settings: {err}"))?; + + Ok(UiBackendReply::DidChangePlotsRenderSettingsReply()) + } + + fn handle_editor_context_changed( + &self, + params: EditorContextChangedParams, + ) -> anyhow::Result { + log::trace!( + "Editor context changed: document_uri={}, is_execution_source={}", + params.document_uri, + params.is_execution_source + ); + Ok(UiBackendReply::EditorContextChangedReply()) + } + + fn handle_evaluate_code(&self, params: EvaluateCodeParams) -> anyhow::Result { + log::trace!("Evaluating code: {}", params.code); + + let mut capture = if Console::is_initialized() { + Console::get_mut().start_capture() + } else { + ConsoleOutputCapture::dummy() + }; + + let value = parse_eval_global(¶ms.code); + + let output = capture.take(); + drop(capture); + + match value { + Ok(evaluated) => { + let result = Value::try_from(evaluated)?; + Ok(UiBackendReply::EvaluateCodeReply(EvalResult { + result, + output, + })) + }, + Err(err) => { + let message = match err { + harp::Error::TryCatchError { message, .. } => message, + harp::Error::ParseError { message, .. } => message, + harp::Error::ParseSyntaxError { message } => message, + _ => format!("{err}"), + }; + Err(anyhow::anyhow!("{message}")) + }, + } + } + + fn refresh(&mut self, ctx: &CommHandlerContext) { + self.refresh_prompt_info(ctx); + self.refresh_working_directory(ctx).log_err(); + } + + fn refresh_prompt_info(&self, ctx: &CommHandlerContext) { + let input_prompt: String = harp::get_option("prompt") + .try_into() + .unwrap_or_else(|_| String::from("> ")); + let continuation_prompt: String = harp::get_option("continue") + .try_into() + .unwrap_or_else(|_| String::from("+ ")); + + ctx.send_event(&UiFrontendEvent::PromptState(PromptStateParams { + input_prompt, + continuation_prompt, + })); + } + + /// Checks for changes to the working directory, and sends an event to the + /// frontend if the working directory has changed. + fn refresh_working_directory(&mut self, ctx: &CommHandlerContext) -> anyhow::Result<()> { + let mut new_working_directory = std::env::current_dir()?; + + if new_working_directory != self.working_directory { + self.working_directory = new_working_directory.clone(); + + // Attempt to alias the directory, if it's within the home directory + if let Some(home_dir) = home::home_dir() { + if let Ok(stripped_dir) = new_working_directory.strip_prefix(home_dir) { + let mut new_path = PathBuf::from("~"); + new_path.push(stripped_dir); + new_working_directory = new_path; + } + } + + ctx.send_event(&UiFrontendEvent::WorkingDirectory(WorkingDirectoryParams { + directory: new_working_directory.to_string_lossy().to_string(), + })); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use amalthea::comm::base_comm::JsonRpcError; + use amalthea::comm::comm_channel::CommMsg; + use amalthea::comm::event::CommEvent; + use amalthea::comm::ui_comm::CallMethodParams; + use amalthea::comm::ui_comm::EvalResult; + use amalthea::comm::ui_comm::EvaluateCodeParams; + use amalthea::comm::ui_comm::UiBackendReply; + use amalthea::comm::ui_comm::UiBackendRequest; + use amalthea::socket::comm::CommOutgoingTx; + use amalthea::socket::iopub::IOPubMessage; + use ark_test::dummy_jupyter_header; + use ark_test::IOPubReceiverExt; + use crossbeam::channel::bounded; + use serde_json::Value; + + use super::*; + use crate::comm_handler::CommHandlerContext; + use crate::r_task; + + fn setup_ui_comm( + iopub_tx: crossbeam::channel::Sender, + ) -> (UiComm, CommHandlerContext) { + let comm_id = uuid::Uuid::new_v4().to_string(); + + let outgoing_tx = CommOutgoingTx::new(comm_id, iopub_tx); + let (comm_event_tx, _) = bounded::(10); + let ctx = CommHandlerContext::new(outgoing_tx, comm_event_tx); + + let (graphics_device_tx, _) = tokio::sync::mpsc::unbounded_channel(); + let handler = UiComm::new(graphics_device_tx); + + (handler, ctx) + } + + #[test] + fn test_ui_comm() { + let (iopub_tx, iopub_rx) = bounded::(10); + + let old_width = r_task(move || { + let (mut handler, ctx) = setup_ui_comm(iopub_tx); + + // Get the current console width + let old_width: i32 = harp::get_option("width").try_into().unwrap(); + + // Send a setConsoleWidth RPC + let msg = CommMsg::Rpc { + id: String::from("test-id-1"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("setConsoleWidth"), + params: vec![Value::from(123)], + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + + // Assert that the console width changed + let new_width: i32 = harp::get_option("width").try_into().unwrap(); + assert_eq!(new_width, 123); + + // Now try to invoke an RPC that doesn't exist + let msg = CommMsg::Rpc { + id: String::from("test-id-2"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::CallMethod(CallMethodParams { + method: String::from("thisRpcDoesNotExist"), + params: vec![], + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + + old_width + }); + + // Check first response (setConsoleWidth) + let response = iopub_rx.recv_comm_msg(); + match response { + CommMsg::Rpc { id, data, .. } => { + let result = serde_json::from_value::(data).unwrap(); + assert_eq!(id, "test-id-1"); + assert_eq!( + result, + UiBackendReply::CallMethodReply(Value::from(old_width)) + ); + }, + _ => panic!("Unexpected response: {:?}", response), + } + + // Check second response (non-existent method error) + let response = iopub_rx.recv_comm_msg(); + match response { + CommMsg::Rpc { id, data, .. } => { + let _reply = serde_json::from_value::(data).unwrap(); + assert_eq!(id, "test-id-2"); + }, + _ => panic!("Unexpected response: {:?}", response), + } + } + + #[test] + fn test_evaluate_code() { + let (iopub_tx, iopub_rx) = bounded::(10); + + r_task(move || { + let (mut handler, ctx) = setup_ui_comm(iopub_tx); + + // Pure result with no output (e.g. 1 + 1) + let msg = CommMsg::Rpc { + id: String::from("eval-1"), + parent_header: dummy_jupyter_header(), + data: serde_json::to_value(UiBackendRequest::EvaluateCode(EvaluateCodeParams { + code: String::from("1 + 1"), + })) + .unwrap(), + }; + handler.handle_msg(msg, &ctx); + }); + + let response = iopub_rx.recv_comm_msg(); + match response { + CommMsg::Rpc { data, .. } => { + let result = serde_json::from_value::(data).unwrap(); + assert_eq!( + result, + UiBackendReply::EvaluateCodeReply(EvalResult { + result: Value::from(2.0), + // Output capture relies on Console::start_capture(), which is + // not available in unit tests (Console is not initialized). + // Output capture is exercised in integration tests instead. + output: String::from(""), + }) + ); + }, + _ => panic!("Unexpected response: {:?}", response), + } + } +} diff --git a/crates/ark/src/variables/r_variables.rs b/crates/ark/src/variables/r_variables.rs index e3e0f973b..9af576f43 100644 --- a/crates/ark/src/variables/r_variables.rs +++ b/crates/ark/src/variables/r_variables.rs @@ -361,7 +361,7 @@ impl RVariables { let explorer = RDataExplorer::new(name.clone(), obj, Some(binding)) .map_err(|err| harp::Error::Anyhow(err))?; let viewer_id = Console::get_mut() - .comm_register(DATA_EXPLORER_COMM_NAME, Box::new(explorer)) + .comm_open_backend(DATA_EXPLORER_COMM_NAME, Box::new(explorer)) .map_err(|err| harp::Error::Anyhow(err))?; Ok(Some(viewer_id)) }) diff --git a/crates/ark/src/viewer.rs b/crates/ark/src/viewer.rs index e6de45c55..351ffa698 100644 --- a/crates/ark/src/viewer.rs +++ b/crates/ark/src/viewer.rs @@ -112,11 +112,7 @@ pub unsafe extern "C-unwind" fn ps_html_viewer( // TODO: What's the right thing to do in `Console` mode when // we aren't connected to Positron? Right now we error. - let ui_comm_tx = console - .get_ui_comm_tx() - .ok_or_else(|| anyhow::anyhow!("UI comm not connected."))?; - - ui_comm_tx.send_event(event); + console.try_ui_comm()?.send_event(&event); }, } }, diff --git a/crates/ark/tests/evaluate-code.rs b/crates/ark/tests/evaluate-code.rs index 939a6bc7d..f3e4a8cb7 100644 --- a/crates/ark/tests/evaluate-code.rs +++ b/crates/ark/tests/evaluate-code.rs @@ -21,14 +21,11 @@ fn evaluate_code(frontend: &DummyArkFrontend, comm_id: &str, code: &str) -> UiBa }); frontend.send_shell_comm_msg(String::from(comm_id), data); - - // The shell routes the message to the UI comm thread and goes busy/idle. - // The RPC reply is sent from the UI comm thread and can arrive on IOPub - // either before or after the shell's Idle status. frontend.recv_iopub_busy(); - let reply = frontend.recv_iopub_comm_msg_and_idle(); + let reply = frontend.recv_iopub_comm_msg(); assert_eq!(reply.comm_id, comm_id); + frontend.recv_iopub_idle(); serde_json::from_value::(reply.data).unwrap() } diff --git a/crates/ark_test/src/dummy_frontend.rs b/crates/ark_test/src/dummy_frontend.rs index 09edb1814..743fd0be0 100644 --- a/crates/ark_test/src/dummy_frontend.rs +++ b/crates/ark_test/src/dummy_frontend.rs @@ -636,36 +636,6 @@ impl DummyArkFrontend { } } - /// Receive a CommMsg and Idle from IOPub in either order. - /// - /// Some comm RPC replies race with the shell's Idle status because the - /// reply is sent from a separate thread (e.g. the UI comm thread). This - /// helper accepts both orderings and returns the CommMsg content. - #[track_caller] - pub fn recv_iopub_comm_msg_and_idle(&self) -> amalthea::wire::comm_msg::CommWireMsg { - let first = self.recv_iopub_next(); - let second = self.recv_iopub_next(); - - let (comm_msg, idle) = match (first, second) { - (Message::CommMsg(comm), Message::Status(status)) => (comm, status), - (Message::Status(status), Message::CommMsg(comm)) => (comm, status), - (a, b) => panic!( - "Expected CommMsg and Idle in either order, got {:?} and {:?}", - a, b - ), - }; - - assert_eq!( - idle.content.execution_state, - amalthea::wire::status::ExecutionState::Idle, - "Expected Idle status" - ); - - self.flush_streams_at_boundary(); - - comm_msg.content - } - /// Receive from IOPub and assert CommOpen message. /// Automatically skips any Stream messages. #[track_caller] @@ -1094,54 +1064,28 @@ impl DummyArkFrontend { data: serde_json::json!({}), }); - // The comm_open triggers busy/idle on the shell thread and also - // sends an EstablishUiCommChannel kernel request to the console. - // The UI comm then sends events (prompt_state, working_directory) - // as CommMsg on IOPub. These can arrive in any order relative to - // the busy/idle. We wait for the prompt_state CommMsg as evidence - // that the UI comm has been established, draining everything. - let deadline = Instant::now() + RECV_TIMEOUT; - let mut got_prompt_state = false; - let mut idle_count = 0u32; + // The UI comm runs on the R thread via CommHandler. The comm_open + // blocks Shell while the handler's `handle_open()` runs, so events + // arrive deterministically within the Busy/Idle window. + self.recv_iopub_busy(); - // We need to see the prompt_state AND at least one idle - while !got_prompt_state || idle_count == 0 { - let remaining = deadline.saturating_duration_since(Instant::now()); - let Some(msg) = self.recv_iopub_with_timeout(remaining) else { - panic!( - "Timed out waiting for UI comm (got_prompt_state={got_prompt_state}, \ - idle_count={idle_count})" - ); - }; - match &msg { - Message::CommMsg(data) => { - if let Some(method) = data.content.data.get("method").and_then(|v| v.as_str()) { - if method == "prompt_state" { - got_prompt_state = true; - } - } - }, - Message::Status(ref data) - if data.content.execution_state == - amalthea::wire::status::ExecutionState::Idle => - { - idle_count += 1; - }, - Message::Stream(ref data) => { - self.buffer_stream(&data.content); - }, - _ => {}, - } - } + // `handle_open()` calls `refresh()` which sends prompt_state then + // working_directory. + let prompt_state = self.recv_iopub_comm_msg(); + assert_eq!(prompt_state.comm_id, comm_id); + assert_eq!( + prompt_state.data.get("method").and_then(|v| v.as_str()), + Some("prompt_state") + ); - // The UI comm sends events asynchronously. Some may arrive after - // idle. Drain any stragglers with a short timeout. - while let Some(msg) = self.recv_iopub_with_timeout(Duration::from_millis(200)) { - if let Message::Stream(ref data) = msg { - self.buffer_stream(&data.content); - } - // Discard late comm messages and other events - } + let working_dir = self.recv_iopub_comm_msg(); + assert_eq!(working_dir.comm_id, comm_id); + assert_eq!( + working_dir.data.get("method").and_then(|v| v.as_str()), + Some("working_directory") + ); + + self.recv_iopub_idle(); comm_id }