diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index 0624f92c62..32a2bdf92a 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -29,7 +30,9 @@ #include "rclcpp/context.hpp" #include "rclcpp/experimental/buffers/intra_process_buffer.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" +#include "rclcpp/logging.hpp" #include "rclcpp/qos.hpp" +#include "rclcpp/time.hpp" #include "rclcpp/type_support_decl.hpp" #include "tracetools/tracetools.h" @@ -71,13 +74,18 @@ class SubscriptionIntraProcess using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; + /// Type-erased stats handler to avoid pulling in subscription_topic_statistics.hpp + /// which creates a circular include via publisher.hpp -> callback_group.hpp. + using StatsHandlerFn = std::function; + SubscriptionIntraProcess( AnySubscriptionCallback callback, std::shared_ptr allocator, rclcpp::Context::SharedPtr context, const std::string & topic_name, const rclcpp::QoS & qos_profile, - rclcpp::IntraProcessBufferType buffer_type) + rclcpp::IntraProcessBufferType buffer_type, + StatsHandlerFn stats_handler = nullptr) : SubscriptionIntraProcessBuffer( std::make_shared(*allocator), @@ -85,7 +93,8 @@ class SubscriptionIntraProcess topic_name, qos_profile, buffer_type), - any_callback_(callback) + any_callback_(callback), + stats_handler_(std::move(stats_handler)) { TRACETOOLS_TRACEPOINT( rclcpp_subscription_callback_added, @@ -174,6 +183,16 @@ class SubscriptionIntraProcess msg_info.publisher_gid = {0, {0}}; msg_info.from_intra_process = true; + std::chrono::time_point now; + if (stats_handler_) { + RCLCPP_WARN_ONCE( + rclcpp::get_logger("rclcpp"), + "Intra-process communication does not support accurate message age statistics"); + now = std::chrono::system_clock::now(); + const auto nanos = std::chrono::time_point_cast(now); + msg_info.source_timestamp = nanos.time_since_epoch().count(); + } + auto shared_ptr = std::static_pointer_cast>( data); @@ -185,12 +204,19 @@ class SubscriptionIntraProcess any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info); } shared_ptr.reset(); + + if (stats_handler_) { + const auto nanos = std::chrono::time_point_cast(now); + stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count())); + } } AnySubscriptionCallback any_callback_; + /// Type-erased statistics callback, populated when topic statistics are enabled. + StatsHandlerFn stats_handler_; }; } // namespace experimental } // namespace rclcpp -#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ +#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_ \ No newline at end of file diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 20f2b575f7..26d2359ecb 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -164,6 +164,19 @@ class Subscription : public SubscriptionBase ROSMessageT, AllocatorT>; + // Build a type-erased stats handler to pass into the IPC subscription. + // This avoids pulling subscription_topic_statistics.hpp into the IPC + // header where it causes circular include issues. + typename SubscriptionIntraProcessT::StatsHandlerFn stats_handler = nullptr; + if (subscription_topic_statistics) { + stats_handler = + [subscription_topic_statistics]( + const rmw_message_info_t & info, const rclcpp::Time & time) + { + subscription_topic_statistics->handle_message(info, time); + }; + } + // First create a SubscriptionIntraProcess which will be given to the intra-process manager. auto context = node_base->get_context(); subscription_intra_process_ = std::make_shared( @@ -172,7 +185,8 @@ class Subscription : public SubscriptionBase context, this->get_topic_name(), // important to get like this, as it has the fully-qualified name qos_profile, - resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback)); + resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback), + std::move(stats_handler)); TRACETOOLS_TRACEPOINT( rclcpp_subscription_init, static_cast(get_subscription_handle().get()), @@ -454,4 +468,4 @@ class Subscription : public SubscriptionBase } // namespace rclcpp -#endif // RCLCPP__SUBSCRIPTION_HPP_ +#endif // RCLCPP__SUBSCRIPTION_HPP_ \ No newline at end of file