Apollo 10.0
自动驾驶开放平台
service.h
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#ifndef CYBER_SERVICE_SERVICE_H_
18#define CYBER_SERVICE_SERVICE_H_
19
20#include <list>
21#include <memory>
22#include <string>
23#include <utility>
24
25#include "cyber/common/types.h"
29
30namespace apollo {
31namespace cyber {
32
/**
 * Service handles a Request from the Client and sends a Response to it.
 *
 * Incoming requests are wrapped into tasks and queued (Enqueue); a worker
 * thread started by Init() runs Process(), which pops the tasks and calls
 * HandleRequest(). HandleRequest() invokes the user callback and transmits
 * the response it filled in.
 *
 * @tparam Request  message type received on the request channel
 * @tparam Response message type transmitted on the response channel
 */
41template <typename Request, typename Response>
42class Service : public ServiceBase {
43 public:
// Handler signature: receives the request and a Response object that
// HandleRequest() has just created with std::make_shared; whatever the
// handler leaves in it is what gets transmitted.
44 using ServiceCallback = std::function<void(const std::shared_ptr<Request>&,
45 std::shared_ptr<Response>&)>;
// Construct a new Service object, copying the callback.
// The request/response channel names are service_name with
// SRV_CHANNEL_REQ_SUFFIX / SRV_CHANNEL_RES_SUFFIX appended.
// NOTE(review): the embedded listing numbers jump from 54 to 56, so the line
// that opens the member-initializer list is not present in this excerpt --
// confirm against the real header.
53 Service(const std::string& node_name, const std::string& service_name,
54 const ServiceCallback& service_callback)
56 node_name_(node_name),
57 service_callback_(service_callback),
58 request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
59 response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}
60
// Construct a new Service object from an rvalue callback.
// NOTE(review): service_callback is a named rvalue reference, so the
// initializer below copy-constructs service_callback_; wrapping it in
// std::move would move instead.
// NOTE(review): listing numbers jump from 69 to 71 here as well (initializer
// list opener not visible).
68 Service(const std::string& node_name, const std::string& service_name,
69 ServiceCallback&& service_callback)
71 node_name_(node_name),
72 service_callback_(service_callback),
73 request_channel_(service_name + SRV_CHANNEL_REQ_SUFFIX),
74 response_channel_(service_name + SRV_CHANNEL_RES_SUFFIX) {}
75
// Forbid default constructing.
79 Service() = delete;
80
// NOTE(review): listing line 81 is absent from this excerpt; whatever was
// declared there is not visible -- confirm against the real header.
82
// Init the Service: creates the response transmitter and request receiver and
// starts the worker thread. Returns true on success or if already
// initialized, false if either endpoint could not be created.
86 bool Init();
87
// Destroy the Service: clears inited_, drops queued tasks, wakes the worker
// and joins it if joinable.
91 void destroy();
92
93 private:
// Runs the user callback for one request and sends the resulting response.
94 void HandleRequest(const std::shared_ptr<Request>& request,
95 const transport::MessageInfo& message_info);
96
// Transmits response through response_transmitter_ (no-op when !IsInit()).
97 void SendResponse(const transport::MessageInfo& message_info,
98 const std::shared_ptr<Response>& response);
99
// "Initialized" is defined by the request receiver existing, not by inited_.
100 bool IsInit(void) const { return request_receiver_ != nullptr; }
101
102 std::string node_name_;
103 ServiceCallback service_callback_;
104
// Assigned in Init(); it is not invoked anywhere in the visible code.
105 std::function<void(const std::shared_ptr<Request>&,
106 const transport::MessageInfo&)>
107 request_callback_;
108 std::shared_ptr<transport::Transmitter<Response>> response_transmitter_;
109 std::shared_ptr<transport::Receiver<Request>> request_receiver_;
110 std::string request_channel_;
111 std::string response_channel_;
// Held by HandleRequest() across the user callback and SendResponse().
112 std::mutex service_handle_request_mutex_;
113
// Worker run flag: set in Init(), cleared in destroy(), read by Process().
// NOTE(review): it is written without holding queue_mutex_ while Process()
// reads it on another thread; volatile provides no inter-thread
// synchronization in C++ -- std::atomic<bool> would be the usual choice.
114 volatile bool inited_ = false;
// Appends a task to tasks_ and wakes the worker.
115 void Enqueue(std::function<void()>&& task);
// Worker loop executed by thread_.
116 void Process();
117 std::thread thread_;
// Guards tasks_; condition_ is waited on and signalled with it.
118 std::mutex queue_mutex_;
119 std::condition_variable condition_;
120 std::list<std::function<void()>> tasks_;
121};
122
/**
 * Stops the worker: clears the run flag, discards any tasks still queued,
 * wakes every waiter on condition_ and joins thread_ when it is joinable.
 *
 * NOTE(review): the signature line is missing from this excerpt (listing
 * numbers jump from 123 to 125); the symbol index at the end of the page
 * places Service::destroy() at listing line 124 -- confirm against the real
 * header.
 */
123template <typename Request, typename Response>
// Cleared before taking queue_mutex_; Process() leaves its loop once it
// observes the flag as false.
125 inited_ = false;
126 {
127 std::lock_guard<std::mutex> lg(queue_mutex_);
// Pending requests are dropped, not drained.
128 this->tasks_.clear();
129 }
130 condition_.notify_all();
// The joinable() check also covers the case where no worker is running.
131 if (thread_.joinable()) {
132 thread_.join();
133 }
134}
135
136template <typename Request, typename Response>
137inline void Service<Request, Response>::Enqueue(std::function<void()>&& task) {
138 std::lock_guard<std::mutex> lg(queue_mutex_);
139 tasks_.emplace_back(std::move(task));
140 condition_.notify_one();
141}
142
143template <typename Request, typename Response>
144void Service<Request, Response>::Process() {
145 while (!cyber::IsShutdown()) {
146 std::unique_lock<std::mutex> ul(queue_mutex_);
147 condition_.wait(ul, [this]() { return !inited_ || !this->tasks_.empty(); });
148 if (!inited_) {
149 break;
150 }
151 if (!tasks_.empty()) {
152 auto task = tasks_.front();
153 tasks_.pop_front();
154 ul.unlock();
155 task();
156 }
157 }
158}
159
/**
 * Init the Service: creates the response transmitter and the request
 * receiver, then starts the worker thread that runs Process().
 *
 * Returns true when already initialized or when both endpoints were created;
 * returns false when either the transmitter or the receiver is null.
 *
 * NOTE(review): several listing lines are absent from this excerpt (the
 * embedded numbers jump 160->162, 164->166, 170->172, 180->182, 197->199):
 * the signature, the declaration of `role`, and the tails/heads of three
 * multi-line expressions are not visible. The symbol index at the end of the
 * page places Service::Init() at listing line 161 -- confirm against the real
 * header.
 */
160template <typename Request, typename Response>
// Idempotent: a second call after success does nothing.
162 if (IsInit()) {
163 return true;
164 }
// Describe the response endpoint first.
166 role.set_node_name(node_name_);
167 role.set_channel_name(response_channel_);
168 auto channel_id = common::GlobalData::RegisterChannel(response_channel_);
169 role.set_channel_id(channel_id);
170 role.mutable_qos_profile()->CopyFrom(
172 auto transport = transport::Transport::Instance();
173 response_transmitter_ =
174 transport->CreateTransmitter<Response>(role, proto::OptionalMode::RTPS);
175 if (response_transmitter_ == nullptr) {
176 AERROR << " Create response pub failed.";
177 return false;
178 }
179
180 request_callback_ =
182 std::placeholders::_1, std::placeholders::_2);
183
// The same `role` object is reused for the request endpoint; only the channel
// name and id are overwritten.
184 role.set_channel_name(request_channel_);
185 channel_id = common::GlobalData::RegisterChannel(request_channel_);
186 role.set_channel_id(channel_id);
// The receiver callback does not handle the request itself: it wraps the call
// to HandleRequest() in a task and hands it to the worker via Enqueue().
187 request_receiver_ = transport->CreateReceiver<Request>(
188 role,
189 [=](const std::shared_ptr<Request>& request,
190 const transport::MessageInfo& message_info,
191 const proto::RoleAttributes& reader_attr) {
192 (void)reader_attr;
193 auto task = [this, request, message_info]() {
194 this->HandleRequest(request, message_info);
195 };
196 Enqueue(std::move(task));
197 },
// NOTE(review): inited_ is set and the worker is started before
// request_receiver_ is checked. On the failure path below the function returns
// false with the thread still running and inited_ still true. IsInit() stays
// false, so unless destroy() runs in between, a later Init() call reaches this
// assignment again while thread_ is joinable, and std::thread's move
// assignment then calls std::terminate. Consider moving these two statements
// after the null check.
199 inited_ = true;
200 thread_ = std::thread(&Service<Request, Response>::Process, this);
201 if (request_receiver_ == nullptr) {
202 AERROR << " Create request sub failed." << request_channel_;
// Roll back the transmitter so the object is left without endpoints.
203 response_transmitter_.reset();
204 return false;
205 }
206 return true;
207}
208
/**
 * Handles one request: creates an empty Response, lets the user callback
 * fill it in, and sends it back tagged with this service's transmitter id.
 * Does nothing when the service is not initialized.
 *
 * NOTE(review): the first line of the signature is missing from this excerpt
 * (listing numbers jump from 209 to 211); the in-class declaration shows it
 * as void HandleRequest(request, message_info) -- confirm against the real
 * header.
 */
209template <typename Request, typename Response>
211 const std::shared_ptr<Request>& request,
212 const transport::MessageInfo& message_info) {
213 if (!IsInit()) {
214 // LOG_DEBUG << "not inited error.";
215 return;
216 }
217 ADEBUG << "handling request:" << request_channel_;
// Held for the rest of the function, so the user callback and the send are
// serialized across callers.
218 std::lock_guard<std::mutex> lk(service_handle_request_mutex_);
219 auto response = std::make_shared<Response>();
220 service_callback_(request, response);
// Work on a copy of the incoming info so the caller's object is untouched;
// only the sender id is replaced with the response transmitter's id.
221 transport::MessageInfo msg_info(message_info);
222 msg_info.set_sender_id(response_transmitter_->id());
223 SendResponse(msg_info, response);
224}
225
226template <typename Request, typename Response>
227void Service<Request, Response>::SendResponse(
228 const transport::MessageInfo& message_info,
229 const std::shared_ptr<Response>& response) {
230 if (!IsInit()) {
231 // LOG_DEBUG << "not inited error.";
232 return;
233 }
234 // publish return value ?
235 // LOG_DEBUG << "send response id:" << message_id.sequence_number;
236 response_transmitter_->Transmit(response, message_info);
237}
238
239} // namespace cyber
240} // namespace apollo
241
242#endif // CYBER_SERVICE_SERVICE_H_
Base class for Service
const std::string & service_name() const
Get the service name
Service handles a Request from the Client and sends a Response to it.
Definition service.h:42
Service(const std::string &node_name, const std::string &service_name, const ServiceCallback &service_callback)
Construct a new Service object
Definition service.h:53
bool Init()
Init the Service
Definition service.h:161
Service(const std::string &node_name, const std::string &service_name, ServiceCallback &&service_callback)
Construct a new Service object
Definition service.h:68
Service()=delete
Forbid default constructing
void destroy()
Destroy the Service
Definition service.h:124
std::function< void(const std::shared_ptr< Request > &, std::shared_ptr< Response > &)> ServiceCallback
Definition service.h:45
static uint64_t RegisterChannel(const std::string &channel)
static const QosProfile QOS_PROFILE_SERVICES_DEFAULT
#define ADEBUG
Definition log.h:41
#define AERROR
Definition log.h:44
bool IsShutdown()
Definition state.h:46
class register implement
Definition arena_queue.h:37