19#include <netinet/in.h>
20#include <netinet/tcp.h>
21#include <sys/socket.h>
// Constructs a TcpStream in the "not connected" state: sockfd_ starts at -1
// and errno_ at 0. No socket is created by the lines visible here.
// NOTE(review): auto_reconnect is used in the initializer list below, but its
// parameter declaration is not visible in this excerpt -- confirm the full
// signature against the header.
37TcpStream::TcpStream(
const char* address, uint16_t port, uint32_t timeout_usec,
39 : sockfd_(-1), errno_(0), auto_reconnect_(auto_reconnect) {
// inet_addr() converts a dotted-decimal IPv4 string to a network-byte-order
// address. It returns INADDR_NONE for malformed input; no check of that
// result is visible here.
40 peer_addr_ = inet_addr(address);
// The port is stored already converted to network byte order.
41 peer_port_ = htons(port);
// Timeout in microseconds, stored as given.
42 timeout_usec_ = timeout_usec;
// Creates the underlying IPv4 TCP stream socket.
// NOTE(review): only the socket() call and the failure log are visible in
// this excerpt; the failure condition and where fd is stored are not shown.
47void TcpStream::open() {
48 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// Failure log: reports the numeric errno left by the failed socket() call.
51 AERROR <<
"create socket failed, errno: " << errno <<
", "
// Configures sockfd_ according to timeout_usec_ and then sets TCP_NODELAY.
// Returns bool; the return statements themselves are not visible in this
// excerpt, so the exact success/failure values should be confirmed.
59bool TcpStream::InitSocket() {
// Non-zero timeout: use a blocking socket with send/receive timeouts.
65 if (timeout_usec_ != 0) {
66 int flags = fcntl(sockfd_, F_GETFL, 0);
69 AERROR <<
"fcntl get flag failed, error: " << strerror(errno);
// Clear O_NONBLOCK so that I/O calls block (bounded by the timeouts below).
73 if (fcntl(sockfd_, F_SETFL, flags & ~O_NONBLOCK) == -1) {
75 AERROR <<
"fcntl set block failed, error: " << strerror(errno);
// Split the microsecond timeout into {seconds, microseconds} for timeval.
79 timeval block_to = {timeout_usec_ / 1000000, timeout_usec_ % 1000000};
// The same timeout value is applied to receive ...
80 if (setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, &block_to,
81 sizeof(block_to)) < 0) {
83 AERROR <<
"setsockopt set rcv timeout failed, error: " << strerror(errno);
// ... and to send.
87 if (setsockopt(sockfd_, SOL_SOCKET, SO_SNDTIMEO, &block_to,
88 sizeof(block_to)) < 0) {
90 AERROR <<
"setsockopt set snd timeout failed, error: " << strerror(errno);
// NOTE(review): a second `flags` is declared here, so this must be a
// different scope -- presumably the timeout_usec_ == 0 branch, whose
// introducing line is not visible in this excerpt. This path sets O_NONBLOCK.
94 int flags = fcntl(sockfd_, F_GETFL, 0);
97 AERROR <<
"fcntl get flag failed, error: " << strerror(errno);
101 if (fcntl(sockfd_, F_SETFL, flags | O_NONBLOCK) == -1) {
103 AERROR <<
"fcntl set non block failed, error: " << strerror(errno);
// Set TCP_NODELAY from `enable`; per the log text below the intent is to
// disable Nagle's algorithm. The declarations of `ret` and `enable` are not
// visible here -- presumably enable is non-zero; confirm.
111 ret = setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY,
112 reinterpret_cast<void*
>(&enable),
sizeof(enable));
115 AERROR <<
"setsockopt disable Nagle failed, errno: " << errno <<
", "
123void TcpStream::close() {
// Reconnect entry point; the only visible logic is the auto_reconnect_ check.
131bool TcpStream::Reconnect() {
132 if (auto_reconnect_) {
// NOTE(review): the embedded listing numbers jump from 132 to 155 here, so
// the connect sequence below may belong to a separate connect routine whose
// signature is not visible in this excerpt -- confirm against the full file.
//
// Visible sequence: build the peer address, switch the socket to
// non-blocking, call connect(), wait for writability with select(), then
// read SO_ERROR to learn the outcome.
// timeo is the timeout passed to select() below: 10 s, 0 us.
155 timeval timeo = {10, 0};
157 sockaddr_in peer_addr;
159 bzero(&peer_addr,
sizeof(peer_addr));
160 peer_addr.sin_family = AF_INET;
// peer_port_ and peer_addr_ are copied as stored; no byte-order conversion
// happens at this point.
161 peer_addr.sin_port = peer_port_;
162 peer_addr.sin_addr.s_addr = peer_addr_;
// Put the socket in non-blocking mode before connecting.
164 int fd_flags = fcntl(sockfd_, F_GETFL);
165 if (fd_flags < 0 || fcntl(sockfd_, F_SETFL, fd_flags | O_NONBLOCK) < 0) {
166 AERROR <<
"Failed to set noblock, error: " << strerror(errno);
// Loop for as long as connect() reports an error (< 0).
170 while ((ret = ::connect(sockfd_,
reinterpret_cast<sockaddr*
>(&peer_addr),
171 sizeof(peer_addr))) < 0) {
// Interrupted by a signal: logged as "continue".
172 if (errno == EINTR) {
173 AINFO <<
"Tcp connect return EINTR, continue.";
// EISCONN, EINPROGRESS and EALREADY are excluded from this failure check;
// any other errno is logged as a connect failure.
176 if ((errno != EISCONN) && (errno != EINPROGRESS) && (errno != EALREADY)) {
179 AERROR <<
"Connect failed, error: " << strerror(errno);
// Wait until the socket becomes writable or timeo expires.
184 FD_SET(sockfd_, &fds);
185 ret = select(sockfd_ + 1, NULL, &fds, NULL, &timeo);
189 AERROR <<
"Wait connect failed, error: " << strerror(errno);
191 }
// select() returned 0: nothing became ready within the timeout.
else if (ret == 0) {
192 AINFO <<
"Tcp connect timeout.";
194 }
// Socket reported writable: fetch the pending socket error via SO_ERROR.
else if (FD_ISSET(sockfd_, &fds)) {
196 socklen_t len =
sizeof(int);
198 if (getsockopt(sockfd_, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
201 AERROR <<
"Getsockopt failed, error: " << strerror(errno);
// NOTE(review): this message prints strerror(errno), while getsockopt above
// stores the socket error in `error`. Confirm that errno is assigned from
// `error` on a line not visible in this excerpt.
207 AERROR <<
"Socket error: " << strerror(errno);
216 AERROR <<
"Should not be here.";
// Logged when socket initialisation fails; the guarding call is not visible
// in this excerpt.
226 AERROR <<
"Failed to init socket.";
229 AINFO <<
"Tcp connect success.";
255 if (!Readable(10000)) {
259 while ((ret = ::recv(sockfd_, buffer, max_length, 0)) < 0) {
260 if (errno == EINTR) {
264 if (errno != EAGAIN) {
267 AERROR <<
"Read errno " << errno <<
", error " << strerror(errno);
277 AERROR <<
"Remote closed.";
279 AINFO <<
"Reconnect tcp success.";
287 size_t total_nsent = 0;
297 ssize_t nsent = ::send(sockfd_, buffer, length, 0);
299 if (errno == EINTR) {
303 if (errno == EPIPE || errno == ECONNRESET) {
306 }
else if (errno != EAGAIN) {
314 total_nsent += nsent;
// Waits for sockfd_ to become readable, using pselect() with a timeout of
// timeout_us microseconds.
// NOTE(review): the declarations of readfds/timeout_ts and the return
// statements are not visible in this excerpt; confirm which bool each branch
// returns.
322bool TcpStream::Readable(uint32_t timeout_us) {
327 FD_SET(sockfd_, &readfds);
// Split microseconds into whole seconds plus the remainder in nanoseconds.
329 timeout_ts.tv_sec = timeout_us / 1000000;
330 timeout_ts.tv_nsec = (timeout_us % 1000000) * 1000;
// Only the read set is watched; the final NULL leaves the signal mask unchanged.
331 int r = pselect(sockfd_ + 1, &readfds, NULL, NULL, &timeout_ts, NULL);
335 AERROR <<
"Failed to wait tcp data: " << errno <<
", " << strerror(errno);
337 }
// r == 0 means the timeout expired; the FD_ISSET test additionally covers a
// return in which sockfd_ is not marked readable.
else if (r == 0 || !FD_ISSET(sockfd_, &readfds)) {
345 uint32_t timeout_usec) {
346 return new TcpStream(address, port, timeout_usec);
static Stream * create_tcp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
virtual size_t read(uint8_t *buffer, size_t max_length)
virtual size_t write(const uint8_t *data, size_t length)
virtual bool Disconnect()