Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion roslibrust_ros2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ cdr = "0.2"
roslibrust_common = { path = "../roslibrust_common", version = "0.20" }
anyhow = "1.0"
# Experimental "raw ros" library, we're going to try to build on top of
ros-z = { git = "https://github.com/ZettaScaleLabs/ros-z.git" }
ros-z = { path = "/home/circle/Workings/ZettaScale/zenoh-ws/project/nix-ros/src/worktree-ros-z/serdes-param/crates/ros-z", features = ["jazzy"] }
log = "0.4"
# Used for cancellation token
tokio-util = "0.7"
Expand Down
185 changes: 43 additions & 142 deletions roslibrust_ros2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,123 +5,53 @@ use std::result::Result as StdResult;
use ros_z::{
context::ZContext,
entity::{TypeHash, TypeInfo},
msg::ZService,
msg::{CdrCompatSerdes, ZService},
pubsub::{ZPub, ZSub},
ros_msg::{ServiceTypeInfo, WithTypeInfo},
ros_msg::ServiceTypeInfo,
Builder,
};

/// re-export ros_z for consumers
pub use ros_z;

/// Wrapper type that implements WithTypeInfo for RosMessageType
/// This allows RosMessageType implementations to work with ros-z's type system
pub struct RosMessageWrapper<T: RosMessageType>(pub T);

impl<T: RosMessageType> ros_z::ros_msg::MessageTypeInfo for RosMessageWrapper<T> {
fn type_name() -> &'static str {
T::ROS2_TYPE_NAME
}

fn type_hash() -> TypeHash {
TypeHash::new(1, *T::ROS2_HASH)
}
}

impl<T: RosMessageType> WithTypeInfo for RosMessageWrapper<T> {}

// Custom serializer that treats RosMessageWrapper<T> as T for serialization purposes
pub struct WrapperSerdes<T: RosMessageType>(std::marker::PhantomData<T>);

impl<T: RosMessageType> ros_z::msg::ZSerializer for WrapperSerdes<T> {
type Input<'a> = &'a RosMessageWrapper<T>;
fn serialize(input: Self::Input<'_>) -> Vec<u8> {
ros_z::msg::CdrSerdes::<T>::serialize(&input.0)
}

fn serialize_to_zbuf(input: Self::Input<'_>) -> zenoh_buffers::ZBuf {
ros_z::msg::CdrSerdes::<T>::serialize_to_zbuf(&input.0)
}

fn serialize_to_buf(input: Self::Input<'_>, buffer: &mut Vec<u8>) {
ros_z::msg::CdrSerdes::<T>::serialize_to_buf(&input.0, buffer)
}

fn serialize_to_zbuf_with_hint(
input: Self::Input<'_>,
capacity_hint: usize,
) -> zenoh_buffers::ZBuf {
ros_z::msg::CdrSerdes::<T>::serialize_to_zbuf_with_hint(&input.0, capacity_hint)
}

fn serialize_to_shm(
input: Self::Input<'_>,
estimated_size: usize,
provider: &zenoh::shm::ShmProvider<zenoh::shm::PosixShmProviderBackend>,
) -> zenoh::Result<(zenoh_buffers::ZBuf, usize)> {
ros_z::msg::CdrSerdes::<T>::serialize_to_shm(&input.0, estimated_size, provider)
}
}

impl<T: RosMessageType> ros_z::msg::ZDeserializer for WrapperSerdes<T> {
type Input<'a> = &'a [u8];
type Output = RosMessageWrapper<T>;
type Error = ros_z::msg::CdrError;
fn deserialize(data: Self::Input<'_>) -> std::result::Result<Self::Output, Self::Error> {
let inner = ros_z::msg::CdrSerdes::<T>::deserialize(data)?;
Ok(RosMessageWrapper(inner))
}
}

impl<T: RosMessageType> ros_z::msg::ZMessage for RosMessageWrapper<T> {
type Serdes = WrapperSerdes<T>;
}

/// A "newtype" wrapper around ZNode so we can implement roslibrust's traits for it.
pub struct ZenohClient {
// TODO: We'll probably end up wrapping node in an Arc<> so we can make this clone for users
node: ros_z::node::ZNode,
}

// A "newtype" wrapper around ZPub so we can implement roslibrust's traits for it.
/// The publisher type returned by [TopicProvider::advertise] on [ZenohClient].
/// This type is self de-registering, and dropping the publisher will automatically un-advertise the topic.
/// This type is generic on the message type that will be published.
pub struct ZenohPublisher<T: RosMessageType> {
publisher: ZPub<RosMessageWrapper<T>, WrapperSerdes<T>>,
_marker: std::marker::PhantomData<T>,
publisher: ZPub<T, CdrCompatSerdes>,
}

impl<T: RosMessageType> Publish<T> for ZenohPublisher<T> {
impl<T: RosMessageType + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static>
Publish<T> for ZenohPublisher<T>
{
async fn publish(&self, data: &T) -> Result<()> {
self.publisher
.async_publish(&RosMessageWrapper(data.clone()))
.async_publish(data)
.await
// TODO: should work on error type here with ros_z team
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))
}
}

/// The subscriber type returned by [TopicProvider::subscribe] on [ZenohClient].
/// This type is self de-registering, and dropping the subscriber will automatically unsubscribe from the topic.
/// This type is generic on the message type that will be received.
/// It is typically used with types generated by roslibrust's codegen.
pub struct ZenohSubscriber<T: RosMessageType> {
subscriber: ZSub<RosMessageWrapper<T>, zenoh::sample::Sample, WrapperSerdes<T>>,
_marker: std::marker::PhantomData<T>,
subscriber: ZSub<T, zenoh::sample::Sample, CdrCompatSerdes>,
}

impl<T: RosMessageType> Subscribe<T> for ZenohSubscriber<T> {
impl<T: RosMessageType + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static>
Subscribe<T> for ZenohSubscriber<T>
{
async fn next(&mut self) -> Result<T> {
let next = self.subscriber.async_recv().await;
let msg = next.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?;
Ok(msg.0)
self.subscriber
.async_recv()
.await
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))
}
}

impl ZenohClient {
// TODO don't love error type here... could work with ros_z to get better errors
/// Creates a new node with the specified name and return a handle to it.
pub async fn new(
ctx: &ZContext,
name: impl AsRef<str>,
Expand All @@ -131,6 +61,11 @@ impl ZenohClient {
}
}

/// Build a TypeInfo from a RosMessageType's type name and hash.
fn ros_type_info<T: RosMessageType>() -> TypeInfo {
TypeInfo::new(T::ROS2_TYPE_NAME, TypeHash::new(1, *T::ROS2_HASH))
}

impl roslibrust_common::TopicProvider for ZenohClient {
type Publisher<T: RosMessageType> = ZenohPublisher<T>;
type Subscriber<T: RosMessageType> = ZenohSubscriber<T>;
Expand All @@ -142,16 +77,12 @@ impl roslibrust_common::TopicProvider for ZenohClient {
let topic: roslibrust_common::GlobalTopicName = topic.to_global_name()?;
let publisher = self
.node
.create_pub::<RosMessageWrapper<MsgType>>(topic.as_ref())
.with_serdes::<WrapperSerdes<MsgType>>()
.create_pub_impl::<MsgType>(topic.as_ref(), Some(ros_type_info::<MsgType>()))
.with_serdes::<CdrCompatSerdes>()
.build()
// TODO better errors
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?;

Ok(ZenohPublisher {
publisher,
_marker: std::marker::PhantomData,
})
Ok(ZenohPublisher { publisher })
}

async fn subscribe<MsgType: RosMessageType>(
Expand All @@ -161,23 +92,16 @@ impl roslibrust_common::TopicProvider for ZenohClient {
let topic: roslibrust_common::GlobalTopicName = topic.to_global_name()?;
let sub = self
.node
.create_sub::<RosMessageWrapper<MsgType>>(topic.as_ref())
.with_serdes::<WrapperSerdes<MsgType>>()
.create_sub_impl::<MsgType>(topic.as_ref(), Some(ros_type_info::<MsgType>()))
.with_serdes::<CdrCompatSerdes>()
.build()
// TODO better errors
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?;

Ok(ZenohSubscriber {
subscriber: sub,
_marker: std::marker::PhantomData,
})
Ok(ZenohSubscriber { subscriber: sub })
}
}

// TODO MAJOR: problem here ZService trait can't be implemented for our example messages due to orphan rule...
// Have to do some gross work around to get roslibrust::RosServiceType and ros_z::ZService to play nicely together
pub struct ZenohServiceServer {
// Used to shutdown server task when dropped
cancellation_token: tokio_util::sync::CancellationToken,
}

Expand All @@ -192,8 +116,10 @@ pub struct ZenohServiceClient<T: RosServiceType> {
_marker: std::marker::PhantomData<T>,
}

// Helper struct to work around orphan rule
// Orphan-rule shim: ZService (ros_z) cannot be impl'd for T: RosServiceType (roslibrust_common)
// in a downstream crate. This unit struct bridges the two.
struct Fake<T>(std::marker::PhantomData<T>);

impl<T: RosServiceType> ZService for Fake<T> {
type Request = T::Request;
type Response = T::Response;
Expand All @@ -207,19 +133,14 @@ impl<T: RosServiceType> ServiceTypeInfo for Fake<T> {

impl<T: RosServiceType> roslibrust_common::Service<T> for ZenohServiceClient<T> {
async fn call(&self, request: &T::Request) -> Result<T::Response> {
// Send the request
self.client
.send_request(request)
.await
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?;

// Wait for and take the response
let response = self
.client
self.client
.take_response()
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?;

Ok(response)
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))
}
}

Expand All @@ -233,7 +154,6 @@ impl roslibrust_common::ServiceProvider for ZenohClient {
request: SrvType::Request,
) -> Result<SrvType::Response> {
let service: roslibrust_common::GlobalTopicName = service.to_global_name()?;
// Create a service client and call it once
let client = ZenohClient::service_client::<SrvType>(self, service.as_ref()).await?;
client.call(&request).await
}
Expand Down Expand Up @@ -261,30 +181,30 @@ impl roslibrust_common::ServiceProvider for ZenohClient {
server: F,
) -> Result<Self::ServiceServer> {
let service: roslibrust_common::GlobalTopicName = service.to_global_name()?;
// TODO: doing some really dome stuff here... to work around orphan rule and RosServiceType != ZService
struct Fake<T>(T);
impl<T: RosServiceType> ZService for Fake<T> {

// Local Fake<T> for advertise_service (same orphan workaround as service_client)
struct LocalFake<T>(T);
impl<T: RosServiceType> ZService for LocalFake<T> {
type Request = T::Request;
type Response = T::Response;
}
impl<T: RosServiceType> ServiceTypeInfo for Fake<T> {
impl<T: RosServiceType> ServiceTypeInfo for LocalFake<T> {
fn service_type_info() -> TypeInfo {
TypeInfo::new(T::ROS2_TYPE_NAME, TypeHash::new(1, *T::ROS2_HASH))
}
}

let mut svc = self
.node
.create_service::<Fake<SrvType>>(service.as_ref())
.create_service::<LocalFake<SrvType>>(service.as_ref())
.build()
.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?;

let cancellation_token = tokio_util::sync::CancellationToken::new();

// Build a clone wrapper for the users's function
let server = std::sync::Arc::new(server);
let service_name = String::from(service);
let ct_copy = cancellation_token.clone();

tokio::spawn(async move {
let body_future = async {
loop {
Expand All @@ -301,7 +221,6 @@ impl roslibrust_common::ServiceProvider for ZenohClient {
query
);

// Evaluate the server function inside a spawn_blocking to uphold trait expectations from roslibrust_common
let server_copy = server.clone();
let response = tokio::task::spawn_blocking(move || server_copy(req)).await;

Expand All @@ -316,20 +235,16 @@ impl roslibrust_common::ServiceProvider for ZenohClient {
continue;
}
};

let send_result = svc.send_response_async(&valid_response, &query).await;
match send_result {
Ok(()) => {}
Err(e) => {
error!("Failed to send response to service {service_name}: {e:?}");
}
};
if let Err(e) = send_result {
error!("Failed to send response to service {service_name}: {e:?}");
}
}
};

tokio::select! {
_ = ct_copy.cancelled() => {
// Shutdown
}
_ = ct_copy.cancelled() => {}
_ = body_future => {
error!("Service task for {service_name} exited unexpectedly");
}
Expand Down Expand Up @@ -360,7 +275,6 @@ mod tests {
.unwrap()
}

// Ignored for now until we get ROS2 zenoh CI stable
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn test_subscribe_basic() {
Expand All @@ -377,7 +291,6 @@ mod tests {
let mut pub_cmd = std::process::Command::new("ros2")
.arg("topic")
.arg("pub")
// Publish 10 times
.arg("-t")
.arg("10")
.arg("/chatter")
Expand Down Expand Up @@ -427,7 +340,6 @@ mod tests {
.expect("Failed to receive message within 2 seconds");
}

// Test is currently failing... Want to merge this code and then file issues to gradually fix
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn test_service_server_callable() {
Expand All @@ -437,7 +349,6 @@ mod tests {
.unwrap();

let state = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));

let state_copy = state.clone();
let server_fn = move |request: roslibrust_test::ros2::std_srvs::SetBoolRequest| {
state_copy.store(request.data, std::sync::atomic::Ordering::SeqCst);
Expand Down Expand Up @@ -473,13 +384,9 @@ mod tests {
.await
.expect("Bool should be set true within 2 seconds");

// If we reach here, state was changed to true by the service call!

// Protection to make sure we don't leave a ros2 service call running
srv_call_cmd.kill().unwrap()
}

// Test disabled on Jan 21 '25, waiting for ros-z development to stabilize
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn test_service_zenoh_to_zenoh() {
Expand All @@ -499,7 +406,6 @@ mod tests {
})
};

// NOTE: Cannot use multi-part names for services in ros2 currently
let _service = node
.advertise_service::<roslibrust_test::ros2::std_srvs::SetBool, _>(
"/test_service_zenoh_to_zenoh_set_bool",
Expand All @@ -508,10 +414,8 @@ mod tests {
.await
.unwrap();

// Give server time to start
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Create service client and call the service
let response = node
.call_service::<roslibrust_test::ros2::std_srvs::SetBool>(
"/test_service_zenoh_to_zenoh_set_bool",
Expand All @@ -520,11 +424,8 @@ mod tests {
.await
.expect("Service call should succeed");

// Verify the response
assert!(response.success);
assert_eq!(response.message, "You set my bool!");

// Verify the server state was updated
assert!(state.load(std::sync::atomic::Ordering::SeqCst));
}
}
Expand Down
Loading