mrs_lib
Various reusable classes, functions and utilities for use in MRS projects
Loading...
Searching...
No Matches
subscriber_handler.hpp
1// clang: MatousFormat
2
3#ifndef SUBSCRIBER_HANDLER_HPP
4#define SUBSCRIBER_HANDLER_HPP
5
6#ifndef SUBSCRIBER_HANDLER_H
8#endif
9
11#include <mrs_lib/timer_handler.h>
12#include <mutex>
13#include <condition_variable>
14
15namespace mrs_lib
16{
17 /* SubscriberHandler::Impl class //{ */
18 // implements the constructor, getMsg() method and data_callback method (non-thread-safe)
19 template <typename MessageType>
20 class SubscriberHandler<MessageType>::Impl
21 {
22 public:
23 using timeout_callback_t = typename SubscriberHandler<MessageType>::timeout_callback_t;
24 using message_callback_t = typename SubscriberHandler<MessageType>::message_callback_t;
25 using data_callback_t = std::function<void(const typename MessageType::ConstSharedPtr&)>;
26
27 private:
28 friend class SubscriberHandler<MessageType>;
29
30 public:
31 /* constructor //{ */
32 Impl(const SubscriberHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
33 : m_node(options.node), m_qos(options.qos), m_sub_opts(options.subscription_options), m_topic_name(options.topic_name), m_node_name(options.node_name),
34 m_got_data(false), m_new_data(false), m_used_data(false), m_timeout_manager(options.timeout_manager), m_latest_message_time(0),
35 m_latest_message(nullptr), m_message_callback(message_callback)
36 {
37
38 // initialize the callback for the TimeoutManager
39 if (options.timeout_callback)
40 {
41 m_timeout_mgr_callback = std::bind(options.timeout_callback, m_topic_name, std::placeholders::_1);
42 } else
43 {
44 m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback, this, m_topic_name, std::placeholders::_1);
45 }
46
47 if (options.no_message_timeout != mrs_lib::no_timeout)
48 {
49
50 // initialize a new TimeoutManager if not provided by the user
51 if (!m_timeout_manager)
52 {
53 m_timeout_manager =
54 std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (options.no_message_timeout.seconds() * 0.5), m_node->get_clock()));
55 }
56
57 // register the timeout callback with the TimeoutManager
58 m_timeout_id = m_timeout_manager->registerNew(options.no_message_timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
59 }
60 }
61 //}
62
63 virtual ~Impl() = default;
64
65 public:
66 /* getMsg() method //{ */
67 virtual typename MessageType::ConstSharedPtr getMsg()
68 {
69 m_new_data = false;
70 m_used_data = true;
71 return peekMsg();
72 }
73 //}
74
75 /* peekMsg() method //{ */
76 virtual typename MessageType::ConstSharedPtr peekMsg() const
77 {
78 /* assert(m_got_data); */
79 /* if (!m_got_data) */
80 /* ROS_ERROR("[%s]: No data received yet from topic '%s' (forgot to check hasMsg()?)! Returning empty message.", m_node_name.c_str(), */
81 /* topicName().c_str()); */
82 return m_latest_message;
83 }
84 //}
85
86 /* hasMsg() method //{ */
87 virtual bool hasMsg() const
88 {
89 return m_got_data;
90 }
91 //}
92
93 /* newMsg() method //{ */
94 virtual bool newMsg() const
95 {
96 return m_new_data;
97 }
98 //}
99
100 /* usedMsg() method //{ */
101 virtual bool usedMsg() const
102 {
103 return m_used_data;
104 }
105 //}
106
107 /* waitForNew() method //{ */
108 virtual typename MessageType::ConstSharedPtr waitForNew(const rclcpp::Duration& timeout)
109 {
110 // convert the ros type to chrono type
111 const std::chrono::duration<float> chrono_timeout(timeout.seconds());
112 // lock the mutex guarding the m_new_data flag
113 std::unique_lock lock(m_new_data_mtx);
114 // if new data is available, return immediately, otherwise attempt to wait for new data using the respective condition variable
115 if (m_new_data)
116 return getMsg();
117 else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
118 return getMsg();
119 else
120 return nullptr;
121 };
122 //}
123
124 /* lastMsgTime() method //{ */
125 virtual rclcpp::Time lastMsgTime() const
126 {
127 return m_latest_message_time;
128 };
129 //}
130
131 /* topicName() method //{ */
132 virtual std::string topicName() const
133 {
134 if (m_sub == nullptr)
135 {
136 return "";
137 }
138
139 std::string ret = m_sub->get_topic_name();
140
141 if (ret.empty())
142 {
143 ret = m_node->get_node_topics_interface()->resolve_topic_name(m_topic_name);
144 }
145
146 return ret;
147 }
148 //}
149
150 /* getNumPublishers() method //{ */
151 virtual uint32_t getNumPublishers() const
152 {
153 return m_sub->get_publisher_count();
154 };
155 //}
156
157 /* setNoMessageTimeout() method //{ */
158 virtual void setNoMessageTimeout(const rclcpp::Duration& timeout)
159 {
160 if (timeout == mrs_lib::no_timeout)
161 {
162 // if there is a timeout callback already registered but the user wants to disable it, pause it
163 if (m_timeout_manager != nullptr && m_timeout_id.has_value())
164 m_timeout_manager->pause(m_timeout_id.value());
165 // otherwise, there is no callback, so nothing to do
166 } else
167 {
168 // if there is no callback manager, create it
169 if (m_timeout_manager == nullptr)
170 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (timeout.seconds() * 0.5), m_node->get_clock()));
171
172 // if there is an existing timeout callback registered, change its timeout
173 if (m_timeout_id.has_value())
174 m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
175
176 // otherwise, register it
177 else
178 m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
179 }
180 }
181 //}
182
183 /* start() method //{ */
184 virtual void start()
185 {
186 if (m_timeout_manager && m_timeout_id.has_value())
187 m_timeout_manager->start(m_timeout_id.value(), m_node->get_clock()->now());
188
189 const std::function<void(const typename MessageType::SharedPtr)> cbk = std::bind(&Impl::data_callback, this, std::placeholders::_1);
190
191 m_sub = m_node->create_subscription<MessageType>(m_topic_name, m_qos, cbk, m_sub_opts);
192 RCLCPP_INFO_STREAM(m_node->get_logger(), "Subscribed to topic '" << m_topic_name << "' -> '" << m_sub->get_topic_name() << "'");
193 }
194 //}
195
196 /* stop() method //{ */
197 virtual void stop()
198 {
199 if (m_timeout_manager && m_timeout_id.has_value())
200 {
201 m_timeout_manager->pause(m_timeout_id.value());
202 }
203
204 m_sub.reset();
205 }
206 //}
207
208 protected:
209 rclcpp::Node::SharedPtr m_node;
210 rclcpp::Subscription<MessageType>::SharedPtr m_sub;
211 rclcpp::QoS m_qos = rclcpp::SystemDefaultsQoS();
212 rclcpp::SubscriptionOptions m_sub_opts;
213
214 protected:
215 std::string m_topic_name;
216 std::string m_node_name;
217
218 protected:
219 bool m_got_data; // whether any data was received
220
221 mutable std::mutex m_new_data_mtx;
222 mutable std::condition_variable m_new_data_cv;
223 bool m_new_data; // whether new data was received since last call to get_data
224
225 bool m_used_data; // whether get_data was successfully called at least once
226
227 protected:
228 std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
229 std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
230 mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
231
232 protected:
233 rclcpp::Time m_latest_message_time;
234 typename MessageType::ConstSharedPtr m_latest_message;
235 message_callback_t m_message_callback;
236
237 protected:
238 /* default_timeout_callback() method //{ */
239 void default_timeout_callback(const std::string& topic_name, const rclcpp::Time& last_msg)
240 {
241 const rclcpp::Duration since_msg = (m_node->get_clock()->now() - last_msg);
242 const auto n_pubs = m_sub->get_publisher_count();
243 const std::string txt = "Did not receive any message from topic '" + topic_name + "' for " + std::to_string(since_msg.seconds()) + "s ("
244 + std::to_string(n_pubs) + " publishers on this topic)";
245
246 RCLCPP_WARN_STREAM(m_node->get_logger(), txt);
247 }
248 //}
249
250 /* process_new_message() method //{ */
251 void process_new_message(const typename MessageType::ConstSharedPtr& msg)
252 {
253 m_latest_message_time = m_node->get_clock()->now();
254 m_latest_message = msg;
255 // If the message callback is registered, the new data will immediately be processed,
256 // so reset the flag. Otherwise, set the flag.
257 m_new_data = !m_message_callback;
258 m_got_data = true;
259 m_new_data_cv.notify_one();
260 }
261 //}
262
263 /* data_callback() method //{ */
264 virtual void data_callback(const typename MessageType::ConstSharedPtr& msg)
265 {
266 {
267 std::lock_guard lck(m_new_data_mtx);
268 if (m_timeout_manager && m_timeout_id.has_value())
269 m_timeout_manager->reset(m_timeout_id.value(), m_node->get_clock()->now());
270 process_new_message(msg);
271 }
272
273 // execute the callback after unlocking the mutex to enable multi-threaded callback execution
274 if (m_message_callback)
275 m_message_callback(msg);
276 }
277 //}
278 };
279 //}
280
281 /* SubscribeHandler_threadsafe class //{ */
282 template <typename MessageType>
283 class SubscriberHandler<MessageType>::ImplThreadsafe : public SubscriberHandler<MessageType>::Impl
284 {
285 private:
287
288 public:
289 using timeout_callback_t = typename impl_class_t::timeout_callback_t;
290 using message_callback_t = typename impl_class_t::message_callback_t;
291
292 friend class SubscriberHandler<MessageType>;
293
294 public:
295 ImplThreadsafe(const SubscriberHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
296 : impl_class_t::Impl(options, message_callback)
297 {
298 }
299
300 public:
301 virtual bool hasMsg() const override
302 {
303 std::lock_guard lck(m_mtx);
304 return impl_class_t::hasMsg();
305 }
306 virtual bool newMsg() const override
307 {
308 std::lock_guard lck(m_mtx);
309 return impl_class_t::newMsg();
310 }
311 virtual bool usedMsg() const override
312 {
313 std::lock_guard lck(m_mtx);
314 return impl_class_t::usedMsg();
315 }
316 virtual typename MessageType::ConstSharedPtr getMsg() override
317 {
318 std::lock_guard lck(m_mtx);
319 return impl_class_t::getMsg();
320 }
321 virtual typename MessageType::ConstSharedPtr peekMsg() const override
322 {
323 std::lock_guard lck(m_mtx);
324 return impl_class_t::peekMsg();
325 }
326 virtual rclcpp::Time lastMsgTime() const override
327 {
328 std::lock_guard lck(m_mtx);
329 return impl_class_t::lastMsgTime();
330 };
331 virtual std::string topicName() const override
332 {
333 std::lock_guard lck(m_mtx);
334 return impl_class_t::topicName();
335 };
336 virtual void start() override
337 {
338 std::lock_guard lck(m_mtx);
339 return impl_class_t::start();
340 }
341 virtual void stop() override
342 {
343 std::lock_guard lck(m_mtx);
344 return impl_class_t::stop();
345 }
346
347 virtual ~ImplThreadsafe() override = default;
348
349 protected:
350 virtual void data_callback(const typename MessageType::ConstSharedPtr& msg) override
351 {
352 {
353 std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
354
355 if (this->m_timeout_manager && this->m_timeout_id.has_value())
356 {
357 this->m_timeout_manager->reset(this->m_timeout_id.value(), this->m_node->get_clock()->now());
358 }
359
360 impl_class_t::process_new_message(msg);
361 }
362
363 // execute the callback after unlocking the mutex to enable multi-threaded callback execution
364 if (this->m_message_callback)
365 impl_class_t::m_message_callback(msg);
366 }
367
368 private:
369 mutable std::recursive_mutex m_mtx;
370 };
371 //}
372
373} // namespace mrs_lib
374
375#endif // SUBSCRIBER_HANDLER_HPP
Definition subscriber_handler.hpp:284
Definition subscriber_handler.hpp:21
The main class for ROS topic subscription, message timeout handling etc.
Definition subscriber_handler.h:92
virtual bool newMsg() const
Used to check whether at least one message has been received on the handled topic since the last call...
Definition subscriber_handler.h:150
virtual void stop()
Disables the callbacks for the handled topic.
Definition subscriber_handler.h:252
virtual MessageType::ConstSharedPtr getMsg()
Returns the last received message on the topic, handled by this SubscriberHandler....
Definition subscriber_handler.h:117
virtual uint32_t getNumPublishers() const
Returns number of publishers registered at the topic.
Definition subscriber_handler.h:217
virtual void setNoMessageTimeout(const rclcpp::Duration &timeout)
Sets the timeout for no received message.
Definition subscriber_handler.h:228
virtual std::string topicName() const
Returns the resolved (full) name of the topic, handled by this SubscriberHandler.
Definition subscriber_handler.h:195
virtual bool usedMsg() const
Used to check whether getMsg() was called at least once on this SubscriberHandler.
Definition subscriber_handler.h:161
virtual MessageType::ConstSharedPtr waitForNew(const rclcpp::Duration &timeout)
Blocks until new data becomes available or until the timeout runs out or until a spurious wake-up.
Definition subscriber_handler.h:173
virtual bool hasMsg() const
Used to check whether at least one message has been received on the handled topic.
Definition subscriber_handler.h:139
virtual void start()
Enables the callbacks for the handled topic.
Definition subscriber_handler.h:240
virtual MessageType::ConstSharedPtr peekMsg() const
Returns the last received message on the topic without modifying the newMsg() or usedMsg() flags.
Definition subscriber_handler.h:128
std::function< void(typename MessageType::ConstSharedPtr)> message_callback_t
Convenience type for the message callback function.
Definition subscriber_handler.h:107
std::function< void(const std::string &topic_name, const rclcpp::Time &last_msg)> timeout_callback_t
Type for the timeout callback function.
Definition subscriber_handler.h:102
virtual rclcpp::Time lastMsgTime() const
Returns time of the last received message on the topic, handled by this SubscriberHandler.
Definition subscriber_handler.h:184
All mrs_lib functions, classes, variables and definitions are contained in this namespace.
Definition attitude_converter.h:24
A helper class to simplify setup of SubscriberHandler construction. This class is passed to the Subsc...
Definition subscriber_handler.h:33
rclcpp::Node::SharedPtr node
The ROS NodeHandle to be used for subscription.
Definition subscriber_handler.h:39
std::string topic_name
Name of the ROS topic to be handled.
Definition subscriber_handler.h:43
std::shared_ptr< mrs_lib::TimeoutManager > timeout_manager
Will be used for handling message timouts if necessary. If no manager is specified,...
Definition subscriber_handler.h:45
std::function< void(const std::string &topic_name, const rclcpp::Time &last_msg)> timeout_callback
This function will be called if no new message is received for the no_message_timeout duration....
Definition subscriber_handler.h:52
std::string node_name
Name of the ROS node, using this handle (used for messages printed to console).
Definition subscriber_handler.h:41
rclcpp::Duration no_message_timeout
If no new message is received for this duration, the timeout_callback function will be called....
Definition subscriber_handler.h:48
Defines SubscriberHandler and related convenience classes for subscribing to ROS topics.