Apollo 10.0
自动驾驶开放平台
client.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_SERVICE_CLIENT_H_
18#define CYBER_SERVICE_CLIENT_H_
19
20#include <future>
21#include <map>
22#include <memory>
23#include <sstream>
24#include <string>
25#include <tuple>
26#include <unordered_map>
27#include <utility>
28
29#include "cyber/common/log.h"
30#include "cyber/common/types.h"
33
34namespace apollo {
35namespace cyber {
36
46template <typename Request, typename Response>
47class Client : public ClientBase {
48 public:
49 using SharedRequest = typename std::shared_ptr<Request>;
50 using SharedResponse = typename std::shared_ptr<Response>;
51 using Promise = std::promise<SharedResponse>;
52 using SharedPromise = std::shared_ptr<Promise>;
53 using SharedFuture = std::shared_future<SharedResponse>;
54 using CallbackType = std::function<void(SharedFuture)>;
55
62 Client(const std::string& node_name, const std::string& service_name)
63 : ClientBase(service_name),
64 node_name_(node_name),
65 request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
66 response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX),
67 sequence_number_(0) {}
68
72 Client() = delete;
73
74 virtual ~Client() {}
75
82 bool Init();
83
92 SharedRequest request,
93 const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
94
103 const Request& request,
104 const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
105
110
114 SharedFuture AsyncSendRequest(const Request& request);
115
125
129 bool ServiceIsReady() const;
130
134 void Destroy();
135
144 template <typename RatioT = std::milli>
145 bool WaitForService(std::chrono::duration<int64_t, RatioT> timeout =
146 std::chrono::duration<int64_t, RatioT>(-1)) {
148 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
149 }
150
151 private:
152 void HandleResponse(const std::shared_ptr<Response>& response,
153 const transport::MessageInfo& request_info);
154
155 bool IsInit(void) const { return response_receiver_ != nullptr; }
156
157 std::string node_name_;
158
159 std::function<void(const std::shared_ptr<Response>&,
160 const transport::MessageInfo&)>
161 response_callback_;
162
163 std::unordered_map<uint64_t,
164 std::tuple<SharedPromise, CallbackType, SharedFuture>>
165 pending_requests_;
166 std::mutex pending_requests_mutex_;
167
168 std::shared_ptr<transport::Transmitter<Request>> request_transmitter_;
169 std::shared_ptr<transport::Receiver<Response>> response_receiver_;
170 std::string request_channel_;
171 std::string response_channel_;
172
173 transport::Identity writer_id_;
174 uint64_t sequence_number_;
175};
176
177template <typename Request, typename Response>
179
180template <typename Request, typename Response>
183 role.set_node_name(node_name_);
184 role.set_channel_name(request_channel_);
185 auto channel_id = common::GlobalData::RegisterChannel(request_channel_);
186 role.set_channel_id(channel_id);
187 role.mutable_qos_profile()->CopyFrom(
189 auto transport = transport::Transport::Instance();
190 request_transmitter_ =
191 transport->CreateTransmitter<Request>(role, proto::OptionalMode::RTPS);
192 if (request_transmitter_ == nullptr) {
193 AERROR << "Create request pub failed.";
194 return false;
195 }
196 writer_id_ = request_transmitter_->id();
197
198 response_callback_ =
200 std::placeholders::_1, std::placeholders::_2);
201
202 role.set_channel_name(response_channel_);
203 channel_id = common::GlobalData::RegisterChannel(response_channel_);
204 role.set_channel_id(channel_id);
205 response_receiver_ = transport->CreateReceiver<Response>(
206 role,
207 [=](const std::shared_ptr<Response>& response,
208 const transport::MessageInfo& message_info,
209 const proto::RoleAttributes& reader_attr) {
210 (void)message_info;
211 (void)reader_attr;
212 response_callback_(response, message_info);
213 },
215 if (response_receiver_ == nullptr) {
216 AERROR << "Create response sub failed.";
217 request_transmitter_.reset();
218 return false;
219 }
220 return true;
221}
222
223template <typename Request, typename Response>
226 const std::chrono::seconds& timeout_s) {
227 if (!IsInit()) {
228 return nullptr;
229 }
230 auto future = AsyncSendRequest(request);
231 if (!future.valid()) {
232 return nullptr;
233 }
234 auto status = future.wait_for(timeout_s);
235 if (status == std::future_status::ready) {
236 return future.get();
237 } else {
238 return nullptr;
239 }
240}
241
242template <typename Request, typename Response>
245 const std::chrono::seconds& timeout_s) {
246 if (!IsInit()) {
247 return nullptr;
248 }
249 auto request_ptr = std::make_shared<const Request>(request);
250 return SendRequest(request_ptr, timeout_s);
251}
252
253template <typename Request, typename Response>
256 auto request_ptr = std::make_shared<const Request>(request);
257 return AsyncSendRequest(request_ptr);
258}
259
260template <typename Request, typename Response>
263 return AsyncSendRequest(request, [](SharedFuture) {});
264}
265
266template <typename Request, typename Response>
269 CallbackType&& cb) {
270 if (IsInit()) {
271 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
272 sequence_number_++;
273 transport::MessageInfo info(writer_id_, sequence_number_, writer_id_);
274 request_transmitter_->Transmit(request, info);
275 SharedPromise call_promise = std::make_shared<Promise>();
276 SharedFuture f(call_promise->get_future());
277 pending_requests_[info.seq_num()] =
278 std::make_tuple(call_promise, std::forward<CallbackType>(cb), f);
279 return f;
280 } else {
281 return std::shared_future<std::shared_ptr<Response>>();
282 }
283}
284
285template <typename Request, typename Response>
287 return true;
288}
289
290template <typename Request, typename Response>
292 const std::shared_ptr<Response>& response,
293 const transport::MessageInfo& request_header) {
294 ADEBUG << "client recv response.";
295 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
296 if (request_header.spare_id() != writer_id_) {
297 return;
298 }
299 uint64_t sequence_number = request_header.seq_num();
300 if (this->pending_requests_.count(sequence_number) == 0) {
301 return;
302 }
303 auto tuple = this->pending_requests_[sequence_number];
304 auto call_promise = std::get<0>(tuple);
305 auto callback = std::get<1>(tuple);
306 auto future = std::get<2>(tuple);
307 this->pending_requests_.erase(sequence_number);
308 call_promise->set_value(response);
309 callback(future);
310}
311
312} // namespace cyber
313} // namespace apollo
314
315#endif // CYBER_SERVICE_CLIENT_H_
double f
Base class of Client
Definition client_base.h:33
bool WaitForServiceNanoseconds(std::chrono::nanoseconds time_out)
Definition client_base.h:62
Client get Response from a responding Service by sending a Request
Definition client.h:47
void Destroy()
destroy this Client
Definition client.h:178
std::promise< SharedResponse > Promise
Definition client.h:51
Client(const std::string &node_name, const std::string &service_name)
Construct a new Client object
Definition client.h:62
bool Init()
Init the Client
Definition client.h:181
bool ServiceIsReady() const
Is the Service is ready?
Definition client.h:286
bool WaitForService(std::chrono::duration< int64_t, RatioT > timeout=std::chrono::duration< int64_t, RatioT >(-1))
wait for the connection with the Service established
Definition client.h:145
virtual ~Client()
Definition client.h:74
Client()=delete
forbid Constructing a new Client object with empty params
SharedFuture AsyncSendRequest(SharedRequest request)
Send Request shared ptr asynchronously
Definition client.h:262
std::function< void(SharedFuture)> CallbackType
Definition client.h:54
typename std::shared_ptr< Request > SharedRequest
Definition client.h:49
std::shared_future< SharedResponse > SharedFuture
Definition client.h:53
SharedResponse SendRequest(SharedRequest request, const std::chrono::seconds &timeout_s=std::chrono::seconds(5))
Request the Service with a shared ptr Request type
Definition client.h:225
std::shared_ptr< Promise > SharedPromise
Definition client.h:52
typename std::shared_ptr< Response > SharedResponse
Definition client.h:50
static uint64_t RegisterChannel(const std::string &channel)
const Identity & spare_id() const
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
class register implement
Definition arena_queue.h:37