Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/ark/src/comm_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ impl CommHandlerContext {
pub fn is_closed(&self) -> bool {
self.closed.get()
}

/// Send a serializable event as `CommMsg::Data` on the outgoing channel.
/// Serialization or send errors are logged and ignored.
pub fn send_event<T: Serialize>(&self, event: &T) {
let Some(json) = serde_json::to_value(event).log_err() else {
return;
};
self.outgoing_tx.send(CommMsg::Data(json)).log_err();
}
}

/// Trait for comm handlers that run synchronously on the R thread.
Expand Down
9 changes: 3 additions & 6 deletions crates/ark/src/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ pub(crate) use console_repl::ConsoleNotification;
pub(crate) use console_repl::ConsoleOutputCapture;
pub(crate) use console_repl::KernelInfo;
use console_repl::PendingInputs;
use console_repl::PromptInfo;
use console_repl::ReadConsolePendingAction;
pub use console_repl::SessionMode;

Expand Down Expand Up @@ -166,8 +165,6 @@ use crate::srcref::ns_populate_srcref;
use crate::srcref::resource_loaded_namespaces;
use crate::startup;
use crate::sys::console::console_to_utf8;
use crate::ui::UiCommMessage;
use crate::ui::UiCommSender;
use crate::url::UrlId;

thread_local! {
Expand Down Expand Up @@ -224,9 +221,9 @@ pub(crate) struct Console {
tasks_idle_any_rx: Receiver<RTask>,
pending_futures: HashMap<Uuid, (BoxFuture<'static, ()>, RTaskStartInfo, Option<String>)>,

/// Channel to communicate requests and events to the frontend
/// by forwarding them through the UI comm. Optional, and really Positron specific.
ui_comm_tx: Option<UiCommSender>,
/// Comm ID of the currently connected UI comm, if any.
/// The handler lives in `self.comms`; this is just an index into it.
ui_comm_id: Option<String>,

/// Error captured by our global condition handler during the last iteration
/// of the REPL.
Expand Down
43 changes: 41 additions & 2 deletions crates/ark/src/console/console_comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
//
// Copyright (C) 2026 Posit Software, PBC. All rights reserved.
//
//

use amalthea::comm::comm_channel::CommMsg;
use amalthea::comm::event::CommEvent;
use amalthea::socket::comm::CommInitiator;
use amalthea::socket::comm::CommOutgoingTx;
use amalthea::socket::comm::CommSocket;
use stdext::result::ResultExt;
use uuid::Uuid;
Expand All @@ -17,6 +17,7 @@ use crate::comm_handler::CommHandlerContext;
use crate::comm_handler::ConsoleComm;
use crate::comm_handler::EnvironmentChanged;
use crate::console::Console;
use crate::ui::UI_COMM_NAME;

impl Console {
pub(super) fn comm_handle_msg(&mut self, comm_id: &str, msg: CommMsg) {
Expand All @@ -29,6 +30,10 @@ impl Console {
}

pub(super) fn comm_handle_close(&mut self, comm_id: &str) {
if self.ui_comm_id.as_deref() == Some(comm_id) {
self.ui_comm_id = None;
}

let Some(mut reg) = self.comms.remove(comm_id) else {
log::warn!("Received close for unknown registered comm {comm_id}");
return;
Expand All @@ -40,7 +45,7 @@ impl Console {
///
/// Creates the `CommSocket` and `CommHandlerContext`, calls `handle_open`,
/// sends `CommEvent::Opened` to amalthea, and returns the comm ID.
pub(crate) fn comm_register(
pub(crate) fn comm_open_backend(
&mut self,
comm_name: &str,
mut handler: Box<dyn CommHandler>,
Expand All @@ -67,6 +72,35 @@ impl Console {
Ok(comm_id)
}

/// Register a frontend-initiated comm on the R thread.
///
/// Unlike `comm_open_backend` (which is for backend-initiated comms and
/// sends `CommEvent::Opened`), this is called when the frontend opened the
/// comm. The `CommSocket` already exists in amalthea's open_comms list, so
/// we only need to register the handler and call `handle_open`.
pub(super) fn comm_open_frontend(
&mut self,
comm_id: String,
comm_name: &str,
outgoing_tx: CommOutgoingTx,
mut handler: Box<dyn CommHandler>,
) {
let ctx = CommHandlerContext::new(outgoing_tx, self.comm_event_tx.clone());
handler.handle_open(&ctx);

if comm_name == UI_COMM_NAME {
if let Some(old_id) = self.ui_comm_id.take() {
log::info!("Replacing an existing UI comm.");
if let Some(mut old) = self.comms.remove(&old_id) {
old.handler.handle_close(&old.ctx);
}
}
self.ui_comm_id = Some(comm_id.clone());
}

self.comms.insert(comm_id, ConsoleComm { handler, ctx });
}

pub(super) fn comm_notify_environment_changed(&mut self, event: EnvironmentChanged) {
for (_, reg) in self.comms.iter_mut() {
reg.handler.handle_environment(event, &reg.ctx);
Expand All @@ -84,6 +118,11 @@ impl Console {
.collect();

for comm_id in closed_ids {
// We're not expecting the UI comm to close itself but we handle the
// case explicitly to be defensive
if self.ui_comm_id.as_deref() == Some(comm_id.as_str()) {
self.ui_comm_id = None;
}
if let Some(reg) = self.comms.remove(&comm_id) {
self.comm_notify_closed(&comm_id, &reg);
}
Expand Down
207 changes: 90 additions & 117 deletions crates/ark/src/console/console_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,129 +11,22 @@ use super::*;

/// UI comm integration.
impl Console {
pub(super) fn handle_establish_ui_comm_channel(
&mut self,
ui_comm_tx: Sender<UiCommMessage>,
info: &PromptInfo,
) {
if self.ui_comm_tx.is_some() {
log::info!("Replacing an existing UI comm channel.");
}

// Create and store the sender channel
self.ui_comm_tx = Some(UiCommSender::new(ui_comm_tx));

// Go ahead and do an initial refresh
self.with_mut_ui_comm_tx(|ui_comm_tx| {
let input_prompt = info.input_prompt.clone();
let continuation_prompt = info.continuation_prompt.clone();

ui_comm_tx.send_refresh(input_prompt, continuation_prompt);
});
}

pub(crate) fn session_mode(&self) -> SessionMode {
self.session_mode
}

pub(crate) fn get_ui_comm_tx(&self) -> Option<&UiCommSender> {
self.ui_comm_tx.as_ref()
}

fn get_mut_ui_comm_tx(&mut self) -> Option<&mut UiCommSender> {
self.ui_comm_tx.as_mut()
}

pub(super) fn with_ui_comm_tx<F>(&self, f: F)
where
F: FnOnce(&UiCommSender),
{
match self.get_ui_comm_tx() {
Some(ui_comm_tx) => f(ui_comm_tx),
None => {
// Trace level logging, its typically not a bug if the frontend
// isn't connected. Happens in all Jupyter use cases.
log::trace!("UI comm isn't connected, dropping `f`.");
},
}
pub(crate) fn ui_comm(&self) -> Option<UiCommRef<'_>> {
let comm = self.comms.get(self.ui_comm_id.as_deref()?)?;
Some(UiCommRef {
comm,
originator: self.active_request.as_ref().map(|r| &r.originator),
stdin_request_tx: &self.stdin_request_tx,
})
}

pub(super) fn with_mut_ui_comm_tx<F>(&mut self, mut f: F)
where
F: FnMut(&mut UiCommSender),
{
match self.get_mut_ui_comm_tx() {
Some(ui_comm_tx) => f(ui_comm_tx),
None => {
// Trace level logging, its typically not a bug if the frontend
// isn't connected. Happens in all Jupyter use cases.
log::trace!("UI comm isn't connected, dropping `f`.");
},
}
}

pub(crate) fn is_ui_comm_connected(&self) -> bool {
self.get_ui_comm_tx().is_some()
}

pub(crate) fn call_frontend_method(
&self,
request: UiFrontendRequest,
) -> anyhow::Result<RObject> {
log::trace!("Calling frontend method {request:?}");

let ui_comm_tx = self.get_ui_comm_tx().ok_or_else(|| {
anyhow::anyhow!("UI comm is not connected. Can't execute request {request:?}")
})?;

let (reply_tx, reply_rx) = bounded(1);

let Some(req) = &self.active_request else {
return Err(anyhow::anyhow!(
"No active request. Can't execute request {request:?}"
));
};

// Forward request to UI comm
ui_comm_tx.send_request(UiCommFrontendRequest {
originator: req.originator.clone(),
reply_tx,
request: request.clone(),
});

// Block for reply
let reply = reply_rx.recv().unwrap();

log::trace!("Got reply from frontend method: {reply:?}");

match reply {
StdInRpcReply::Reply(reply) => match reply {
JsonRpcReply::Result(reply) => {
// Deserialize to Rust first to verify the OpenRPC contract.
// Errors are propagated to R.
if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) {
return Err(anyhow::anyhow!(
"Can't deserialize RPC reply for {request:?}:\n{err:?}"
));
}

// Now deserialize to an R object
Ok(RObject::try_from(reply.result)?)
},
JsonRpcReply::Error(reply) => {
let message = reply.error.message;

return Err(anyhow::anyhow!(
"While calling frontend method:\n\
{message}",
));
},
},
// If an interrupt was signalled, return `NULL`. This should not be
// visible to the caller since `r_unwrap()` (called e.g. by
// `harp::register`) will trigger an interrupt jump right away.
StdInRpcReply::Interrupt => Ok(RObject::null()),
}
pub(crate) fn try_ui_comm(&self) -> anyhow::Result<UiCommRef<'_>> {
self.ui_comm()
.ok_or_else(|| anyhow!("UI comm is not connected"))
}
}

Expand Down Expand Up @@ -270,3 +163,83 @@ impl Console {
self.lsp_virtual_documents.get(uri).cloned()
}
}

/// Reference to the UI comm. Returned by `Console::ui_comm()`.
///
/// Existence of this value guarantees the comm is connected.
pub(crate) struct UiCommRef<'a> {
comm: &'a ConsoleComm,
originator: Option<&'a Originator>,
stdin_request_tx: &'a Sender<StdInRequest>,
}

impl UiCommRef<'_> {
pub(crate) fn send_event(&self, event: &UiFrontendEvent) {
self.comm.ctx.send_event(event);
}

pub(crate) fn busy(&self, busy: bool) {
self.send_event(&UiFrontendEvent::Busy(BusyParams { busy }));
}

pub(crate) fn show_message(&self, message: String) {
self.send_event(&UiFrontendEvent::ShowMessage(ShowMessageParams { message }));
}

pub(crate) fn call_frontend_method(
&self,
request: UiFrontendRequest,
) -> anyhow::Result<RObject> {
log::trace!("Calling frontend method {request:?}");

let (reply_tx, reply_rx) = bounded(1);

let Some(originator) = self.originator else {
return Err(anyhow!(
"No active request. Can't execute request {request:?}"
));
};

// Forward request directly to the stdin channel
let comm_msg = StdInRequest::Comm(UiCommFrontendRequest {
originator: originator.clone(),
reply_tx,
request: request.clone(),
});
self.stdin_request_tx.send(comm_msg)?;

// Block for reply
let reply = reply_rx.recv()?;

log::trace!("Got reply from frontend method: {reply:?}");

match reply {
StdInRpcReply::Reply(reply) => match reply {
JsonRpcReply::Result(reply) => {
// Deserialize to Rust first to verify the OpenRPC contract.
// Errors are propagated to R.
if let Err(err) = ui_frontend_reply_from_value(reply.result.clone(), &request) {
return Err(anyhow!(
"Can't deserialize RPC reply for {request:?}:\n{err:?}"
));
}

// Now deserialize to an R object
Ok(RObject::try_from(reply.result)?)
},
JsonRpcReply::Error(reply) => {
let message = reply.error.message;

return Err(anyhow!(
"While calling frontend method:\n\
{message}",
));
},
},
// If an interrupt was signalled, return `NULL`. This should not be
// visible to the caller since `r_unwrap()` (called e.g. by
// `harp::register`) will trigger an interrupt jump right away.
StdInRpcReply::Interrupt => Ok(RObject::null()),
}
}
}
Loading