17#ifndef CYBER_SERVICE_CLIENT_H_
18#define CYBER_SERVICE_CLIENT_H_
26#include <unordered_map>
46template <
typename Request,
typename Response>
51 using Promise = std::promise<SharedResponse>;
62 Client(
const std::string& node_name,
const std::string& service_name)
64 node_name_(node_name),
65 request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
66 response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX),
67 sequence_number_(0) {}
93 const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
103 const Request& request,
104 const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
144 template <
typename RatioT = std::milli>
146 std::chrono::duration<int64_t, RatioT>(-1)) {
148 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout));
152 void HandleResponse(
const std::shared_ptr<Response>& response,
155 bool IsInit(
void)
const {
return response_receiver_ !=
nullptr; }
157 std::string node_name_;
159 std::function<void(
const std::shared_ptr<Response>&,
160 const transport::MessageInfo&)>
163 std::unordered_map<uint64_t,
164 std::tuple<SharedPromise, CallbackType, SharedFuture>>
166 std::mutex pending_requests_mutex_;
168 std::shared_ptr<transport::Transmitter<Request>> request_transmitter_;
169 std::shared_ptr<transport::Receiver<Response>> response_receiver_;
170 std::string request_channel_;
171 std::string response_channel_;
173 transport::Identity writer_id_;
174 uint64_t sequence_number_;
177template <
typename Request,
typename Response>
180template <
typename Request,
typename Response>
183 role.set_node_name(node_name_);
184 role.set_channel_name(request_channel_);
186 role.set_channel_id(channel_id);
187 role.mutable_qos_profile()->CopyFrom(
189 auto transport = transport::Transport::Instance();
190 request_transmitter_ =
192 if (request_transmitter_ ==
nullptr) {
193 AERROR <<
"Create request pub failed.";
196 writer_id_ = request_transmitter_->id();
200 std::placeholders::_1, std::placeholders::_2);
202 role.set_channel_name(response_channel_);
204 role.set_channel_id(channel_id);
205 response_receiver_ = transport->CreateReceiver<Response>(
207 [=](
const std::shared_ptr<Response>& response,
212 response_callback_(response, message_info);
215 if (response_receiver_ ==
nullptr) {
216 AERROR <<
"Create response sub failed.";
217 request_transmitter_.reset();
223template <
typename Request,
typename Response>
226 const std::chrono::seconds& timeout_s) {
230 auto future = AsyncSendRequest(request);
231 if (!future.valid()) {
234 auto status = future.wait_for(timeout_s);
235 if (status == std::future_status::ready) {
242template <
typename Request,
typename Response>
245 const std::chrono::seconds& timeout_s) {
249 auto request_ptr = std::make_shared<const Request>(request);
250 return SendRequest(request_ptr, timeout_s);
253template <
typename Request,
typename Response>
256 auto request_ptr = std::make_shared<const Request>(request);
257 return AsyncSendRequest(request_ptr);
260template <
typename Request,
typename Response>
266template <
typename Request,
typename Response>
271 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
274 request_transmitter_->Transmit(request, info);
277 pending_requests_[info.
seq_num()] =
278 std::make_tuple(call_promise, std::forward<CallbackType>(cb),
f);
281 return std::shared_future<std::shared_ptr<Response>>();
285template <
typename Request,
typename Response>
290template <
typename Request,
typename Response>
292 const std::shared_ptr<Response>& response,
294 ADEBUG <<
"client recv response.";
295 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
296 if (request_header.
spare_id() != writer_id_) {
299 uint64_t sequence_number = request_header.
seq_num();
300 if (this->pending_requests_.count(sequence_number) == 0) {
303 auto tuple = this->pending_requests_[sequence_number];
304 auto call_promise = std::get<0>(tuple);
305 auto callback = std::get<1>(tuple);
306 auto future = std::get<2>(tuple);
307 this->pending_requests_.erase(sequence_number);
308 call_promise->set_value(response);
bool WaitForServiceNanoseconds(std::chrono::nanoseconds time_out)
Client get Response from a responding Service by sending a Request
void Destroy()
destroy this Client
std::promise< SharedResponse > Promise
Client(const std::string &node_name, const std::string &service_name)
Construct a new Client object
bool Init()
Init the Client
bool ServiceIsReady() const
Is the Service is ready?
bool WaitForService(std::chrono::duration< int64_t, RatioT > timeout=std::chrono::duration< int64_t, RatioT >(-1))
wait for the connection with the Service established
Client()=delete
forbid Constructing a new Client object with empty params
SharedFuture AsyncSendRequest(SharedRequest request)
Send Request shared ptr asynchronously
std::function< void(SharedFuture)> CallbackType
typename std::shared_ptr< Request > SharedRequest
std::shared_future< SharedResponse > SharedFuture
SharedResponse SendRequest(SharedRequest request, const std::chrono::seconds &timeout_s=std::chrono::seconds(5))
Request the Service with a shared ptr Request type
std::shared_ptr< Promise > SharedPromise
typename std::shared_ptr< Response > SharedResponse
static uint64_t RegisterChannel(const std::string &channel)
const Identity & spare_id() const
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT