25 using data_callback_t = std::function<void(
const typename MessageType::ConstSharedPtr&)>;
33 : m_node(options.
node), m_qos(options.qos), m_sub_opts(options.subscription_options), m_topic_name(options.
topic_name), m_node_name(options.
node_name),
34 m_got_data(
false), m_new_data(
false), m_used_data(
false), m_timeout_manager(options.
timeout_manager), m_latest_message_time(0),
35 m_latest_message(
nullptr), m_message_callback(message_callback)
41 m_timeout_mgr_callback = std::bind(options.
timeout_callback, m_topic_name, std::placeholders::_1);
44 m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback,
this, m_topic_name, std::placeholders::_1);
51 if (!m_timeout_manager)
54 std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (options.
no_message_timeout.seconds() * 0.5), m_node->get_clock()));
58 m_timeout_id = m_timeout_manager->registerNew(options.
no_message_timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
63 virtual ~Impl() =
default;
67 virtual typename MessageType::ConstSharedPtr
getMsg()
76 virtual typename MessageType::ConstSharedPtr
peekMsg()
const
82 return m_latest_message;
87 virtual bool hasMsg()
const
94 virtual bool newMsg()
const
108 virtual typename MessageType::ConstSharedPtr
waitForNew(
const rclcpp::Duration& timeout)
111 const std::chrono::duration<float> chrono_timeout(timeout.seconds());
113 std::unique_lock lock(m_new_data_mtx);
117 else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
127 return m_latest_message_time;
134 if (m_sub ==
nullptr)
139 std::string ret = m_sub->get_topic_name();
143 ret = m_node->get_node_topics_interface()->resolve_topic_name(m_topic_name);
153 return m_sub->get_publisher_count();
160 if (timeout == mrs_lib::no_timeout)
163 if (m_timeout_manager !=
nullptr && m_timeout_id.has_value())
164 m_timeout_manager->pause(m_timeout_id.value());
169 if (m_timeout_manager ==
nullptr)
170 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (timeout.seconds() * 0.5), m_node->get_clock()));
173 if (m_timeout_id.has_value())
174 m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
178 m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
186 if (m_timeout_manager && m_timeout_id.has_value())
187 m_timeout_manager->start(m_timeout_id.value(), m_node->get_clock()->now());
189 const std::function<void(
const typename MessageType::SharedPtr)> cbk = std::bind(&Impl::data_callback,
this, std::placeholders::_1);
191 m_sub = m_node->create_subscription<MessageType>(m_topic_name, m_qos, cbk, m_sub_opts);
192 RCLCPP_INFO_STREAM(m_node->get_logger(),
"Subscribed to topic '" << m_topic_name <<
"' -> '" << m_sub->get_topic_name() <<
"'");
199 if (m_timeout_manager && m_timeout_id.has_value())
201 m_timeout_manager->pause(m_timeout_id.value());
209 rclcpp::Node::SharedPtr m_node;
210 rclcpp::Subscription<MessageType>::SharedPtr m_sub;
211 rclcpp::QoS m_qos = rclcpp::SystemDefaultsQoS();
212 rclcpp::SubscriptionOptions m_sub_opts;
215 std::string m_topic_name;
216 std::string m_node_name;
221 mutable std::mutex m_new_data_mtx;
222 mutable std::condition_variable m_new_data_cv;
228 std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
229 std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
230 mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
233 rclcpp::Time m_latest_message_time;
234 typename MessageType::ConstSharedPtr m_latest_message;
235 message_callback_t m_message_callback;
239 void default_timeout_callback(
const std::string& topic_name,
const rclcpp::Time& last_msg)
241 const rclcpp::Duration since_msg = (m_node->get_clock()->now() - last_msg);
242 const auto n_pubs = m_sub->get_publisher_count();
243 const std::string txt =
"Did not receive any message from topic '" + topic_name +
"' for " + std::to_string(since_msg.seconds()) +
"s ("
244 + std::to_string(n_pubs) +
" publishers on this topic)";
246 RCLCPP_WARN_STREAM(m_node->get_logger(), txt);
251 void process_new_message(
const typename MessageType::ConstSharedPtr& msg)
253 m_latest_message_time = m_node->get_clock()->now();
254 m_latest_message = msg;
257 m_new_data = !m_message_callback;
259 m_new_data_cv.notify_one();
264 virtual void data_callback(
const typename MessageType::ConstSharedPtr& msg)
267 std::lock_guard lck(m_new_data_mtx);
268 if (m_timeout_manager && m_timeout_id.has_value())
269 m_timeout_manager->reset(m_timeout_id.value(), m_node->get_clock()->now());
270 process_new_message(msg);
274 if (m_message_callback)
275 m_message_callback(msg);
289 using timeout_callback_t =
typename impl_class_t::timeout_callback_t;
290 using message_callback_t =
typename impl_class_t::message_callback_t;
296 : impl_class_t::Impl(options, message_callback)
301 virtual bool hasMsg()
const override
303 std::lock_guard lck(m_mtx);
304 return impl_class_t::hasMsg();
306 virtual bool newMsg()
const override
308 std::lock_guard lck(m_mtx);
309 return impl_class_t::newMsg();
311 virtual bool usedMsg()
const override
313 std::lock_guard lck(m_mtx);
314 return impl_class_t::usedMsg();
316 virtual typename MessageType::ConstSharedPtr
getMsg()
override
318 std::lock_guard lck(m_mtx);
319 return impl_class_t::getMsg();
321 virtual typename MessageType::ConstSharedPtr
peekMsg()
const override
323 std::lock_guard lck(m_mtx);
324 return impl_class_t::peekMsg();
328 std::lock_guard lck(m_mtx);
329 return impl_class_t::lastMsgTime();
331 virtual std::string
topicName()
const override
333 std::lock_guard lck(m_mtx);
334 return impl_class_t::topicName();
336 virtual void start()
override
338 std::lock_guard lck(m_mtx);
339 return impl_class_t::start();
341 virtual void stop()
override
343 std::lock_guard lck(m_mtx);
344 return impl_class_t::stop();
350 virtual void data_callback(
const typename MessageType::ConstSharedPtr& msg)
override
353 std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
355 if (this->m_timeout_manager && this->m_timeout_id.has_value())
357 this->m_timeout_manager->reset(this->m_timeout_id.value(), this->m_node->get_clock()->now());
360 impl_class_t::process_new_message(msg);
364 if (this->m_message_callback)
365 impl_class_t::m_message_callback(msg);
369 mutable std::recursive_mutex m_mtx;