Apollo 10.0
自动驾驶开放平台
session.cc
浏览该文件的文档.
1/******************************************************************************
2 * Copyright 2018 The Apollo Authors. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *****************************************************************************/
16
17#include "cyber/io/session.h"
18
19#include "cyber/common/log.h"
20
21namespace apollo {
22namespace cyber {
23namespace io {
24
26
27Session::Session(int fd) : fd_(fd), poll_handler_(nullptr) {
28 poll_handler_.reset(new PollHandler(fd_));
29}
30
31int Session::Socket(int domain, int type, int protocol) {
32 if (fd_ != -1) {
33 AINFO << "session has hold a valid fd[" << fd_ << "]";
34 return -1;
35 }
36 int sock_fd = socket(domain, type | SOCK_NONBLOCK, protocol);
37 if (sock_fd != -1) {
38 set_fd(sock_fd);
39 }
40 return sock_fd;
41}
42
43int Session::Listen(int backlog) {
44 ACHECK(fd_ != -1);
45 return listen(fd_, backlog);
46}
47
48int Session::Bind(const struct sockaddr *addr, socklen_t addrlen) {
49 ACHECK(fd_ != -1);
50 ACHECK(addr != nullptr);
51 return bind(fd_, addr, addrlen);
52}
53
54auto Session::Accept(struct sockaddr *addr, socklen_t *addrlen) -> SessionPtr {
55 ACHECK(fd_ != -1);
56
57 int sock_fd = accept4(fd_, addr, addrlen, SOCK_NONBLOCK);
58 while (sock_fd == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
59 poll_handler_->Block(-1, true);
60 sock_fd = accept4(fd_, addr, addrlen, SOCK_NONBLOCK);
61 }
62
63 if (sock_fd == -1) {
64 return nullptr;
65 }
66
67 return std::make_shared<Session>(sock_fd);
68}
69
70int Session::Connect(const struct sockaddr *addr, socklen_t addrlen) {
71 ACHECK(fd_ != -1);
72
73 int optval;
74 socklen_t optlen = sizeof(optval);
75 int res = connect(fd_, addr, addrlen);
76 if (res == -1 && errno == EINPROGRESS) {
77 poll_handler_->Block(-1, false);
78 getsockopt(fd_, SOL_SOCKET, SO_ERROR, reinterpret_cast<void *>(&optval),
79 &optlen);
80 if (optval == 0) {
81 res = 0;
82 } else {
83 errno = optval;
84 }
85 }
86 return res;
87}
88
90 ACHECK(fd_ != -1);
91
92 poll_handler_->Unblock();
93 int res = close(fd_);
94 fd_ = -1;
95 return res;
96}
97
98ssize_t Session::Recv(void *buf, size_t len, int flags, int timeout_ms) {
99 ACHECK(buf != nullptr);
100 ACHECK(fd_ != -1);
101
102 ssize_t nbytes = recv(fd_, buf, len, flags);
103 if (timeout_ms == 0) {
104 return nbytes;
105 }
106
107 while (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
108 if (poll_handler_->Block(timeout_ms, true)) {
109 nbytes = recv(fd_, buf, len, flags);
110 }
111 if (timeout_ms > 0) {
112 break;
113 }
114 }
115 return nbytes;
116}
117
118ssize_t Session::RecvFrom(void *buf, size_t len, int flags,
119 struct sockaddr *src_addr, socklen_t *addrlen,
120 int timeout_ms) {
121 ACHECK(buf != nullptr);
122 ACHECK(fd_ != -1);
123
124 ssize_t nbytes = recvfrom(fd_, buf, len, flags, src_addr, addrlen);
125 if (timeout_ms == 0) {
126 return nbytes;
127 }
128
129 while (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
130 if (poll_handler_->Block(timeout_ms, true)) {
131 nbytes = recvfrom(fd_, buf, len, flags, src_addr, addrlen);
132 }
133 if (timeout_ms > 0) {
134 break;
135 }
136 }
137 return nbytes;
138}
139
140ssize_t Session::Send(const void *buf, size_t len, int flags, int timeout_ms) {
141 ACHECK(buf != nullptr);
142 ACHECK(fd_ != -1);
143
144 ssize_t nbytes = send(fd_, buf, len, flags);
145 if (timeout_ms == 0) {
146 return nbytes;
147 }
148
149 while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
150 if (poll_handler_->Block(timeout_ms, false)) {
151 nbytes = send(fd_, buf, len, flags);
152 }
153 if (timeout_ms > 0) {
154 break;
155 }
156 }
157 return nbytes;
158}
159
160ssize_t Session::SendTo(const void *buf, size_t len, int flags,
161 const struct sockaddr *dest_addr, socklen_t addrlen,
162 int timeout_ms) {
163 ACHECK(buf != nullptr);
164 ACHECK(dest_addr != nullptr);
165 ACHECK(fd_ != -1);
166
167 ssize_t nbytes = sendto(fd_, buf, len, flags, dest_addr, addrlen);
168 if (timeout_ms == 0) {
169 return nbytes;
170 }
171
172 while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
173 if (poll_handler_->Block(timeout_ms, false)) {
174 nbytes = sendto(fd_, buf, len, flags, dest_addr, addrlen);
175 }
176 if (timeout_ms > 0) {
177 break;
178 }
179 }
180 return nbytes;
181}
182
183ssize_t Session::Read(void *buf, size_t count, int timeout_ms) {
184 ACHECK(buf != nullptr);
185 ACHECK(fd_ != -1);
186
187 ssize_t nbytes = read(fd_, buf, count);
188 if (timeout_ms == 0) {
189 return nbytes;
190 }
191
192 while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
193 if (poll_handler_->Block(timeout_ms, true)) {
194 nbytes = read(fd_, buf, count);
195 }
196 if (timeout_ms > 0) {
197 break;
198 }
199 }
200 return nbytes;
201}
202
203ssize_t Session::Write(const void *buf, size_t count, int timeout_ms) {
204 ACHECK(buf != nullptr);
205 ACHECK(fd_ != -1);
206
207 ssize_t nbytes = write(fd_, buf, count);
208 if (timeout_ms == 0) {
209 return nbytes;
210 }
211
212 while ((nbytes == -1) && (errno == EAGAIN || errno == EWOULDBLOCK)) {
213 if (poll_handler_->Block(timeout_ms, false)) {
214 nbytes = write(fd_, buf, count);
215 }
216 if (timeout_ms > 0) {
217 break;
218 }
219 }
220 return nbytes;
221}
222
223} // namespace io
224} // namespace cyber
225} // namespace apollo
ssize_t RecvFrom(void *buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen, int timeout_ms=-1)
Definition session.cc:118
ssize_t Write(const void *buf, size_t count, int timeout_ms=-1)
Definition session.cc:203
std::shared_ptr< Session > SessionPtr
Definition session.h:35
ssize_t SendTo(const void *buf, size_t len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen, int timeout_ms=-1)
Definition session.cc:160
int Listen(int backlog)
Definition session.cc:43
int Bind(const struct sockaddr *addr, socklen_t addrlen)
Definition session.cc:48
ssize_t Recv(void *buf, size_t len, int flags, int timeout_ms=-1)
Definition session.cc:98
ssize_t Read(void *buf, size_t count, int timeout_ms=-1)
Definition session.cc:183
SessionPtr Accept(struct sockaddr *addr, socklen_t *addrlen)
Definition session.cc:54
ssize_t Send(const void *buf, size_t len, int flags, int timeout_ms=-1)
Definition session.cc:140
int Connect(const struct sockaddr *addr, socklen_t addrlen)
Definition session.cc:70
int Socket(int domain, int type, int protocol)
Definition session.cc:31
#define ACHECK(cond)
Definition log.h:80
#define AINFO
Definition log.h:42
class register implement
Definition arena_queue.h:37