25 using data_callback_t = std::function<void(
const typename MessageType::ConstSharedPtr&)>;
33 : m_node(options.
node),
35 m_sub_opts(options.subscription_options),
42 m_latest_message_time(0),
43 m_latest_message(
nullptr),
44 m_message_callback(message_callback)
50 m_timeout_mgr_callback = std::bind(options.
timeout_callback, m_topic_name, std::placeholders::_1);
53 m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback,
this, m_topic_name, std::placeholders::_1);
60 if (!m_timeout_manager)
63 std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (options.
no_message_timeout.seconds() * 0.5), m_node->get_clock()));
67 m_timeout_id = m_timeout_manager->registerNew(options.
no_message_timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
70 const std::string msg =
"Subscribed to topic '" + m_topic_name +
"' -> '" + m_topic_name +
"'";
72 RCLCPP_INFO_STREAM(m_node->get_logger(), msg);
76 virtual ~Impl() =
default;
80 virtual typename MessageType::ConstSharedPtr
getMsg()
89 virtual typename MessageType::ConstSharedPtr
peekMsg()
const
95 return m_latest_message;
100 virtual bool hasMsg()
const
107 virtual bool newMsg()
const
121 virtual typename MessageType::ConstSharedPtr
waitForNew(
const rclcpp::Duration& timeout)
124 const std::chrono::duration<float> chrono_timeout(timeout.seconds());
126 std::unique_lock lock(m_new_data_mtx);
130 else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
140 return m_latest_message_time;
147 if (m_sub ==
nullptr)
152 std::string ret = m_sub->get_topic_name();
156 ret = m_node->get_node_topics_interface()->resolve_topic_name(ret);
166 return m_sub->get_publisher_count();
173 if (timeout == mrs_lib::no_timeout)
176 if (m_timeout_manager !=
nullptr && m_timeout_id.has_value())
177 m_timeout_manager->pause(m_timeout_id.value());
182 if (m_timeout_manager ==
nullptr)
183 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (timeout.seconds() * 0.5), m_node->get_clock()));
186 if (m_timeout_id.has_value())
187 m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
191 m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
199 if (m_timeout_manager && m_timeout_id.has_value())
200 m_timeout_manager->start(m_timeout_id.value(), m_node->get_clock()->now());
202 const std::function<void(
const typename MessageType::SharedPtr)> cbk = std::bind(&Impl::data_callback,
this, std::placeholders::_1);
204 m_sub = m_node->create_subscription<MessageType>(m_topic_name, m_qos, cbk, m_sub_opts);
211 if (m_timeout_manager && m_timeout_id.has_value())
213 m_timeout_manager->pause(m_timeout_id.value());
221 rclcpp::Node::SharedPtr m_node;
222 rclcpp::Subscription<MessageType>::SharedPtr m_sub;
223 rclcpp::QoS m_qos = rclcpp::SystemDefaultsQoS();
224 rclcpp::SubscriptionOptions m_sub_opts;
227 std::string m_topic_name;
228 std::string m_node_name;
233 mutable std::mutex m_new_data_mtx;
234 mutable std::condition_variable m_new_data_cv;
240 std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
241 std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
242 mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
245 rclcpp::Time m_latest_message_time;
246 typename MessageType::ConstSharedPtr m_latest_message;
247 message_callback_t m_message_callback;
251 void default_timeout_callback(
const std::string& topic_name,
const rclcpp::Time& last_msg)
253 const rclcpp::Duration since_msg = (m_node->get_clock()->now() - last_msg);
254 const auto n_pubs = m_sub->get_publisher_count();
255 const std::string txt =
"Did not receive any message from topic '" + topic_name +
"' for " + std::to_string(since_msg.seconds()) +
"s ("
256 + std::to_string(n_pubs) +
" publishers on this topic)";
258 RCLCPP_WARN_STREAM(m_node->get_logger(), txt);
263 void process_new_message(
const typename MessageType::ConstSharedPtr& msg)
265 m_latest_message_time = m_node->get_clock()->now();
266 m_latest_message = msg;
269 m_new_data = !m_message_callback;
271 m_new_data_cv.notify_one();
276 virtual void data_callback(
const typename MessageType::ConstSharedPtr& msg)
279 std::lock_guard lck(m_new_data_mtx);
280 if (m_timeout_manager && m_timeout_id.has_value())
281 m_timeout_manager->reset(m_timeout_id.value(), m_node->get_clock()->now());
282 process_new_message(msg);
286 if (m_message_callback)
287 m_message_callback(msg);
301 using timeout_callback_t =
typename impl_class_t::timeout_callback_t;
302 using message_callback_t =
typename impl_class_t::message_callback_t;
308 : impl_class_t::Impl(options, message_callback)
313 virtual bool hasMsg()
const override
315 std::lock_guard lck(m_mtx);
316 return impl_class_t::hasMsg();
318 virtual bool newMsg()
const override
320 std::lock_guard lck(m_mtx);
321 return impl_class_t::newMsg();
323 virtual bool usedMsg()
const override
325 std::lock_guard lck(m_mtx);
326 return impl_class_t::usedMsg();
328 virtual typename MessageType::ConstSharedPtr
getMsg()
override
330 std::lock_guard lck(m_mtx);
331 return impl_class_t::getMsg();
333 virtual typename MessageType::ConstSharedPtr
peekMsg()
const override
335 std::lock_guard lck(m_mtx);
336 return impl_class_t::peekMsg();
340 std::lock_guard lck(m_mtx);
341 return impl_class_t::lastMsgTime();
343 virtual std::string
topicName()
const override
345 std::lock_guard lck(m_mtx);
346 return impl_class_t::topicName();
348 virtual void start()
override
350 std::lock_guard lck(m_mtx);
351 return impl_class_t::start();
353 virtual void stop()
override
355 std::lock_guard lck(m_mtx);
356 return impl_class_t::stop();
362 virtual void data_callback(
const typename MessageType::ConstSharedPtr& msg)
override
365 std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
367 if (this->m_timeout_manager && this->m_timeout_id.has_value())
369 this->m_timeout_manager->reset(this->m_timeout_id.value(), this->m_node->get_clock()->now());
372 impl_class_t::process_new_message(msg);
376 if (this->m_message_callback)
377 impl_class_t::m_message_callback(msg);
381 mutable std::recursive_mutex m_mtx;