mrs_lib
Various reusable classes, functions and utilities for use in MRS projects
subscribe_handler.hpp
1 // clang: MatousFormat
2 
3 #ifndef SUBSCRIBE_HANDLER_HPP
4 #define SUBSCRIBE_HANDLER_HPP
5 
7 #include <mrs_lib/timer.h>
8 #include <mutex>
9 #include <condition_variable>
10 
11 namespace mrs_lib
12 {
13  /* SubscribeHandler::Impl class //{ */
14  // implements the constructor, getMsg() method and data_callback method (non-thread-safe)
15  template <typename MessageType>
16  class SubscribeHandler<MessageType>::Impl
17  {
18  public:
19  using timeout_callback_t = typename SubscribeHandler<MessageType>::timeout_callback_t;
20  using message_callback_t = typename SubscribeHandler<MessageType>::message_callback_t;
21  using data_callback_t = std::function<void(const typename MessageType::ConstPtr&)>;
22 
23  private:
24  friend class SubscribeHandler<MessageType>;
25 
26  public:
27  /* constructor //{ */
28  Impl(const SubscribeHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
29  : m_nh(options.nh),
30  m_topic_name(options.topic_name),
31  m_node_name(options.node_name),
32  m_got_data(false),
33  m_new_data(false),
34  m_used_data(false),
35  m_timeout_manager(options.timeout_manager),
36  m_latest_message_time(0),
37  m_latest_message(nullptr),
38  m_message_callback(message_callback),
39  m_queue_size(options.queue_size),
40  m_transport_hints(options.transport_hints)
41  {
42  // initialize the callback for the TimeoutManager
43  if (options.timeout_callback)
44  m_timeout_mgr_callback = std::bind(options.timeout_callback, topicName(), std::placeholders::_1);
45  else
46  m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback, this, topicName(), std::placeholders::_1);
47 
48  if (options.no_message_timeout != mrs_lib::no_timeout)
49  {
50  // initialize a new TimeoutManager if not provided by the user
51  if (!m_timeout_manager)
52  m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_nh, ros::Rate(options.no_message_timeout * 0.5));
53 
54  // register the timeout callback with the TimeoutManager
55  m_timeout_id = m_timeout_manager->registerNew(options.no_message_timeout, m_timeout_mgr_callback);
56  }
57 
58  const std::string msg = "Subscribed to topic '" + m_topic_name + "' -> '" + topicName() + "'";
59  if (m_node_name.empty())
60  ROS_INFO_STREAM(msg);
61  else
62  ROS_INFO_STREAM("[" << m_node_name << "]: " << msg);
63  }
64  //}
65 
66  virtual ~Impl() = default;
67 
68  public:
69  /* getMsg() method //{ */
70  virtual typename MessageType::ConstPtr getMsg()
71  {
72  m_new_data = false;
73  m_used_data = true;
74  return peekMsg();
75  }
76  //}
77 
78  /* peekMsg() method //{ */
79  virtual typename MessageType::ConstPtr peekMsg() const
80  {
81  /* assert(m_got_data); */
82  /* if (!m_got_data) */
83  /* ROS_ERROR("[%s]: No data received yet from topic '%s' (forgot to check hasMsg()?)! Returning empty message.", m_node_name.c_str(), */
84  /* topicName().c_str()); */
85  return m_latest_message;
86  }
87  //}
88 
89  /* hasMsg() method //{ */
90  virtual bool hasMsg() const
91  {
92  return m_got_data;
93  }
94  //}
95 
96  /* newMsg() method //{ */
97  virtual bool newMsg() const
98  {
99  return m_new_data;
100  }
101  //}
102 
103  /* usedMsg() method //{ */
104  virtual bool usedMsg() const
105  {
106  return m_used_data;
107  }
108  //}
109 
110  /* waitForNew() method //{ */
111  virtual typename MessageType::ConstPtr waitForNew(const ros::WallDuration& timeout)
112  {
113  // convert the ros type to chrono type
114  const std::chrono::duration<float> chrono_timeout(timeout.toSec());
115  // lock the mutex guarding the m_new_data flag
116  std::unique_lock lock(m_new_data_mtx);
117  // if new data is available, return immediately, otherwise attempt to wait for new data using the respective condition variable
118  if (m_new_data)
119  return getMsg();
120  else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
121  return getMsg();
122  else
123  return nullptr;
124  };
125  //}
126 
127  /* lastMsgTime() method //{ */
128  virtual ros::Time lastMsgTime() const
129  {
130  return m_latest_message_time;
131  };
132  //}
133 
134  /* topicName() method //{ */
135  virtual std::string topicName() const
136  {
137  std::string ret = m_sub.getTopic();
138  if (ret.empty())
139  ret = m_nh.resolveName(m_topic_name);
140  return ret;
141  }
142  //}
143 
144  /* getNumPublishers() method //{ */
145  virtual uint32_t getNumPublishers() const
146  {
147  return m_sub.getNumPublishers();
148  };
149  //}
150 
151  /* setNoMessageTimeout() method //{ */
152  virtual void setNoMessageTimeout(const ros::Duration& timeout)
153  {
154  if (timeout == mrs_lib::no_timeout)
155  {
156  // if there is a timeout callback already registered but the user wants to disable it, pause it
157  if (m_timeout_manager != nullptr && m_timeout_id.has_value())
158  m_timeout_manager->pause(m_timeout_id.value());
159  // otherwise, there is no callback, so nothing to do
160  }
161  else
162  {
163  // if there is no callback manager, create it
164  if (m_timeout_manager == nullptr)
165  m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_nh, ros::Rate(timeout * 0.5));
166 
167  // if there is an existing timeout callback registered, change its timeout
168  if (m_timeout_id.has_value())
169  m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback);
170  // otherwise, register it
171  else
172  m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback);
173  }
174  }
175  //}
176 
177  /* start() method //{ */
178  virtual void start()
179  {
180  if (m_timeout_manager && m_timeout_id.has_value())
181  m_timeout_manager->start(m_timeout_id.value());
182  m_sub = m_nh.subscribe(m_topic_name, m_queue_size, &Impl::data_callback, this, m_transport_hints);
183  }
184  //}
185 
186  /* stop() method //{ */
187  virtual void stop()
188  {
189  if (m_timeout_manager && m_timeout_id.has_value())
190  m_timeout_manager->pause(m_timeout_id.value());
191  m_sub.shutdown();
192  }
193  //}
194 
195  protected:
196  ros::NodeHandle m_nh;
197  ros::Subscriber m_sub;
198 
199  protected:
200  std::string m_topic_name;
201  std::string m_node_name;
202 
203  protected:
204  bool m_got_data; // whether any data was received
205 
206  mutable std::mutex m_new_data_mtx;
207  mutable std::condition_variable m_new_data_cv;
208  bool m_new_data; // whether new data was received since last call to get_data
209 
210  bool m_used_data; // whether get_data was successfully called at least once
211 
212  protected:
213  std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
214  std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
215  mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
216 
217  protected:
218  ros::Time m_latest_message_time;
219  typename MessageType::ConstPtr m_latest_message;
220  message_callback_t m_message_callback;
221 
222  private:
223  uint32_t m_queue_size;
224  ros::TransportHints m_transport_hints;
225 
226  protected:
227  /* default_timeout_callback() method //{ */
228  void default_timeout_callback(const std::string& topic_name, const ros::Time& last_msg)
229  {
230  const ros::Duration since_msg = (ros::Time::now() - last_msg);
231  const auto n_pubs = m_sub.getNumPublishers();
232  const std::string txt = "Did not receive any message from topic '" + topic_name + "' for " + std::to_string(since_msg.toSec()) + "s ("
233  + std::to_string(n_pubs) + " publishers on this topic)";
234  if (m_node_name.empty())
235  ROS_WARN_STREAM(txt);
236  else
237  ROS_WARN_STREAM("[" << m_node_name << "]: " << txt);
238  }
239  //}
240 
241  /* process_new_message() method //{ */
242  void process_new_message(const typename MessageType::ConstPtr& msg)
243  {
244  m_latest_message_time = ros::Time::now();
245  m_latest_message = msg;
246  // If the message callback is registered, the new data will immediately be processed,
247  // so reset the flag. Otherwise, set the flag.
248  m_new_data = !m_message_callback;
249  m_got_data = true;
250  m_new_data_cv.notify_one();
251  }
252  //}
253 
254  /* data_callback() method //{ */
255  virtual void data_callback(const typename MessageType::ConstPtr& msg)
256  {
257  {
258  std::lock_guard lck(m_new_data_mtx);
259  if (m_timeout_manager && m_timeout_id.has_value())
260  m_timeout_manager->reset(m_timeout_id.value());
261  process_new_message(msg);
262  }
263 
264  // execute the callback after unlocking the mutex to enable multi-threaded callback execution
265  if (m_message_callback)
266  m_message_callback(msg);
267  }
268  //}
269  };
270  //}
271 
272  /* SubscribeHandler_threadsafe class //{ */
273  template <typename MessageType>
274  class SubscribeHandler<MessageType>::ImplThreadsafe : public SubscribeHandler<MessageType>::Impl
275  {
276  private:
278 
279  public:
280  using timeout_callback_t = typename impl_class_t::timeout_callback_t;
281  using message_callback_t = typename impl_class_t::message_callback_t;
282 
283  friend class SubscribeHandler<MessageType>;
284 
285  public:
286  ImplThreadsafe(const SubscribeHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
287  : impl_class_t::Impl(options, message_callback)
288  {
289  }
290 
291  public:
292  virtual bool hasMsg() const override
293  {
294  std::lock_guard lck(m_mtx);
295  return impl_class_t::hasMsg();
296  }
297  virtual bool newMsg() const override
298  {
299  std::lock_guard lck(m_mtx);
300  return impl_class_t::newMsg();
301  }
302  virtual bool usedMsg() const override
303  {
304  std::lock_guard lck(m_mtx);
305  return impl_class_t::usedMsg();
306  }
307  virtual typename MessageType::ConstPtr getMsg() override
308  {
309  std::lock_guard lck(m_mtx);
310  return impl_class_t::getMsg();
311  }
312  virtual typename MessageType::ConstPtr peekMsg() const override
313  {
314  std::lock_guard lck(m_mtx);
315  return impl_class_t::peekMsg();
316  }
317  virtual ros::Time lastMsgTime() const override
318  {
319  std::lock_guard lck(m_mtx);
320  return impl_class_t::lastMsgTime();
321  };
322  virtual std::string topicName() const override
323  {
324  std::lock_guard lck(m_mtx);
325  return impl_class_t::topicName();
326  };
327  virtual void start() override
328  {
329  std::lock_guard lck(m_mtx);
330  return impl_class_t::start();
331  }
332  virtual void stop() override
333  {
334  std::lock_guard lck(m_mtx);
335  return impl_class_t::stop();
336  }
337 
338  virtual ~ImplThreadsafe() override = default;
339 
340  protected:
341  virtual void data_callback(const typename MessageType::ConstPtr& msg) override
342  {
343  {
344  std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
345  if (this->m_timeout_manager && this->m_timeout_id.has_value())
346  this->m_timeout_manager->reset(this->m_timeout_id.value());
347  impl_class_t::process_new_message(msg);
348  }
349 
350  // execute the callback after unlocking the mutex to enable multi-threaded callback execution
351  if (this->m_message_callback)
352  impl_class_t::m_message_callback(msg);
353  }
354 
355  private:
356  mutable std::recursive_mutex m_mtx;
357  };
358  //}
359 
360 } // namespace mrs_lib
361 
362 #endif // SUBSCRIBE_HANDLER_HPP
mrs_lib::SubscribeHandlerOptions::queue_size
uint32_t queue_size
This parameter is passed to the NodeHandle when subscribing to the topic.
Definition: subscribe_handler.h:52
mrs_lib::SubscribeHandler::hasMsg
virtual bool hasMsg() const
Used to check whether at least one message has been received on the handled topic.
Definition: subscribe_handler.h:122
mrs_lib::SubscribeHandler::setNoMessageTimeout
virtual void setNoMessageTimeout(const ros::Duration &timeout)
Sets the timeout for no received message.
Definition: subscribe_handler.h:179
mrs_lib::SubscribeHandlerOptions::timeout_manager
std::shared_ptr< mrs_lib::TimeoutManager > timeout_manager
Will be used for handling message timouts if necessary. If no manager is specified,...
Definition: subscribe_handler.h:42
mrs_lib::SubscribeHandler::timeout_callback_t
std::function< void(const std::string &topic_name, const ros::Time &last_msg)> timeout_callback_t
Type for the timeout callback function.
Definition: subscribe_handler.h:93
mrs_lib::SubscribeHandler::usedMsg
virtual bool usedMsg() const
Used to check whether getMsg() was called at least once on this SubscribeHandler.
Definition: subscribe_handler.h:136
mrs_lib::SubscribeHandlerOptions::transport_hints
ros::TransportHints transport_hints
This parameter is passed to the NodeHandle when subscribing to the topic.
Definition: subscribe_handler.h:54
mrs_lib::SubscribeHandlerOptions
A helper class to simplify setup of SubscribeHandler construction. This class is passed to the Subscr...
Definition: subscribe_handler.h:31
mrs_lib::SubscribeHandler::stop
virtual void stop()
Disables the callbacks for the handled topic.
Definition: subscribe_handler.h:195
subscribe_handler.h
Defines SubscribeHandler and related convenience classes for subscribing to ROS topics.
mrs_lib::SubscribeHandler::getMsg
virtual MessageType::ConstPtr getMsg()
Returns the last received message on the topic, handled by this SubscribeHandler. Use hasMsg() first ...
Definition: subscribe_handler.h:108
mrs_lib::SubscribeHandler::start
virtual void start()
Enables the callbacks for the handled topic.
Definition: subscribe_handler.h:187
mrs_lib::SubscribeHandler
The main class for ROS topic subscription, message timeout handling etc.
Definition: subscribe_handler.h:82
mrs_lib::SubscribeHandlerOptions::no_message_timeout
ros::Duration no_message_timeout
If no new message is received for this duration, the timeout_callback function will be called....
Definition: subscribe_handler.h:44
mrs_lib::SubscribeHandler::waitForNew
virtual MessageType::ConstPtr waitForNew(const ros::WallDuration &timeout)
Blocks until new data becomes available or until the timeout runs out or until a spurious wake-up.
Definition: subscribe_handler.h:144
mrs_lib::SubscribeHandler::Impl
Definition: subscribe_handler.hpp:16
mrs_lib::SubscribeHandlerOptions::node_name
std::string node_name
Name of the ROS node, using this handle (used for messages printed to console).
Definition: subscribe_handler.h:38
mrs_lib
All mrs_lib functions, classes, variables and definitions are contained in this namespace.
Definition: attitude_converter.h:29
mrs_lib::SubscribeHandler::peekMsg
virtual MessageType::ConstPtr peekMsg() const
Returns the last received message on the topic without modifying the newMsg() or usedMsg() flags.
Definition: subscribe_handler.h:115
mrs_lib::SubscribeHandlerOptions::timeout_callback
std::function< void(const std::string &topic_name, const ros::Time &last_msg)> timeout_callback
This function will be called if no new message is received for the no_message_timeout duration....
Definition: subscribe_handler.h:46
mrs_lib::SubscribeHandler::topicName
virtual std::string topicName() const
Returns the resolved (full) name of the topic, handled by this SubscribeHandler.
Definition: subscribe_handler.h:158
mrs_lib::SubscribeHandlerOptions::topic_name
std::string topic_name
Name of the ROS topic to be handled.
Definition: subscribe_handler.h:40
mrs_lib::SubscribeHandler::message_callback_t
std::function< void(typename MessageType::ConstPtr)> message_callback_t
Convenience type for the message callback function.
Definition: subscribe_handler.h:98
mrs_lib::SubscribeHandler::newMsg
virtual bool newMsg() const
Used to check whether at least one message has been received on the handled topic since the last call...
Definition: subscribe_handler.h:129
mrs_lib::SubscribeHandlerOptions::nh
ros::NodeHandle nh
The ROS NodeHandle to be used for subscription.
Definition: subscribe_handler.h:36
mrs_lib::SubscribeHandler::getNumPublishers
virtual uint32_t getNumPublishers() const
Returns number of publishers registered at the topic.
Definition: subscribe_handler.h:172
mrs_lib::SubscribeHandler::lastMsgTime
virtual ros::Time lastMsgTime() const
Returns time of the last received message on the topic, handled by this SubscribeHandler.
Definition: subscribe_handler.h:151
mrs_lib::SubscribeHandler::ImplThreadsafe
Definition: subscribe_handler.hpp:274