17#ifndef CYBER_SERVICE_SERVICE_H_
18#define CYBER_SERVICE_SERVICE_H_
41template <
typename Request,
typename Response>
45 std::shared_ptr<Response>&)>;
56 node_name_(node_name),
57 service_callback_(service_callback),
59 response_channel_(
service_name + SRV_CHANNEL_RES_SUFFIX) {}
71 node_name_(node_name),
72 service_callback_(service_callback),
74 response_channel_(
service_name + SRV_CHANNEL_RES_SUFFIX) {}
94 void HandleRequest(
const std::shared_ptr<Request>& request,
98 const std::shared_ptr<Response>& response);
// Returns true when request_receiver_ is non-null, false otherwise.
// request_receiver_ is assigned from transport->CreateReceiver<Request>(...)
// elsewhere in this file, so a null value means no request receiver is
// currently held. Does not modify any member (const).
100 bool IsInit(
void)
const {
return request_receiver_ !=
nullptr; }
102 std::string node_name_;
105 std::function<void(
const std::shared_ptr<Request>&,
106 const transport::MessageInfo&)>
108 std::shared_ptr<transport::Transmitter<Response>> response_transmitter_;
109 std::shared_ptr<transport::Receiver<Request>> request_receiver_;
110 std::string request_channel_;
111 std::string response_channel_;
112 std::mutex service_handle_request_mutex_;
// Flag consulted by the Process() wait predicate
// (`!inited_ || !tasks_.empty()`); initialised to false.
// NOTE(review): `volatile` provides neither atomicity nor inter-thread
// ordering in C++. If this flag is written by a thread other than the one
// running Process() without holding queue_mutex_, it should be
// std::atomic<bool> instead. The writer is not visible in this excerpt, so
// confirm before changing.
114 volatile bool inited_ =
false;
115 void Enqueue(std::function<
void()>&& task);
118 std::mutex queue_mutex_;
119 std::condition_variable condition_;
120 std::list<std::function<void()>> tasks_;
123template <
typename Request,
typename Response>
127 std::lock_guard<std::mutex> lg(queue_mutex_);
128 this->tasks_.clear();
130 condition_.notify_all();
131 if (thread_.joinable()) {
136template <
typename Request,
typename Response>
138 std::lock_guard<std::mutex> lg(queue_mutex_);
139 tasks_.emplace_back(std::move(task));
140 condition_.notify_one();
143template <
typename Request,
typename Response>
144void Service<Request, Response>::Process() {
146 std::unique_lock<std::mutex> ul(queue_mutex_);
147 condition_.wait(ul, [
this]() {
return !inited_ || !this->tasks_.empty(); });
151 if (!tasks_.empty()) {
152 auto task = tasks_.front();
160template <
typename Request,
typename Response>
166 role.set_node_name(node_name_);
167 role.set_channel_name(response_channel_);
169 role.set_channel_id(channel_id);
170 role.mutable_qos_profile()->CopyFrom(
172 auto transport = transport::Transport::Instance();
173 response_transmitter_ =
175 if (response_transmitter_ ==
nullptr) {
176 AERROR <<
" Create response pub failed.";
182 std::placeholders::_1, std::placeholders::_2);
184 role.set_channel_name(request_channel_);
186 role.set_channel_id(channel_id);
187 request_receiver_ = transport->CreateReceiver<Request>(
189 [=](
const std::shared_ptr<Request>& request,
193 auto task = [
this, request, message_info]() {
194 this->HandleRequest(request, message_info);
196 Enqueue(std::move(task));
201 if (request_receiver_ ==
nullptr) {
202 AERROR <<
" Create request sub failed." << request_channel_;
203 response_transmitter_.reset();
209template <
typename Request,
typename Response>
211 const std::shared_ptr<Request>& request,
217 ADEBUG <<
"handling request:" << request_channel_;
218 std::lock_guard<std::mutex> lk(service_handle_request_mutex_);
219 auto response = std::make_shared<Response>();
220 service_callback_(request, response);
221 transport::MessageInfo msg_info(message_info);
222 msg_info.set_sender_id(response_transmitter_->id());
223 SendResponse(msg_info, response);
226template <
typename Request,
typename Response>
227void Service<Request, Response>::SendResponse(
228 const transport::MessageInfo& message_info,
229 const std::shared_ptr<Response>& response) {
236 response_transmitter_->Transmit(response, message_info);
const std::string & service_name() const
Get the service name
Service handles a Request from the Client and sends a Response to it.
Service(const std::string &node_name, const std::string &service_name, const ServiceCallback &service_callback)
Construct a new Service object
bool Init()
Init the Service
Service(const std::string &node_name, const std::string &service_name, ServiceCallback &&service_callback)
Construct a new Service object
Service()=delete
Forbid default construction
void destroy()
Destroy the Service
std::function< void(const std::shared_ptr< Request > &, std::shared_ptr< Response > &)> ServiceCallback
static uint64_t RegisterChannel(const std::string &channel)
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT