mrs_lib
Various reusable classes, functions and utilities for use in MRS projects
Loading...
Searching...
No Matches
subscriber_handler.hpp
1// clang: MatousFormat
2
3#ifndef SUBSCRIBER_HANDLER_HPP
4#define SUBSCRIBER_HANDLER_HPP
5
6#ifndef SUBSCRIBER_HANDLER_H
8#endif
9
11#include <mrs_lib/timer_handler.h>
12#include <mutex>
13#include <condition_variable>
14
15namespace mrs_lib
16{
17 /* SubscriberHandler::Impl class //{ */
18 // implements the constructor, getMsg() method and data_callback method (non-thread-safe)
19 template <typename MessageType>
20 class SubscriberHandler<MessageType>::Impl
21 {
22 public:
23 using timeout_callback_t = typename SubscriberHandler<MessageType>::timeout_callback_t;
24 using message_callback_t = typename SubscriberHandler<MessageType>::message_callback_t;
25 using data_callback_t = std::function<void(const typename MessageType::ConstSharedPtr&)>;
26
27 private:
28 friend class SubscriberHandler<MessageType>;
29
30 public:
31 /* constructor //{ */
32 Impl(const SubscriberHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
33 : m_node(options.node), m_qos(options.qos), m_sub_opts(options.subscription_options), m_topic_name(options.topic_name), m_node_name(options.node_name),
34 m_got_data(false), m_new_data(false), m_used_data(false), m_timeout_manager(options.timeout_manager), m_latest_message_time(0),
35 m_latest_message(nullptr), m_message_callback(message_callback)
36 {
37
38 // initialize the callback for the TimeoutManager
39 if (options.timeout_callback)
40 {
41 m_timeout_mgr_callback = std::bind(options.timeout_callback, m_topic_name, std::placeholders::_1);
42 } else
43 {
44 m_timeout_mgr_callback = std::bind(&Impl::default_timeout_callback, this, m_topic_name, std::placeholders::_1);
45 }
46
47 if (options.no_message_timeout != mrs_lib::no_timeout)
48 {
49
50 // initialize a new TimeoutManager if not provided by the user
51 if (!m_timeout_manager)
52 {
53 m_timeout_manager =
54 std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (options.no_message_timeout.seconds() * 0.5), m_node->get_clock()));
55 }
56
57 // register the timeout callback with the TimeoutManager
58 m_timeout_id = m_timeout_manager->registerNew(options.no_message_timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
59 }
60
61 const std::string msg = "Subscribed to topic '" + m_topic_name + "' -> '" + m_topic_name + "'";
62
63 RCLCPP_INFO_STREAM(m_node->get_logger(), msg);
64 }
65 //}
66
67 virtual ~Impl() = default;
68
69 public:
70 /* getMsg() method //{ */
71 virtual typename MessageType::ConstSharedPtr getMsg()
72 {
73 m_new_data = false;
74 m_used_data = true;
75 return peekMsg();
76 }
77 //}
78
79 /* peekMsg() method //{ */
80 virtual typename MessageType::ConstSharedPtr peekMsg() const
81 {
82 /* assert(m_got_data); */
83 /* if (!m_got_data) */
84 /* ROS_ERROR("[%s]: No data received yet from topic '%s' (forgot to check hasMsg()?)! Returning empty message.", m_node_name.c_str(), */
85 /* topicName().c_str()); */
86 return m_latest_message;
87 }
88 //}
89
90 /* hasMsg() method //{ */
91 virtual bool hasMsg() const
92 {
93 return m_got_data;
94 }
95 //}
96
97 /* newMsg() method //{ */
98 virtual bool newMsg() const
99 {
100 return m_new_data;
101 }
102 //}
103
104 /* usedMsg() method //{ */
105 virtual bool usedMsg() const
106 {
107 return m_used_data;
108 }
109 //}
110
111 /* waitForNew() method //{ */
112 virtual typename MessageType::ConstSharedPtr waitForNew(const rclcpp::Duration& timeout)
113 {
114 // convert the ros type to chrono type
115 const std::chrono::duration<float> chrono_timeout(timeout.seconds());
116 // lock the mutex guarding the m_new_data flag
117 std::unique_lock lock(m_new_data_mtx);
118 // if new data is available, return immediately, otherwise attempt to wait for new data using the respective condition variable
119 if (m_new_data)
120 return getMsg();
121 else if (m_new_data_cv.wait_for(lock, chrono_timeout) == std::cv_status::no_timeout && m_new_data)
122 return getMsg();
123 else
124 return nullptr;
125 };
126 //}
127
128 /* lastMsgTime() method //{ */
129 virtual rclcpp::Time lastMsgTime() const
130 {
131 return m_latest_message_time;
132 };
133 //}
134
135 /* topicName() method //{ */
136 virtual std::string topicName() const
137 {
138 if (m_sub == nullptr)
139 {
140 return "";
141 }
142
143 std::string ret = m_sub->get_topic_name();
144
145 if (ret.empty())
146 {
147 ret = m_node->get_node_topics_interface()->resolve_topic_name(ret);
148 }
149
150 return ret;
151 }
152 //}
153
154 /* getNumPublishers() method //{ */
155 virtual uint32_t getNumPublishers() const
156 {
157 return m_sub->get_publisher_count();
158 };
159 //}
160
161 /* setNoMessageTimeout() method //{ */
162 virtual void setNoMessageTimeout(const rclcpp::Duration& timeout)
163 {
164 if (timeout == mrs_lib::no_timeout)
165 {
166 // if there is a timeout callback already registered but the user wants to disable it, pause it
167 if (m_timeout_manager != nullptr && m_timeout_id.has_value())
168 m_timeout_manager->pause(m_timeout_id.value());
169 // otherwise, there is no callback, so nothing to do
170 } else
171 {
172 // if there is no callback manager, create it
173 if (m_timeout_manager == nullptr)
174 m_timeout_manager = std::make_shared<mrs_lib::TimeoutManager>(m_node, rclcpp::Rate(1.0 / (timeout.seconds() * 0.5), m_node->get_clock()));
175
176 // if there is an existing timeout callback registered, change its timeout
177 if (m_timeout_id.has_value())
178 m_timeout_manager->change(m_timeout_id.value(), timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
179
180 // otherwise, register it
181 else
182 m_timeout_id = m_timeout_manager->registerNew(timeout, m_timeout_mgr_callback, m_node->get_clock()->now());
183 }
184 }
185 //}
186
187 /* start() method //{ */
188 virtual void start()
189 {
190 if (m_timeout_manager && m_timeout_id.has_value())
191 m_timeout_manager->start(m_timeout_id.value(), m_node->get_clock()->now());
192
193 const std::function<void(const typename MessageType::SharedPtr)> cbk = std::bind(&Impl::data_callback, this, std::placeholders::_1);
194
195 m_sub = m_node->create_subscription<MessageType>(m_topic_name, m_qos, cbk, m_sub_opts);
196 }
197 //}
198
199 /* stop() method //{ */
200 virtual void stop()
201 {
202 if (m_timeout_manager && m_timeout_id.has_value())
203 {
204 m_timeout_manager->pause(m_timeout_id.value());
205 }
206
207 m_sub.reset();
208 }
209 //}
210
211 protected:
212 rclcpp::Node::SharedPtr m_node;
213 rclcpp::Subscription<MessageType>::SharedPtr m_sub;
214 rclcpp::QoS m_qos = rclcpp::SystemDefaultsQoS();
215 rclcpp::SubscriptionOptions m_sub_opts;
216
217 protected:
218 std::string m_topic_name;
219 std::string m_node_name;
220
221 protected:
222 bool m_got_data; // whether any data was received
223
224 mutable std::mutex m_new_data_mtx;
225 mutable std::condition_variable m_new_data_cv;
226 bool m_new_data; // whether new data was received since last call to get_data
227
228 bool m_used_data; // whether get_data was successfully called at least once
229
230 protected:
231 std::shared_ptr<mrs_lib::TimeoutManager> m_timeout_manager;
232 std::optional<mrs_lib::TimeoutManager::timeout_id_t> m_timeout_id;
233 mrs_lib::TimeoutManager::callback_t m_timeout_mgr_callback;
234
235 protected:
236 rclcpp::Time m_latest_message_time;
237 typename MessageType::ConstSharedPtr m_latest_message;
238 message_callback_t m_message_callback;
239
240 protected:
241 /* default_timeout_callback() method //{ */
242 void default_timeout_callback(const std::string& topic_name, const rclcpp::Time& last_msg)
243 {
244 const rclcpp::Duration since_msg = (m_node->get_clock()->now() - last_msg);
245 const auto n_pubs = m_sub->get_publisher_count();
246 const std::string txt = "Did not receive any message from topic '" + topic_name + "' for " + std::to_string(since_msg.seconds()) + "s ("
247 + std::to_string(n_pubs) + " publishers on this topic)";
248
249 RCLCPP_WARN_STREAM(m_node->get_logger(), txt);
250 }
251 //}
252
253 /* process_new_message() method //{ */
254 void process_new_message(const typename MessageType::ConstSharedPtr& msg)
255 {
256 m_latest_message_time = m_node->get_clock()->now();
257 m_latest_message = msg;
258 // If the message callback is registered, the new data will immediately be processed,
259 // so reset the flag. Otherwise, set the flag.
260 m_new_data = !m_message_callback;
261 m_got_data = true;
262 m_new_data_cv.notify_one();
263 }
264 //}
265
266 /* data_callback() method //{ */
267 virtual void data_callback(const typename MessageType::ConstSharedPtr& msg)
268 {
269 {
270 std::lock_guard lck(m_new_data_mtx);
271 if (m_timeout_manager && m_timeout_id.has_value())
272 m_timeout_manager->reset(m_timeout_id.value(), m_node->get_clock()->now());
273 process_new_message(msg);
274 }
275
276 // execute the callback after unlocking the mutex to enable multi-threaded callback execution
277 if (m_message_callback)
278 m_message_callback(msg);
279 }
280 //}
281 };
282 //}
283
284 /* SubscribeHandler_threadsafe class //{ */
285 template <typename MessageType>
286 class SubscriberHandler<MessageType>::ImplThreadsafe : public SubscriberHandler<MessageType>::Impl
287 {
288 private:
290
291 public:
292 using timeout_callback_t = typename impl_class_t::timeout_callback_t;
293 using message_callback_t = typename impl_class_t::message_callback_t;
294
295 friend class SubscriberHandler<MessageType>;
296
297 public:
298 ImplThreadsafe(const SubscriberHandlerOptions& options, const message_callback_t& message_callback = message_callback_t())
299 : impl_class_t::Impl(options, message_callback)
300 {
301 }
302
303 public:
304 virtual bool hasMsg() const override
305 {
306 std::lock_guard lck(m_mtx);
307 return impl_class_t::hasMsg();
308 }
309 virtual bool newMsg() const override
310 {
311 std::lock_guard lck(m_mtx);
312 return impl_class_t::newMsg();
313 }
314 virtual bool usedMsg() const override
315 {
316 std::lock_guard lck(m_mtx);
317 return impl_class_t::usedMsg();
318 }
319 virtual typename MessageType::ConstSharedPtr getMsg() override
320 {
321 std::lock_guard lck(m_mtx);
322 return impl_class_t::getMsg();
323 }
324 virtual typename MessageType::ConstSharedPtr peekMsg() const override
325 {
326 std::lock_guard lck(m_mtx);
327 return impl_class_t::peekMsg();
328 }
329 virtual rclcpp::Time lastMsgTime() const override
330 {
331 std::lock_guard lck(m_mtx);
332 return impl_class_t::lastMsgTime();
333 };
334 virtual std::string topicName() const override
335 {
336 std::lock_guard lck(m_mtx);
337 return impl_class_t::topicName();
338 };
339 virtual void start() override
340 {
341 std::lock_guard lck(m_mtx);
342 return impl_class_t::start();
343 }
344 virtual void stop() override
345 {
346 std::lock_guard lck(m_mtx);
347 return impl_class_t::stop();
348 }
349
350 virtual ~ImplThreadsafe() override = default;
351
352 protected:
353 virtual void data_callback(const typename MessageType::ConstSharedPtr& msg) override
354 {
355 {
356 std::scoped_lock lck(m_mtx, this->m_new_data_mtx);
357
358 if (this->m_timeout_manager && this->m_timeout_id.has_value())
359 {
360 this->m_timeout_manager->reset(this->m_timeout_id.value(), this->m_node->get_clock()->now());
361 }
362
363 impl_class_t::process_new_message(msg);
364 }
365
366 // execute the callback after unlocking the mutex to enable multi-threaded callback execution
367 if (this->m_message_callback)
368 impl_class_t::m_message_callback(msg);
369 }
370
371 private:
372 mutable std::recursive_mutex m_mtx;
373 };
374 //}
375
376} // namespace mrs_lib
377
378#endif // SUBSCRIBER_HANDLER_HPP
Definition subscriber_handler.hpp:287
Definition subscriber_handler.hpp:21
The main class for ROS topic subscription, message timeout handling etc.
Definition subscriber_handler.h:92
virtual bool newMsg() const
Used to check whether at least one message has been received on the handled topic since the last call...
Definition subscriber_handler.h:150
virtual void stop()
Disables the callbacks for the handled topic.
Definition subscriber_handler.h:252
virtual MessageType::ConstSharedPtr getMsg()
Returns the last received message on the topic, handled by this SubscriberHandler....
Definition subscriber_handler.h:117
virtual uint32_t getNumPublishers() const
Returns number of publishers registered at the topic.
Definition subscriber_handler.h:217
virtual void setNoMessageTimeout(const rclcpp::Duration &timeout)
Sets the timeout for no received message.
Definition subscriber_handler.h:228
virtual std::string topicName() const
Returns the resolved (full) name of the topic, handled by this SubscriberHandler.
Definition subscriber_handler.h:195
virtual bool usedMsg() const
Used to check whether getMsg() was called at least once on this SubscriberHandler.
Definition subscriber_handler.h:161
virtual MessageType::ConstSharedPtr waitForNew(const rclcpp::Duration &timeout)
Blocks until new data becomes available or until the timeout runs out or until a spurious wake-up.
Definition subscriber_handler.h:173
virtual bool hasMsg() const
Used to check whether at least one message has been received on the handled topic.
Definition subscriber_handler.h:139
virtual void start()
Enables the callbacks for the handled topic.
Definition subscriber_handler.h:240
virtual MessageType::ConstSharedPtr peekMsg() const
Returns the last received message on the topic without modifying the newMsg() or usedMsg() flags.
Definition subscriber_handler.h:128
std::function< void(typename MessageType::ConstSharedPtr)> message_callback_t
Convenience type for the message callback function.
Definition subscriber_handler.h:107
std::function< void(const std::string &topic_name, const rclcpp::Time &last_msg)> timeout_callback_t
Type for the timeout callback function.
Definition subscriber_handler.h:102
virtual rclcpp::Time lastMsgTime() const
Returns time of the last received message on the topic, handled by this SubscriberHandler.
Definition subscriber_handler.h:184
All mrs_lib functions, classes, variables and definitions are contained in this namespace.
Definition attitude_converter.h:24
A helper class to simplify setup of SubscriberHandler construction. This class is passed to the Subsc...
Definition subscriber_handler.h:33
rclcpp::Node::SharedPtr node
The ROS NodeHandle to be used for subscription.
Definition subscriber_handler.h:39
std::string topic_name
Name of the ROS topic to be handled.
Definition subscriber_handler.h:43
std::shared_ptr< mrs_lib::TimeoutManager > timeout_manager
Will be used for handling message timouts if necessary. If no manager is specified,...
Definition subscriber_handler.h:45
std::function< void(const std::string &topic_name, const rclcpp::Time &last_msg)> timeout_callback
This function will be called if no new message is received for the no_message_timeout duration....
Definition subscriber_handler.h:52
std::string node_name
Name of the ROS node, using this handle (used for messages printed to console).
Definition subscriber_handler.h:41
rclcpp::Duration no_message_timeout
If no new message is received for this duration, the timeout_callback function will be called....
Definition subscriber_handler.h:48
Defines SubscriberHandler and related convenience classes for subscribing to ROS topics.