diff --git a/roslibrust_ros2/Cargo.toml b/roslibrust_ros2/Cargo.toml index c7d2cb0..1cc54fb 100644 --- a/roslibrust_ros2/Cargo.toml +++ b/roslibrust_ros2/Cargo.toml @@ -13,7 +13,7 @@ cdr = "0.2" roslibrust_common = { path = "../roslibrust_common", version = "0.20" } anyhow = "1.0" # Experimental "raw ros" library, we're going to try to build on top of -ros-z = { git = "https://github.com/ZettaScaleLabs/ros-z.git" } +ros-z = { path = "/home/circle/Workings/ZettaScale/zenoh-ws/project/nix-ros/src/worktree-ros-z/serdes-param/crates/ros-z", features = ["jazzy"] } log = "0.4" # Used for cancellation token tokio-util = "0.7" diff --git a/roslibrust_ros2/src/lib.rs b/roslibrust_ros2/src/lib.rs index 48019c3..b919c27 100644 --- a/roslibrust_ros2/src/lib.rs +++ b/roslibrust_ros2/src/lib.rs @@ -5,123 +5,53 @@ use std::result::Result as StdResult; use ros_z::{ context::ZContext, entity::{TypeHash, TypeInfo}, - msg::ZService, + msg::{CdrCompatSerdes, ZService}, pubsub::{ZPub, ZSub}, - ros_msg::{ServiceTypeInfo, WithTypeInfo}, + ros_msg::ServiceTypeInfo, Builder, }; /// re-export ros_z for consumers pub use ros_z; -/// Wrapper type that implements WithTypeInfo for RosMessageType -/// This allows RosMessageType implementations to work with ros-z's type system -pub struct RosMessageWrapper(pub T); - -impl ros_z::ros_msg::MessageTypeInfo for RosMessageWrapper { - fn type_name() -> &'static str { - T::ROS2_TYPE_NAME - } - - fn type_hash() -> TypeHash { - TypeHash::new(1, *T::ROS2_HASH) - } -} - -impl WithTypeInfo for RosMessageWrapper {} - -// Custom serializer that treats RosMessageWrapper as T for serialization purposes -pub struct WrapperSerdes(std::marker::PhantomData); - -impl ros_z::msg::ZSerializer for WrapperSerdes { - type Input<'a> = &'a RosMessageWrapper; - fn serialize(input: Self::Input<'_>) -> Vec { - ros_z::msg::CdrSerdes::::serialize(&input.0) - } - - fn serialize_to_zbuf(input: Self::Input<'_>) -> zenoh_buffers::ZBuf { - ros_z::msg::CdrSerdes::::serialize_to_zbuf(&input.0) - } - - fn serialize_to_buf(input: Self::Input<'_>, buffer: &mut Vec) { - ros_z::msg::CdrSerdes::::serialize_to_buf(&input.0, buffer) - } - - fn serialize_to_zbuf_with_hint( - input: Self::Input<'_>, - capacity_hint: usize, - ) -> zenoh_buffers::ZBuf { - ros_z::msg::CdrSerdes::::serialize_to_zbuf_with_hint(&input.0, capacity_hint) - } - - fn serialize_to_shm( - input: Self::Input<'_>, - estimated_size: usize, - provider: &zenoh::shm::ShmProvider, - ) -> zenoh::Result<(zenoh_buffers::ZBuf, usize)> { - ros_z::msg::CdrSerdes::::serialize_to_shm(&input.0, estimated_size, provider) - } -} - -impl ros_z::msg::ZDeserializer for WrapperSerdes { - type Input<'a> = &'a [u8]; - type Output = RosMessageWrapper; - type Error = ros_z::msg::CdrError; - fn deserialize(data: Self::Input<'_>) -> std::result::Result { - let inner = ros_z::msg::CdrSerdes::::deserialize(data)?; - Ok(RosMessageWrapper(inner)) - } -} - -impl ros_z::msg::ZMessage for RosMessageWrapper { - type Serdes = WrapperSerdes; -} - /// A "newtype" wrapper around ZNode so we can implement roslibrust's traits for it. pub struct ZenohClient { - // TODO: We'll probably end up wrapping node in an Arc<> so we can make this clone for users node: ros_z::node::ZNode, } -// A "newtype" wrapper around ZPub so we can implement roslibrust's traits for it. /// The publisher type returned by [TopicProvider::advertise] on [ZenohClient]. -/// This type is self de-registering, and dropping the publisher will automatically un-advertise the topic. -/// This type is generic on the message type that will be published. pub struct ZenohPublisher { - publisher: ZPub, WrapperSerdes>, - _marker: std::marker::PhantomData, + publisher: ZPub, } -impl Publish for ZenohPublisher { +impl + Publish for ZenohPublisher +{ async fn publish(&self, data: &T) -> Result<()> { self.publisher - .async_publish(&RosMessageWrapper(data.clone())) + .async_publish(data) .await - // TODO: should work on error type here with ros_z team .map_err(|e| Error::Unexpected(anyhow::anyhow!(e))) } } /// The subscriber type returned by [TopicProvider::subscribe] on [ZenohClient]. -/// This type is self de-registering, and dropping the subscriber will automatically unsubscribe from the topic. -/// This type is generic on the message type that will be received. -/// It is typically used with types generated by roslibrust's codegen. pub struct ZenohSubscriber { - subscriber: ZSub, zenoh::sample::Sample, WrapperSerdes>, - _marker: std::marker::PhantomData, + subscriber: ZSub, } -impl Subscribe for ZenohSubscriber { +impl + Subscribe for ZenohSubscriber +{ async fn next(&mut self) -> Result { - let next = self.subscriber.async_recv().await; - let msg = next.map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?; - Ok(msg.0) + self.subscriber + .async_recv() + .await + .map_err(|e| Error::Unexpected(anyhow::anyhow!(e))) } } impl ZenohClient { - // TODO don't love error type here... could work with ros_z to get better errors - /// Creates a new node with the specified name and return a handle to it. pub async fn new( ctx: &ZContext, name: impl AsRef, @@ -131,6 +61,11 @@ impl ZenohClient { } } +/// Build a TypeInfo from a RosMessageType's type name and hash. +fn ros_type_info() -> TypeInfo { + TypeInfo::new(T::ROS2_TYPE_NAME, TypeHash::new(1, *T::ROS2_HASH)) +} + impl roslibrust_common::TopicProvider for ZenohClient { type Publisher = ZenohPublisher; type Subscriber = ZenohSubscriber; @@ -142,16 +77,12 @@ impl roslibrust_common::TopicProvider for ZenohClient { let topic: roslibrust_common::GlobalTopicName = topic.to_global_name()?; let publisher = self .node - .create_pub::>(topic.as_ref()) - .with_serdes::>() + .create_pub_impl::(topic.as_ref(), Some(ros_type_info::())) + .with_serdes::() .build() - // TODO better errors .map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?; - Ok(ZenohPublisher { - publisher, - _marker: std::marker::PhantomData, - }) + Ok(ZenohPublisher { publisher }) } async fn subscribe( @@ -161,23 +92,16 @@ impl roslibrust_common::TopicProvider for ZenohClient { let topic: roslibrust_common::GlobalTopicName = topic.to_global_name()?; let sub = self .node - .create_sub::>(topic.as_ref()) - .with_serdes::>() + .create_sub_impl::(topic.as_ref(), Some(ros_type_info::())) + .with_serdes::() .build() - // TODO better errors .map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?; - Ok(ZenohSubscriber { - subscriber: sub, - _marker: std::marker::PhantomData, - }) + Ok(ZenohSubscriber { subscriber: sub }) } } -// TODO MAJOR: problem here ZService trait can't be implemented for our example messages due to orphan rule... -// Have to do some gross work around to get roslibrust::RosServiceType and ros_z::ZService to play nicely together pub struct ZenohServiceServer { - // Used to shutdown server task when dropped cancellation_token: tokio_util::sync::CancellationToken, } @@ -192,8 +116,10 @@ pub struct ZenohServiceClient { _marker: std::marker::PhantomData, } -// Helper struct to work around orphan rule +// Orphan-rule shim: ZService (ros_z) cannot be impl'd for T: RosServiceType (roslibrust_common) +// in a downstream crate. This unit struct bridges the two. struct Fake(std::marker::PhantomData); + impl ZService for Fake { type Request = T::Request; type Response = T::Response; @@ -207,19 +133,14 @@ impl ServiceTypeInfo for Fake { impl roslibrust_common::Service for ZenohServiceClient { async fn call(&self, request: &T::Request) -> Result { - // Send the request self.client .send_request(request) .await .map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?; - // Wait for and take the response - let response = self - .client + self.client .take_response() - .map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?; - - Ok(response) + .map_err(|e| Error::Unexpected(anyhow::anyhow!(e))) } } @@ -233,7 +154,6 @@ impl roslibrust_common::ServiceProvider for ZenohClient { request: SrvType::Request, ) -> Result { let service: roslibrust_common::GlobalTopicName = service.to_global_name()?; - // Create a service client and call it once let client = ZenohClient::service_client::(self, service.as_ref()).await?; client.call(&request).await } @@ -261,13 +181,14 @@ impl roslibrust_common::ServiceProvider for ZenohClient { server: F, ) -> Result { let service: roslibrust_common::GlobalTopicName = service.to_global_name()?; - // TODO: doing some really dome stuff here... to work around orphan rule and RosServiceType != ZService - struct Fake(T); - impl ZService for Fake { + + // Local Fake for advertise_service (same orphan workaround as service_client) + struct LocalFake(T); + impl ZService for LocalFake { type Request = T::Request; type Response = T::Response; } - impl ServiceTypeInfo for Fake { + impl ServiceTypeInfo for LocalFake { fn service_type_info() -> TypeInfo { TypeInfo::new(T::ROS2_TYPE_NAME, TypeHash::new(1, *T::ROS2_HASH)) } @@ -275,16 +196,15 @@ impl roslibrust_common::ServiceProvider for ZenohClient { let mut svc = self .node - .create_service::>(service.as_ref()) + .create_service::>(service.as_ref()) .build() .map_err(|e| Error::Unexpected(anyhow::anyhow!(e)))?; let cancellation_token = tokio_util::sync::CancellationToken::new(); - - // Build a clone wrapper for the users's function let server = std::sync::Arc::new(server); let service_name = String::from(service); let ct_copy = cancellation_token.clone(); + tokio::spawn(async move { let body_future = async { loop { @@ -301,7 +221,6 @@ impl roslibrust_common::ServiceProvider for ZenohClient { query ); - // Evaluate the server function inside a spawn_blocking to uphold trait expectations from roslibrust_common let server_copy = server.clone(); let response = tokio::task::spawn_blocking(move || server_copy(req)).await; @@ -316,20 +235,16 @@ impl roslibrust_common::ServiceProvider for ZenohClient { continue; } }; + let send_result = svc.send_response_async(&valid_response, &query).await; - match send_result { - Ok(()) => {} - Err(e) => { - error!("Failed to send response to service {service_name}: {e:?}"); - } - }; + if let Err(e) = send_result { + error!("Failed to send response to service {service_name}: {e:?}"); + } } }; tokio::select! { - _ = ct_copy.cancelled() => { - // Shutdown - } + _ = ct_copy.cancelled() => {} _ = body_future => { error!("Service task for {service_name} exited unexpectedly"); } @@ -360,7 +275,6 @@ mod tests { .unwrap() } - // Ignored for now until we get ROS2 zenoh CI stable #[ignore] #[tokio::test(flavor = "multi_thread")] async fn test_subscribe_basic() { @@ -377,7 +291,6 @@ mod tests { let mut pub_cmd = std::process::Command::new("ros2") .arg("topic") .arg("pub") - // Publish 10 times .arg("-t") .arg("10") .arg("/chatter") @@ -427,7 +340,6 @@ mod tests { .expect("Failed to receive message within 2 seconds"); } - // Test is currently failing... Want to merge this code and then file issues to gradually fix #[ignore] #[tokio::test(flavor = "multi_thread")] async fn test_service_server_callable() { @@ -437,7 +349,6 @@ mod tests { .unwrap(); let state = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); - let state_copy = state.clone(); let server_fn = move |request: roslibrust_test::ros2::std_srvs::SetBoolRequest| { state_copy.store(request.data, std::sync::atomic::Ordering::SeqCst); @@ -473,13 +384,9 @@ mod tests { .await .expect("Bool should be set true within 2 seconds"); - // If we reach here, state was changed to true by the service call! - - // Protection to make sure we don't leave a ros2 service call running srv_call_cmd.kill().unwrap() } - // Test disabled on Jan 21 '25, waiting for ros-z development to stabilize #[ignore] #[tokio::test(flavor = "multi_thread")] async fn test_service_zenoh_to_zenoh() { @@ -499,7 +406,6 @@ mod tests { }) }; - // NOTE: Cannot use multi-part names for services in ros2 currently let _service = node .advertise_service::( "/test_service_zenoh_to_zenoh_set_bool", @@ -508,10 +414,8 @@ mod tests { .await .unwrap(); - // Give server time to start tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - // Create service client and call the service let response = node .call_service::( "/test_service_zenoh_to_zenoh_set_bool", @@ -520,11 +424,8 @@ mod tests { .await .expect("Service call should succeed"); - // Verify the response assert!(response.success); assert_eq!(response.message, "You set my bool!"); - - // Verify the server state was updated assert!(state.load(std::sync::atomic::Ordering::SeqCst)); } }