Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 79 additions & 89 deletions crates/ark/src/connections/r_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,73 +104,69 @@ impl RConnection {
match message {
ConnectionsBackendRequest::ListObjects(ListObjectsParams { path }) => {
let tables = r_task(|| -> Result<_, anyhow::Error> {
unsafe {
let mut call = RFunction::from(".ps.connection_list_objects");
call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
call.param(obj.kind.as_str(), obj.name);
}
// returns a data.frame with columns name and type
let tables = call.call()?;

let names = RFunction::from("[[")
.add(tables.clone())
.add(RObject::from("name"))
.call()?;

let types = RFunction::from("[[")
.add(tables)
.add(RObject::from("type"))
.call()?;

let resulting = RObject::to::<Vec<String>>(names)?
.iter()
.zip(RObject::to::<Vec<String>>(types)?.iter())
.map(|(name, kind)| ObjectSchema {
name: name.clone(),
kind: kind.clone(),
has_children: None,
})
.collect::<Vec<_>>();

Ok(resulting)
let mut call = RFunction::from(".ps.connection_list_objects");
call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
call.param(obj.kind.as_str(), obj.name);
}
// returns a data.frame with columns name and type
let tables = call.call()?;

let names = RFunction::from("[[")
.add(tables.clone())
.add(RObject::from("name"))
.call()?;

let types = RFunction::from("[[")
.add(tables)
.add(RObject::from("type"))
.call()?;

let resulting = RObject::to::<Vec<String>>(names)?
.iter()
.zip(RObject::to::<Vec<String>>(types)?.iter())
.map(|(name, kind)| ObjectSchema {
name: name.clone(),
kind: kind.clone(),
has_children: None,
})
.collect::<Vec<_>>();

Ok(resulting)
})?;

Ok(ConnectionsBackendReply::ListObjectsReply(tables))
},
ConnectionsBackendRequest::ListFields(ListFieldsParams { path }) => {
let fields = r_task(|| -> Result<_, anyhow::Error> {
unsafe {
let mut call = RFunction::from(".ps.connection_list_fields");
call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
call.param(obj.kind.as_str(), obj.name);
}
let fields = call.call()?;

// for now we only need the name column
let names = RFunction::from("[[")
.add(fields.clone())
.add(RObject::from("name"))
.call()?;

let dtypes = RFunction::from("[[")
.add(fields)
.add(RObject::from("type"))
.call()?;

let resulting = RObject::to::<Vec<String>>(names)?
.iter()
.zip(RObject::to::<Vec<String>>(dtypes)?.iter())
.map(|(name, dtype)| FieldSchema {
name: name.clone(),
dtype: dtype.clone(),
})
.collect::<Vec<_>>();

Ok(resulting)
let mut call = RFunction::from(".ps.connection_list_fields");
call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
call.param(obj.kind.as_str(), obj.name);
}
let fields = call.call()?;

// for now we only need the name column
let names = RFunction::from("[[")
.add(fields.clone())
.add(RObject::from("name"))
.call()?;

let dtypes = RFunction::from("[[")
.add(fields)
.add(RObject::from("type"))
.call()?;

let resulting = RObject::to::<Vec<String>>(names)?
.iter()
.zip(RObject::to::<Vec<String>>(dtypes)?.iter())
.map(|(name, dtype)| FieldSchema {
name: name.clone(),
dtype: dtype.clone(),
})
.collect::<Vec<_>>();

Ok(resulting)
})?;

Ok(ConnectionsBackendReply::ListFieldsReply(fields))
Expand All @@ -191,38 +187,34 @@ impl RConnection {
ConnectionsBackendRequest::GetIcon(GetIconParams { path }) => {
// Calls back into R to get the icon.
let icon_path = r_task(|| -> Result<_, anyhow::Error> {
unsafe {
let mut call = RFunction::from(".ps.connection_icon");
call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
call.param(obj.kind.as_str(), obj.name);
}

let icon = call.call()?;

if r_is_null(*icon) {
// we'd rather use the option type but couldn't find a way to autogenerate RPC optionals
Ok("".to_string())
} else {
Ok(RObject::to::<String>(icon)?)
}
let mut call = RFunction::from(".ps.connection_icon");
call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
call.param(obj.kind.as_str(), obj.name);
}

let icon = call.call()?;

if r_is_null(*icon) {
// we'd rather use the option type but couldn't find a way to autogenerate RPC optionals
Ok("".to_string())
} else {
Ok(RObject::to::<String>(icon)?)
}
})?;
Ok(ConnectionsBackendReply::GetIconReply(icon_path))
},
ConnectionsBackendRequest::ContainsData(ContainsDataParams { path }) => {
// Calls back into R to check if the object contains data.
let contains_data = r_task(|| -> Result<_, anyhow::Error> {
unsafe {
let mut contains_data_call: RFunction =
RFunction::from(".ps.connection_contains_data");
contains_data_call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
contains_data_call.param(obj.kind.as_str(), obj.name);
}
let contains_data = contains_data_call.call()?;
Ok(RObject::to::<bool>(contains_data)?)
let mut contains_data_call: RFunction =
RFunction::from(".ps.connection_contains_data");
contains_data_call.add(RObject::from(self.comm.comm_id.clone()));
for obj in path {
contains_data_call.param(obj.kind.as_str(), obj.name);
}
let contains_data = contains_data_call.call()?;
Ok(RObject::to::<bool>(contains_data)?)
})?;
Ok(ConnectionsBackendReply::ContainsDataReply(contains_data))
},
Expand Down Expand Up @@ -255,12 +247,10 @@ impl RConnection {
fn disconnect(&self) -> std::result::Result<bool, anyhow::Error> {
// Execute database side disconnect method.
r_task(|| -> Result<bool, anyhow::Error> {
unsafe {
let mut call = RFunction::from(".ps.connection_close");
call.add(RObject::from(self.comm.comm_id.clone()));
let closed = call.call()?;
Ok(RObject::to::<bool>(closed)?)
}
let mut call = RFunction::from(".ps.connection_close");
call.add(RObject::from(self.comm.comm_id.clone()));
let closed = call.call()?;
Ok(RObject::to::<bool>(closed)?)
})
}

Expand Down
Loading
Loading