mrs_lib
Various reusable classes, functions and utilities for use in MRS projects
Loading...
Searching...
No Matches
subscriber_handler.hpp
1// clang: MatousFormat
2
3#ifndef SUBSCRIBER_HANDLER_HPP
4#define SUBSCRIBER_HANDLER_HPP
5
6#ifndef SUBSCRIBER_HANDLER_H
8#endif
9
11#include <mrs_lib/timer_handler.h>
12#include <mutex>
13#include <condition_variable>
14
15namespace mrs_lib
16{
17 /* SubscriberHandler::Impl class //{ */
18 // implements the constructor, getMsg() method and data_callback method (non-thread-safe)
19 template <typename MessageType>
20 class SubscriberHandler<MessageType>::Impl
21 {
22 public:
23 using timeout_callback_t = typename SubscriberHandler<MessageType>::timeout_callback_t;
24 using message_callback_t = typename SubscriberHandler<MessageType>::message_callback_t;
25 using data_callback_t = std::function<void(const typename MessageType::ConstSharedPtr&)>;
26
27 private:
28 friend class SubscriberHandler<MessageType>;
29
30 public:
31 /* constructor //{ */
32 Impl(const SubscriberHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
33 : m_node(options.node),
34 m_qos(options.qos),
35 m_sub_opts(options.subscription_options),
36 m_topic_name(options.topic_name),
37 m_node_name(options.node_name),
38 m_got_data(false),
39 m_new_data(false),
40 m_used_data(false),
41 m_timeout_manager(options.timeout_manager),
42 m_latest_message_time(0),
43 m_latest_message(nullptr),
44 m_message_callback(message_callback)
45 {
46
47 // initialize the callback for the TimeoutManager
48 if (options.timeout_callback)
49 {
50 m_timeout_mgr_callback = std::bind(options.timeout_callback, m_topic_name, std::placeholders::_1);
51 } else
52 {
53 m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback, this, m_topic_name, std::placeholders::_1);
54 }
55
56 if (options.no_message_timeout != mrs_lib::no_timeout)
57 {
58
59 // initialize a new TimeoutManager if not provided by the user
60 if (!m_timeout_manager)
61 {
62 m_timeout_manager =
63 std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (options.no_message_timeout.seconds() * 0.5), m_node->get_clock()));
64 }
65
66 // register the timeout callback with the TimeoutManager
67 m_timeout_id = m_timeout_manager->registerNew(options.no_message_timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
68 }
69
70 const std::string msg = "Subscribed to topic '" + m_topic_name + "' -> '" + m_topic_name + "'";
71
72 RCLCPP_INFO_STREAM(m_node->get_logger(), msg);
73 }
74 //}
75
76 virtual ~Impl() = default;
77
78 public:
79 /* getMsg() method //{ */
80 virtual typename MessageType::ConstSharedPtr getMsg()
81 {
82 m_new_data = false;
83 m_used_data = true;
84 return peekMsg();
85 }
86 //}
87
88 /* peekMsg() method //{ */
89 virtual typename MessageType::ConstSharedPtr peekMsg() const
90 {
91 /* assert(m_got_data); */
92 /* if (!m_got_data) */
93 /* ROS_ERROR("[%s]: No data received yet from topic '%s' (forgot to check hasMsg()?)! Returning empty message.", m_node_name.c_str(), */
94 /* topicName().c_str()); */
95 return m_latest_message;
96 }
97 //}
98
99 /* hasMsg() method //{ */
100 virtual bool hasMsg() const
101 {
102 return m_got_data;
103 }
104 //}
105
106 /* newMsg() method //{ */
107 virtual bool newMsg() const
108 {
109 return m_new_data;
110 }
111 //}
112
113 /* usedMsg() method //{ */
114 virtual bool usedMsg() const
115 {
116 return m_used_data;
117 }
118 //}
119
120 /* waitForNew() method //{ */
121 virtual typename MessageType::ConstSharedPtr waitForNew(const rclcpp::Duration& timeout)
122 {
123 // convert the ros type to chrono type
124 const std::chrono::duration<float> chrono_timeout(timeout.seconds());
125 // lock the mutex guarding the m_new_data flag
126 std::unique_lock lock(m_new_data_mtx);
127 // if new data is available, return immediately, otherwise attempt to wait for new data using the respective condition variable
128 if (m_new_data)
129 return getMsg();
130 else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
131 return getMsg();
132 else
133 return nullptr;
134 };
135 //}
136
137 /* lastMsgTime() method //{ */
138 virtual rclcpp::Time lastMsgTime() const
139 {
140 return m_latest_message_time;
141 };
142 //}
143
144 /* topicName() method //{ */
145 virtual std::string topicName() const
146 {
147 if (m_sub == nullptr)
148 {
149 return "";
150 }
151
152 std::string ret = m_sub->get_topic_name();
153
154 if (ret.empty())
155 {
156 ret = m_node->get_node_topics_interface()->resolve_topic_name(ret);
157 }
158
159 return ret;
160 }
161 //}
162
163 /* getNumPublishers() method //{ */
164 virtual uint32_t getNumPublishers() const
165 {
166 return m_sub->get_publisher_count();
167 };
168 //}
169
170 /* setNoMessageTimeout() method //{ */
171 virtual void setNoMessageTimeout(const rclcpp::Duration& timeout)
172 {
173 if (timeout == mrs_lib::no_timeout)
174 {
175 // if there is a timeout callback already registered but the user wants to disable it, pause it
176 if (m_timeout_manager != nullptr && m_timeout_id.has_value())
177 m_timeout_manager->pause(m_timeout_id.value());
178 // otherwise, there is no callback, so nothing to do
179 } else
180 {
181 // if there is no callback manager, create it
182 if (m_timeout_manager == nullptr)
183 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (timeout.seconds() * 0.5), m_node->get_clock()));
184
185 // if there is an existing timeout callback registered, change its timeout
186 if (m_timeout_id.has_value())
187 m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
188
189 // otherwise, register it
190 else
191 m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
192 }
193 }
194 //}
195
196 /* start() method //{ */
197 virtual void start()
198 {
199 if (m_timeout_manager && m_timeout_id.has_value())
200 m_timeout_manager->start(m_timeout_id.value(), m_node->get_clock()->now());
201
202 const std::function<void(const typename MessageType::SharedPtr)> cbk = std::bind(&Impl::data_callback, this, std::placeholders::_1);
203
204 m_sub = m_node->create_subscription<MessageType>(m_topic_name, m_qos, cbk, m_sub_opts);
205 }
206 //}
207
208 /* stop() method //{ */
209 virtual void stop()
210 {
211 if (m_timeout_manager && m_timeout_id.has_value())
212 {
213 m_timeout_manager->pause(m_timeout_id.value());
214 }
215
216 m_sub.reset();
217 }
218 //}
219
220 protected:
221 rclcpp::Node::SharedPtr m_node;
222 rclcpp::Subscription<MessageType>::SharedPtr m_sub;
223 rclcpp::QoS m_qos = rclcpp::SystemDefaultsQoS();
224 rclcpp::SubscriptionOptions m_sub_opts;
225
226 protected:
227 std::string m_topic_name;
228 std::string m_node_name;
229
230 protected:
231 bool m_got_data; // whether any data was received
232
233 mutable std::mutex m_new_data_mtx;
234 mutable std::condition_variable m_new_data_cv;
235 bool m_new_data; // whether new data was received since last call to get_data
236
237 bool m_used_data; // whether get_data was successfully called at least once
238
239 protected:
240 std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
241 std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
242 mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
243
244 protected:
245 rclcpp::Time m_latest_message_time;
246 typename MessageType::ConstSharedPtr m_latest_message;
247 message_callback_t m_message_callback;
248
249 protected:
250 /* default_timeout_callback() method //{ */
251 void default_timeout_callback(const std::string& topic_name, const rclcpp::Time& last_msg)
252 {
253 const rclcpp::Duration since_msg = (m_node->get_clock()->now() - last_msg);
254 const auto n_pubs = m_sub->get_publisher_count();
255 const std::string txt = "Did not receive any message from topic '" + topic_name + "' for " + std::to_string(since_msg.seconds()) + "s ("
256 + std::to_string(n_pubs) + " publishers on this topic)";
257
258 RCLCPP_WARN_STREAM(m_node->get_logger(), txt);
259 }
260 //}
261
262 /* process_new_message() method //{ */
263 void process_new_message(const typename MessageType::ConstSharedPtr& msg)
264 {
265 m_latest_message_time = m_node->get_clock()->now();
266 m_latest_message = msg;
267 // If the message callback is registered, the new data will immediately be processed,
268 // so reset the flag. Otherwise, set the flag.
269 m_new_data = !m_message_callback;
270 m_got_data = true;
271 m_new_data_cv.notify_one();
272 }
273 //}
274
275 /* data_callback() method //{ */
276 virtual void data_callback(const typename MessageType::ConstSharedPtr& msg)
277 {
278 {
279 std::lock_guard lck(m_new_data_mtx);
280 if (m_timeout_manager && m_timeout_id.has_value())
281 m_timeout_manager->reset(m_timeout_id.value(), m_node->get_clock()->now());
282 process_new_message(msg);
283 }
284
285 // execute the callback after unlocking the mutex to enable multi-threaded callback execution
286 if (m_message_callback)
287 m_message_callback(msg);
288 }
289 //}
290 };
291 //}
292
293 /* SubscribeHandler_threadsafe class //{ */
294 template <typename MessageType>
295 class SubscriberHandler<MessageType>::ImplThreadsafe : public SubscriberHandler<MessageType>::Impl
296 {
297 private:
299
300 public:
301 using timeout_callback_t = typename impl_class_t::timeout_callback_t;
302 using message_callback_t = typename impl_class_t::message_callback_t;
303
304 friend class SubscriberHandler<MessageType>;
305
306 public:
307 ImplThreadsafe(const SubscriberHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
308 : impl_class_t::Impl(options, message_callback)
309 {
310 }
311
312 public:
313 virtual bool hasMsg() const override
314 {
315 std::lock_guard lck(m_mtx);
316 return impl_class_t::hasMsg();
317 }
318 virtual bool newMsg() const override
319 {
320 std::lock_guard lck(m_mtx);
321 return impl_class_t::newMsg();
322 }
323 virtual bool usedMsg() const override
324 {
325 std::lock_guard lck(m_mtx);
326 return impl_class_t::usedMsg();
327 }
328 virtual typename MessageType::ConstSharedPtr getMsg() override
329 {
330 std::lock_guard lck(m_mtx);
331 return impl_class_t::getMsg();
332 }
333 virtual typename MessageType::ConstSharedPtr peekMsg() const override
334 {
335 std::lock_guard lck(m_mtx);
336 return impl_class_t::peekMsg();
337 }
338 virtual rclcpp::Time lastMsgTime() const override
339 {
340 std::lock_guard lck(m_mtx);
341 return impl_class_t::lastMsgTime();
342 };
343 virtual std::string topicName() const override
344 {
345 std::lock_guard lck(m_mtx);
346 return impl_class_t::topicName();
347 };
348 virtual void start() override
349 {
350 std::lock_guard lck(m_mtx);
351 return impl_class_t::start();
352 }
353 virtual void stop() override
354 {
355 std::lock_guard lck(m_mtx);
356 return impl_class_t::stop();
357 }
358
359 virtual ~ImplThreadsafe() override = default;
360
361 protected:
362 virtual void data_callback(const typename MessageType::ConstSharedPtr& msg) override
363 {
364 {
365 std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
366
367 if (this->m_timeout_manager && this->m_timeout_id.has_value())
368 {
369 this->m_timeout_manager->reset(this->m_timeout_id.value(), this->m_node->get_clock()->now());
370 }
371
372 impl_class_t::process_new_message(msg);
373 }
374
375 // execute the callback after unlocking the mutex to enable multi-threaded callback execution
376 if (this->m_message_callback)
377 impl_class_t::m_message_callback(msg);
378 }
379
380 private:
381 mutable std::recursive_mutex m_mtx;
382 };
383 //}
384
385} // namespace mrs_lib
386
387#endif // SUBSCRIBER_HANDLER_HPP
Definition subscriber_handler.hpp:296
Definition subscriber_handler.hpp:21
The main class for ROS topic subscription, message timeout handling etc.
Definition subscriber_handler.h:92
virtual bool newMsg() const
Used to check whether at least one message has been received on the handled topic since the last call...
Definition subscriber_handler.h:150
virtual void stop()
Disables the callbacks for the handled topic.
Definition subscriber_handler.h:252
virtual MessageType::ConstSharedPtr getMsg()
Returns the last received message on the topic, handled by this SubscriberHandler....
Definition subscriber_handler.h:117
virtual uint32_t getNumPublishers() const
Returns number of publishers registered at the topic.
Definition subscriber_handler.h:217
virtual void setNoMessageTimeout(const rclcpp::Duration &timeout)
Sets the timeout for no received message.
Definition subscriber_handler.h:228
virtual std::string topicName() const
Returns the resolved (full) name of the topic, handled by this SubscriberHandler.
Definition subscriber_handler.h:195
virtual bool usedMsg() const
Used to check whether getMsg() was called at least once on this SubscriberHandler.
Definition subscriber_handler.h:161
virtual MessageType::ConstSharedPtr waitForNew(const rclcpp::Duration &timeout)
Blocks until new data becomes available or until the timeout runs out or until a spurious wake-up.
Definition subscriber_handler.h:173
virtual bool hasMsg() const
Used to check whether at least one message has been received on the handled topic.
Definition subscriber_handler.h:139
virtual void start()
Enables the callbacks for the handled topic.
Definition subscriber_handler.h:240
virtual MessageType::ConstSharedPtr peekMsg() const
Returns the last received message on the topic without modifying the newMsg() or usedMsg() flags.
Definition subscriber_handler.h:128
std::function< void(typename MessageType::ConstSharedPtr)> message_callback_t
Convenience type for the message callback function.
Definition subscriber_handler.h:107
std::function< void(const std::string &topic_name, const rclcpp::Time &last_msg)> timeout_callback_t
Type for the timeout callback function.
Definition subscriber_handler.h:102
virtual rclcpp::Time lastMsgTime() const
Returns time of the last received message on the topic, handled by this SubscriberHandler.
Definition subscriber_handler.h:184
All mrs_lib functions, classes, variables and definitions are contained in this namespace.
Definition attitude_converter.h:24
A helper class to simplify setup of SubscriberHandler construction. This class is passed to the Subsc...
Definition subscriber_handler.h:33
rclcpp::Node::SharedPtr node
The ROS NodeHandle to be used for subscription.
Definition subscriber_handler.h:39
std::string topic_name
Name of the ROS topic to be handled.
Definition subscriber_handler.h:43
std::shared_ptr< mrs_lib::TimeoutManager > timeout_manager
Will be used for handling message timouts if necessary. If no manager is specified,...
Definition subscriber_handler.h:45
std::function< void(const std::string &topic_name, const rclcpp::Time &last_msg)> timeout_callback
This function will be called if no new message is received for the no_message_timeout duration....
Definition subscriber_handler.h:52
std::string node_name
Name of the ROS node, using this handle (used for messages printed to console).
Definition subscriber_handler.h:41
rclcpp::Duration no_message_timeout
If no new message is received for this duration, the timeout_callback function will be called....
Definition subscriber_handler.h:48
Defines SubscriberHandler and related convenience classes for subscribing to ROS topics.