Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <rmw/types.h>

#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
Comment on lines 18 to 23
Expand All @@ -29,7 +30,9 @@
#include "rclcpp/context.hpp"
#include "rclcpp/experimental/buffers/intra_process_buffer.hpp"
#include "rclcpp/experimental/subscription_intra_process_buffer.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "tracetools/tracetools.h"

Expand Down Expand Up @@ -71,21 +74,27 @@ class SubscriptionIntraProcess
using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr;
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;

/// Type-erased stats handler to avoid pulling in subscription_topic_statistics.hpp
/// which creates a circular include via publisher.hpp -> callback_group.hpp.
using StatsHandlerFn = std::function<void(const rmw_message_info_t &, const rclcpp::Time &)>;

SubscriptionIntraProcess(
AnySubscriptionCallback<MessageT, Alloc> callback,
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
rclcpp::IntraProcessBufferType buffer_type,
StatsHandlerFn stats_handler = nullptr)
: SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
SubscribedTypeDeleter, ROSMessageType>(
std::make_shared<SubscribedTypeAlloc>(*allocator),
context,
topic_name,
qos_profile,
buffer_type),
any_callback_(callback)
any_callback_(callback),
stats_handler_(std::move(stats_handler))
{
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_callback_added,
Expand Down Expand Up @@ -174,6 +183,16 @@ class SubscriptionIntraProcess
msg_info.publisher_gid = {0, {0}};
msg_info.from_intra_process = true;

std::chrono::time_point<std::chrono::system_clock> now;
if (stats_handler_) {
RCLCPP_WARN_ONCE(
rclcpp::get_logger("rclcpp"),
"Intra-process communication does not support accurate message age statistics");
now = std::chrono::system_clock::now();
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
msg_info.source_timestamp = nanos.time_since_epoch().count();
}

auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
data);

Expand All @@ -185,12 +204,19 @@ class SubscriptionIntraProcess
any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
}
shared_ptr.reset();

if (stats_handler_) {
const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
stats_handler_(msg_info, rclcpp::Time(nanos.time_since_epoch().count()));
}
}

AnySubscriptionCallback<MessageT, Alloc> any_callback_;
/// Type-erased statistics callback, populated when topic statistics are enabled.
StatsHandlerFn stats_handler_;
};

} // namespace experimental
} // namespace rclcpp

#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
#endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
18 changes: 16 additions & 2 deletions rclcpp/include/rclcpp/subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ class Subscription : public SubscriptionBase
ROSMessageT,
AllocatorT>;

// Build a type-erased stats handler to pass into the IPC subscription.
// This avoids pulling subscription_topic_statistics.hpp into the IPC
// header where it causes circular include issues.
typename SubscriptionIntraProcessT::StatsHandlerFn stats_handler = nullptr;
if (subscription_topic_statistics) {
Comment on lines +167 to +171
stats_handler =
[subscription_topic_statistics](
const rmw_message_info_t & info, const rclcpp::Time & time)
{
subscription_topic_statistics->handle_message(info, time);
};
}

// First create a SubscriptionIntraProcess which will be given to the intra-process manager.
auto context = node_base->get_context();
subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
Expand All @@ -172,7 +185,8 @@ class Subscription : public SubscriptionBase
context,
this->get_topic_name(), // important to get like this, as it has the fully-qualified name
qos_profile,
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback));
resolve_intra_process_buffer_type(options_.intra_process_buffer_type, callback),
std::move(stats_handler));
TRACETOOLS_TRACEPOINT(
rclcpp_subscription_init,
static_cast<const void *>(get_subscription_handle().get()),
Expand Down Expand Up @@ -454,4 +468,4 @@ class Subscription : public SubscriptionBase

} // namespace rclcpp

#endif // RCLCPP__SUBSCRIPTION_HPP_
#endif // RCLCPP__SUBSCRIPTION_HPP_
Loading