3 #ifndef SUBSCRIBE_HANDLER_HPP
4 #define SUBSCRIBE_HANDLER_HPP
7 #include <mrs_lib/timer.h>
9 #include <condition_variable>
15 template <
typename MessageType>
21 using data_callback_t = std::function<void(
const typename MessageType::ConstPtr&)>;
36 m_latest_message_time(0),
37 m_latest_message(
nullptr),
38 m_message_callback(message_callback),
46 m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback,
this,
topicName(), std::placeholders::_1);
51 if (!m_timeout_manager)
52 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_nh, ros::Rate(options.
no_message_timeout * 0.5));
55 m_timeout_id = m_timeout_manager->registerNew(options.
no_message_timeout, m_timeout_mgr_callback);
58 const std::string msg =
"Subscribed to topic '" + m_topic_name +
"' -> '" +
topicName() +
"'";
59 if (m_node_name.empty())
62 ROS_INFO_STREAM(
"[" << m_node_name <<
"]: " << msg);
66 virtual ~
Impl() =
default;
70 virtual typename MessageType::ConstPtr
getMsg()
79 virtual typename MessageType::ConstPtr
peekMsg()
const
85 return m_latest_message;
90 virtual bool hasMsg()
const
97 virtual bool newMsg()
const
111 virtual typename MessageType::ConstPtr
waitForNew(
const ros::WallDuration& timeout)
114 const std::chrono::duration<float> chrono_timeout(timeout.toSec());
116 std::unique_lock lock(m_new_data_mtx);
120 else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
130 return m_latest_message_time;
137 std::string ret = m_sub.getTopic();
139 ret = m_nh.resolveName(m_topic_name);
147 return m_sub.getNumPublishers();
154 if (timeout == mrs_lib::no_timeout)
157 if (m_timeout_manager !=
nullptr && m_timeout_id.has_value())
158 m_timeout_manager->pause(m_timeout_id.value());
164 if (m_timeout_manager ==
nullptr)
165 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_nh, ros::Rate(timeout * 0.5));
168 if (m_timeout_id.has_value())
169 m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback);
172 m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback);
180 if (m_timeout_manager && m_timeout_id.has_value())
181 m_timeout_manager->start(m_timeout_id.value());
182 m_sub = m_nh.subscribe(m_topic_name, m_queue_size, &Impl::data_callback,
this, m_transport_hints);
189 if (m_timeout_manager && m_timeout_id.has_value())
190 m_timeout_manager->pause(m_timeout_id.value());
196 ros::NodeHandle m_nh;
197 ros::Subscriber m_sub;
200 std::string m_topic_name;
201 std::string m_node_name;
206 mutable std::mutex m_new_data_mtx;
207 mutable std::condition_variable m_new_data_cv;
213 std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
214 std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
215 mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
218 ros::Time m_latest_message_time;
219 typename MessageType::ConstPtr m_latest_message;
220 message_callback_t m_message_callback;
223 uint32_t m_queue_size;
224 ros::TransportHints m_transport_hints;
228 void default_timeout_callback(
const std::string& topic_name,
const ros::Time& last_msg)
230 const ros::Duration since_msg = (ros::Time::now() - last_msg);
231 const auto n_pubs = m_sub.getNumPublishers();
232 const std::string txt =
"Did not receive any message from topic '" + topic_name +
"' for " + std::to_string(since_msg.toSec()) +
"s ("
233 + std::to_string(n_pubs) +
" publishers on this topic)";
234 if (m_node_name.empty())
235 ROS_WARN_STREAM(txt);
237 ROS_WARN_STREAM(
"[" << m_node_name <<
"]: " << txt);
242 void process_new_message(
const typename MessageType::ConstPtr& msg)
244 m_latest_message_time = ros::Time::now();
245 m_latest_message = msg;
248 m_new_data = !m_message_callback;
250 m_new_data_cv.notify_one();
255 virtual void data_callback(
const typename MessageType::ConstPtr& msg)
258 std::lock_guard lck(m_new_data_mtx);
259 if (m_timeout_manager && m_timeout_id.has_value())
260 m_timeout_manager->reset(m_timeout_id.value());
261 process_new_message(msg);
265 if (m_message_callback)
266 m_message_callback(msg);
273 template <
typename MessageType>
280 using timeout_callback_t =
typename impl_class_t::timeout_callback_t;
281 using message_callback_t =
typename impl_class_t::message_callback_t;
287 : impl_class_t::Impl(options, message_callback)
292 virtual bool hasMsg()
const override
294 std::lock_guard lck(m_mtx);
295 return impl_class_t::hasMsg();
297 virtual bool newMsg()
const override
299 std::lock_guard lck(m_mtx);
300 return impl_class_t::newMsg();
302 virtual bool usedMsg()
const override
304 std::lock_guard lck(m_mtx);
305 return impl_class_t::usedMsg();
307 virtual typename MessageType::ConstPtr
getMsg()
override
309 std::lock_guard lck(m_mtx);
310 return impl_class_t::getMsg();
312 virtual typename MessageType::ConstPtr
peekMsg()
const override
314 std::lock_guard lck(m_mtx);
315 return impl_class_t::peekMsg();
319 std::lock_guard lck(m_mtx);
320 return impl_class_t::lastMsgTime();
322 virtual std::string
topicName()
const override
324 std::lock_guard lck(m_mtx);
325 return impl_class_t::topicName();
327 virtual void start()
override
329 std::lock_guard lck(m_mtx);
330 return impl_class_t::start();
332 virtual void stop()
override
334 std::lock_guard lck(m_mtx);
335 return impl_class_t::stop();
341 virtual void data_callback(
const typename MessageType::ConstPtr& msg)
override
344 std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
345 if (this->m_timeout_manager && this->m_timeout_id.has_value())
346 this->m_timeout_manager->reset(this->m_timeout_id.value());
347 impl_class_t::process_new_message(msg);
351 if (this->m_message_callback)
352 impl_class_t::m_message_callback(msg);
356 mutable std::recursive_mutex m_mtx;
362 #endif // SUBSCRIBE_HANDLER_HPP