25 using data_callback_t = std::function<void(
const typename MessageType::ConstSharedPtr&)>;
33 : m_node(options.
node), m_qos(options.qos), m_sub_opts(options.subscription_options), m_topic_name(options.
topic_name), m_node_name(options.
node_name),
34 m_got_data(
false), m_new_data(
false), m_used_data(
false), m_timeout_manager(options.
timeout_manager), m_latest_message_time(0),
35 m_latest_message(
nullptr), m_message_callback(message_callback)
41 m_timeout_mgr_callback = std::bind(options.
timeout_callback, m_topic_name, std::placeholders::_1);
44 m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback,
this, m_topic_name, std::placeholders::_1);
51 if (!m_timeout_manager)
54 std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (options.
no_message_timeout.seconds() * 0.5), m_node->get_clock()));
58 m_timeout_id = m_timeout_manager->registerNew(options.
no_message_timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
61 const std::string msg =
"Subscribed to topic '" + m_topic_name +
"' -> '" + m_topic_name +
"'";
63 RCLCPP_INFO_STREAM(m_node->get_logger(), msg);
67 virtual ~Impl() =
default;
71 virtual typename MessageType::ConstSharedPtr
getMsg()
80 virtual typename MessageType::ConstSharedPtr
peekMsg()
const
86 return m_latest_message;
91 virtual bool hasMsg()
const
98 virtual bool newMsg()
const
112 virtual typename MessageType::ConstSharedPtr
waitForNew(
const rclcpp::Duration& timeout)
115 const std::chrono::duration<float> chrono_timeout(timeout.seconds());
117 std::unique_lock lock(m_new_data_mtx);
121 else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
131 return m_latest_message_time;
138 if (m_sub ==
nullptr)
143 std::string ret = m_sub->get_topic_name();
147 ret = m_node->get_node_topics_interface()->resolve_topic_name(ret);
157 return m_sub->get_publisher_count();
164 if (timeout == mrs_lib::no_timeout)
167 if (m_timeout_manager !=
nullptr && m_timeout_id.has_value())
168 m_timeout_manager->pause(m_timeout_id.value());
173 if (m_timeout_manager ==
nullptr)
174 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (timeout.seconds() * 0.5), m_node->get_clock()));
177 if (m_timeout_id.has_value())
178 m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
182 m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
190 if (m_timeout_manager && m_timeout_id.has_value())
191 m_timeout_manager->start(m_timeout_id.value(), m_node->get_clock()->now());
193 const std::function<void(
const typename MessageType::SharedPtr)> cbk = std::bind(&Impl::data_callback,
this, std::placeholders::_1);
195 m_sub = m_node->create_subscription<MessageType>(m_topic_name, m_qos, cbk, m_sub_opts);
202 if (m_timeout_manager && m_timeout_id.has_value())
204 m_timeout_manager->pause(m_timeout_id.value());
212 rclcpp::Node::SharedPtr m_node;
213 rclcpp::Subscription<MessageType>::SharedPtr m_sub;
214 rclcpp::QoS m_qos = rclcpp::SystemDefaultsQoS();
215 rclcpp::SubscriptionOptions m_sub_opts;
218 std::string m_topic_name;
219 std::string m_node_name;
224 mutable std::mutex m_new_data_mtx;
225 mutable std::condition_variable m_new_data_cv;
231 std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
232 std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
233 mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
236 rclcpp::Time m_latest_message_time;
237 typename MessageType::ConstSharedPtr m_latest_message;
238 message_callback_t m_message_callback;
242 void default_timeout_callback(
const std::string& topic_name,
const rclcpp::Time& last_msg)
244 const rclcpp::Duration since_msg = (m_node->get_clock()->now() - last_msg);
245 const auto n_pubs = m_sub->get_publisher_count();
246 const std::string txt =
"Did not receive any message from topic '" + topic_name +
"' for " + std::to_string(since_msg.seconds()) +
"s ("
247 + std::to_string(n_pubs) +
" publishers on this topic)";
249 RCLCPP_WARN_STREAM(m_node->get_logger(), txt);
254 void process_new_message(
const typename MessageType::ConstSharedPtr& msg)
256 m_latest_message_time = m_node->get_clock()->now();
257 m_latest_message = msg;
260 m_new_data = !m_message_callback;
262 m_new_data_cv.notify_one();
267 virtual void data_callback(
const typename MessageType::ConstSharedPtr& msg)
270 std::lock_guard lck(m_new_data_mtx);
271 if (m_timeout_manager && m_timeout_id.has_value())
272 m_timeout_manager->reset(m_timeout_id.value(), m_node->get_clock()->now());
273 process_new_message(msg);
277 if (m_message_callback)
278 m_message_callback(msg);
292 using timeout_callback_t =
typename impl_class_t::timeout_callback_t;
293 using message_callback_t =
typename impl_class_t::message_callback_t;
299 : impl_class_t::Impl(options, message_callback)
304 virtual bool hasMsg()
const override
306 std::lock_guard lck(m_mtx);
307 return impl_class_t::hasMsg();
309 virtual bool newMsg()
const override
311 std::lock_guard lck(m_mtx);
312 return impl_class_t::newMsg();
314 virtual bool usedMsg()
const override
316 std::lock_guard lck(m_mtx);
317 return impl_class_t::usedMsg();
319 virtual typename MessageType::ConstSharedPtr
getMsg()
override
321 std::lock_guard lck(m_mtx);
322 return impl_class_t::getMsg();
324 virtual typename MessageType::ConstSharedPtr
peekMsg()
const override
326 std::lock_guard lck(m_mtx);
327 return impl_class_t::peekMsg();
331 std::lock_guard lck(m_mtx);
332 return impl_class_t::lastMsgTime();
334 virtual std::string
topicName()
const override
336 std::lock_guard lck(m_mtx);
337 return impl_class_t::topicName();
339 virtual void start()
override
341 std::lock_guard lck(m_mtx);
342 return impl_class_t::start();
344 virtual void stop()
override
346 std::lock_guard lck(m_mtx);
347 return impl_class_t::stop();
353 virtual void data_callback(
const typename MessageType::ConstSharedPtr& msg)
override
356 std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
358 if (this->m_timeout_manager && this->m_timeout_id.has_value())
360 this->m_timeout_manager->reset(this->m_timeout_id.value(), this->m_node->get_clock()->now());
363 impl_class_t::process_new_message(msg);
367 if (this->m_message_callback)
368 impl_class_t::m_message_callback(msg);
372 mutable std::recursive_mutex m_mtx;