mrs_lib
Various reusable classes, functions and utilities for use in MRS projects
Loading...
Searching...
No Matches
service_client_handler.hpp
Go to the documentation of this file.
1
6#pragma once
7
9
10namespace mrs_lib
11{
12
13 namespace internal
14 {
15
16 template <typename ServiceType>
17 requires(rosidl_generator_traits::is_service<ServiceType>::value)
18 class [[nodiscard("This service call is only performed when `co_await`ed.")]] ServiceAwaitable
19 {
20 using Client = rclcpp::Client<ServiceType>;
21 using Response = ServiceType::Response;
22 using Request = ServiceType::Request;
23
24 public:
25 ServiceAwaitable(const ServiceAwaitable&) = delete;
26 ServiceAwaitable& operator=(const ServiceAwaitable&) = delete;
28 ServiceAwaitable& operator=(ServiceAwaitable&&) = delete;
29
30 bool await_ready()
31 {
32 return false;
33 }
34
35 bool await_suspend(std::coroutine_handle<> continuation)
36 {
37 // Store the continuation into the awaitable.
38 // It will either be invoked when the service completes or destroyed if it is cancelled.
39 data_.continuation = continuation;
40 std::shared_ptr<Client> client = data_.client.lock();
41 if (client == nullptr || !client->service_is_ready())
42 {
43 data_.response = std::nullopt;
44 // Do not suspend
45 return false;
46 }
47 client->async_send_request(data_.request,
48 std::function([data_handle = DataHandle(data_)](std::shared_future<std::shared_ptr<Response>> future) mutable {
49 auto data = data_handle.leak();
50 data->response = future.get();
51 (*data).response = future.get();
52 auto continuation = std::exchange(data->continuation, nullptr);
53 internal::resume_coroutine(continuation);
54 }));
55 return true;
56 }
57
58 std::optional<std::shared_ptr<Response>> await_resume()
59 {
60 return data_.response;
61 }
62
63 private:
64 ServiceAwaitable(std::weak_ptr<Client> client, std::shared_ptr<Request> request)
65 : data_{
66 .client = std::move(client),
67 .request = std::move(request),
68 }
69 {
70 }
71
72 struct Data
73 {
74 std::weak_ptr<Client> client;
75 std::shared_ptr<Request> request;
76 std::optional<std::shared_ptr<Response>> response = std::nullopt;
77 std::coroutine_handle<> continuation = nullptr;
78 // Intrusive ref counting - used to destroy continuation in case of cancellation
79 std::atomic<size_t> ref_count = 0;
80 };
81
82 class DataHandle
83 {
84 public:
85 DataHandle(Data& data) : data_(&data)
86 {
87 size_t prev_ref_count = data_->ref_count.fetch_add(1);
88 // This constructor is called when awaiting the result.
89 // We do not allow multiple awaits, thus, this should always be zero.
90 assert(prev_ref_count == 0);
91 }
92
93 DataHandle(const DataHandle& other) : data_(other.data_)
94 {
95 data_->ref_count++;
96 }
97
98 DataHandle& operator=(const DataHandle& other)
99 {
100 decrement_ref_count_();
101 data_ = other.data_;
102 return *this;
103 }
104
105 DataHandle(DataHandle&& other) : data_(other.data_)
106 {
107 data_->ref_count++;
108 }
109
110 DataHandle& operator=(DataHandle&& other)
111 {
112 decrement_ref_count_();
113 data_ = other.data_;
114 return *this;
115 }
116
117 ~DataHandle()
118 {
119 decrement_ref_count_();
120 }
121
122 Data& operator*() const
123 {
124 assert(data_);
125 return *data_;
126 }
127
128 Data* operator->() const
129 {
130 assert(data_);
131 return data_;
132 }
133
134 Data* leak()
135 {
136 assert(data_);
137 return std::exchange(data_, nullptr);
138 }
139
140 private:
141 void decrement_ref_count_()
142 {
143 if (data_ == nullptr)
144 {
145 return;
146 }
147
148 size_t prev_ref_count = data_->ref_count.fetch_sub(1);
149 // This is the last instance - we need to destroy the continuation
150 if (prev_ref_count == 1)
151 {
152 // Copy coroutine handle into the current frame
153 // (`*data_` is stored in the coroutine it will be destroying)
154 auto continuation = data_->continuation;
155 // The continuation may be null if it was reset or not set at all
156 if (continuation != nullptr)
157 {
158 // Destroy the non finished coroutine
159 continuation.destroy();
160 }
161 }
162 }
163
164 Data* data_;
165 };
166
167 Data data_;
168
169 friend class ServiceClientHandler<ServiceType>;
170 };
171
172 } // namespace internal
173
174 // --------------------------------------------------------------
175 // | ServiceClientHandler |
176 // --------------------------------------------------------------
177
178 /* ServiceClientHandler() constructors //{ */
179
180 template <class ServiceType>
181 ServiceClientHandler<ServiceType>::ServiceClientHandler(rclcpp::Node::SharedPtr& node, const std::string& address, const rclcpp::QoS& qos)
182 : impl_(std::make_shared<Impl>(node, address, qos, node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive)))
183 {
184 }
185
186 template <class ServiceType>
190
191 template <class ServiceType>
192 ServiceClientHandler<ServiceType>::ServiceClientHandler(rclcpp::Node::SharedPtr& node, const std::string& address, const rclcpp::QoS& qos,
193 const rclcpp::CallbackGroup::SharedPtr& callback_group)
194 : impl_(std::make_shared<Impl>(node, address, qos, callback_group))
195 {
196 }
197
198 template <class ServiceType>
199 ServiceClientHandler<ServiceType>::ServiceClientHandler(rclcpp::Node::SharedPtr& node, const std::string& address,
200 const rclcpp::CallbackGroup::SharedPtr& callback_group)
201 : ServiceClientHandler(node, address, rclcpp::ServicesQoS(), callback_group)
202 {
203 }
204
205 //}
206
207 /* callSync(const ServiceType::Request& request, ServiceType::Response& response) //{ */
208
209 template <class ServiceType>
210 std::optional<std::shared_ptr<typename ServiceType::Response>>
211 ServiceClientHandler<ServiceType>::callSync(const std::shared_ptr<typename ServiceType::Request>& request)
212 {
213 if (!impl_)
214 {
215 RCLCPP_ERROR(rclcpp::get_logger("ServiceClientHandler"), "Not initialized, cannot use callSync()!");
216 return std::nullopt;
217 }
218 return impl_->callSync(request);
219 }
220
221 //}
222
223 /* callAsync(const ServiceType::Request& request, ServiceType::Response& response) //{ */
224
225 template <class ServiceType>
226 std::optional<std::shared_future<std::shared_ptr<typename ServiceType::Response>>>
227 ServiceClientHandler<ServiceType>::callAsync(const std::shared_ptr<typename ServiceType::Request>& request)
228 {
229 if (!impl_)
230 {
231 RCLCPP_ERROR(rclcpp::get_logger("ServiceClientHandler"), "Not initialized, cannot use callAsync()!");
232 return std::nullopt;
233 }
234 return impl_->callAsync(request);
235 }
236
237 //}
238
239 template <class ServiceType>
241 ServiceClientHandler<ServiceType>::callAwaitable(std::shared_ptr<typename ServiceType::Request> request)
242 {
243 if (!impl_)
244 {
245 RCLCPP_ERROR(rclcpp::get_logger("ServiceClientHandler"), "Not initialized, cannot use callAwaitable()!");
246 co_return std::nullopt;
247 }
248
249 co_return co_await impl_->callAwaitable(request);
250 }
251
252 /* getServiceName() //{ */
253
254 template <class ServiceType>
256 {
257 if (!impl_)
258 {
259 RCLCPP_ERROR(rclcpp::get_logger("ServiceClientHandler"), "Not initialized, cannot use getServiceName()!");
260 return {};
261 }
262 return impl_->getServiceName();
263 }
264
265 //}
266
267 /* waitForService() //{ */
268
269 template <class ServiceType>
270 template <typename RepT, typename RatioT>
271 bool ServiceClientHandler<ServiceType>::waitForService(std::chrono::duration<RepT, RatioT> timeout)
272 {
273 if (!impl_)
274 {
275 RCLCPP_ERROR(rclcpp::get_logger("ServiceClientHandler"), "Not initialized, cannot use waitForService()!");
276 return false;
277 }
278 return impl_->waitForService(timeout);
279 }
280
281 //}
282
283 /* isServiceReady() //{ */
284 template <class ServiceType>
286 {
287 if (!impl_)
288 {
289 RCLCPP_ERROR(rclcpp::get_logger("ServiceClientHandler"), "Not initialized, cannot use isServiceReady()!");
290 return false;
291 }
292 return impl_->isServiceReady();
293 }
294
295 //}
296
297 // --------------------------------------------------------------
298 // | ServiceClientHandler::Impl |
299 // --------------------------------------------------------------
300
301 /* class ServiceClientHandler::impl //{ */
302
306 template <class ServiceType>
307 class ServiceClientHandler<ServiceType>::Impl
308 {
309
310 public:
319 Impl(rclcpp::Node::SharedPtr& node, const std::string& address, const rclcpp::QoS& qos, const rclcpp::CallbackGroup::SharedPtr& callback_group)
320 : callback_group_(callback_group), service_client_(node->create_client<ServiceType>(address, qos, callback_group))
321 {
322 RCLCPP_INFO_STREAM(node->get_logger(), "Created client '" << address << "' -> '" << service_client_->get_service_name() << "'");
323 }
324
332 std::optional<std::shared_ptr<typename ServiceType::Response>> callSync(const std::shared_ptr<typename ServiceType::Request>& request)
333 {
334 /* always check if the service is ready before calling */
335 if (!service_client_->service_is_ready())
336 return std::nullopt;
337
338 /* the future done callback is being run in a separate thread under the default callback group of the node */
339 const auto future_msg = service_client_->async_send_request(request).future.share();
340
341 /* it is a good practice to check if the future object is not already invalid after the call */
342 /* if valid() is false, the future has UNDEFINED behavior */
343 if (!future_msg.valid())
344 return std::nullopt;
345
346 // wait for the future to become available and then return
347 return future_msg.get();
348 }
349
357 std::optional<std::shared_future<std::shared_ptr<typename ServiceType::Response>>> callAsync(const std::shared_ptr<typename ServiceType::Request>& request)
358 {
359 /* always check if the service is ready before calling */
360 if (!service_client_->service_is_ready())
361 return std::nullopt;
362
363 const auto future = service_client_->async_send_request(request).future.share();
364
365 /* it is a good practice to check if the future object is not already invalid after the call */
366 /* if valid() is false, the future has UNDEFINED behavior */
367 if (!future.valid())
368 return std::nullopt;
369
370 return future;
371 }
372
373 internal::ServiceAwaitable<ServiceType> callAwaitable(const std::shared_ptr<typename ServiceType::Request>& request)
374 {
375 return internal::ServiceAwaitable<ServiceType>(service_client_, request);
376 }
377
383 std::string getServiceName() const
384 {
385 return service_client_->get_service_name();
386 }
387
395 template <typename RepT = int64_t, typename RatioT = std::milli>
396 bool waitForService(std::chrono::duration<RepT, RatioT> timeout)
397 {
398 return service_client_->wait_for_service(timeout);
399 }
400
406 bool isServiceReady() const
407 {
408 return service_client_->service_is_ready();
409 }
410
411 private:
412 rclcpp::CallbackGroup::SharedPtr callback_group_;
413 typename rclcpp::Client<ServiceType>::SharedPtr service_client_;
414 };
415
416 //}
417
418} // namespace mrs_lib
implementation of the service client handler
Definition service_client_handler.hpp:308
std::optional< std::shared_future< std::shared_ptr< typename ServiceType::Response > > > callAsync(const std::shared_ptr< typename ServiceType::Request > &request)
asynchronous service call
Definition service_client_handler.hpp:357
std::optional< std::shared_ptr< typename ServiceType::Response > > callSync(const std::shared_ptr< typename ServiceType::Request > &request)
"classic" synchronous service call
Definition service_client_handler.hpp:332
Impl(rclcpp::Node::SharedPtr &node, const std::string &address, const rclcpp::QoS &qos, const rclcpp::CallbackGroup::SharedPtr &callback_group)
constructor
Definition service_client_handler.hpp:319
std::string getServiceName() const
Returns the name of the service this client connects to.
Definition service_client_handler.hpp:383
bool waitForService(std::chrono::duration< RepT, RatioT > timeout)
Waits for the service to be available.
Definition service_client_handler.hpp:396
bool isServiceReady() const
Checks if the service is available.
Definition service_client_handler.hpp:406
user wrapper of the service client handler implementation
Definition service_client_handler.h:35
ServiceClientHandler()
Default constructor to avoid having to use pointers.
Definition service_client_handler.hpp:187
std::optional< std::shared_ptr< typename ServiceType::Response > > callSync(const std::shared_ptr< typename ServiceType::Request > &request)
Synchronous (blocking) call of the service.
Definition service_client_handler.hpp:211
std::string getServiceName() const
Returns the name of the service this client connects to.
Definition service_client_handler.hpp:255
bool waitForService(std::chrono::duration< RepT, RatioT > timeout=std::chrono::seconds(1))
Waits for the service to be available.
Definition service_client_handler.hpp:271
std::optional< std::shared_future< std::shared_ptr< typename ServiceType::Response > > > callAsync(const std::shared_ptr< typename ServiceType::Request > &request)
Asynchronous (non-blocking) call of the service.
Definition service_client_handler.hpp:227
bool isServiceReady() const
Checks if the service is available.
Definition service_client_handler.hpp:285
Task< std::optional< std::shared_ptr< typename ServiceType::Response > > > callAwaitable(std::shared_ptr< typename ServiceType::Request > request)
Awaitable call of the service.
Definition service_client_handler.hpp:241
Task type for creating coroutines.
Definition task.hpp:313
Definition service_client_handler.hpp:19
All mrs_lib functions, classes, variables and definitions are contained in this namespace.
Definition attitude_converter.h:24
Defines ServiceClientHandler and related convenience classes for upgrading the ROS service client.