19#include <netinet/in.h>
20#include <sys/socket.h>
// Integer aliases for values kept in network (big-endian) byte order.
// In this file peer_port_ (be16_t) is filled from htons() and peer_addr_
// (be32_t) from inet_addr().
34 typedef uint16_t be16_t;
35 typedef uint32_t be32_t;
// Constructor declaration.
//   address      - peer IPv4 address as a string; the definition converts it
//                  with inet_addr().
//   port         - UDP port in host byte order; the definition converts it
//                  with htons().
//   timeout_usec - timeout in microseconds, stored in timeout_usec_.
38 UdpStream(
const char* address, uint16_t port, uint32_t timeout_usec);
// Reads into `buffer`, which must have room for `max_length` bytes.
// NOTE(review): the implementation excerpt in this file that uses `buffer`
// and `max_length` calls ::recvfrom(); the meaning of the returned size_t
// (and what is returned on error or timeout) is not visible here - confirm
// against the full definition.
43 virtual size_t read(uint8_t* buffer,
size_t max_length);
// Sends `length` bytes starting at `data`.
// NOTE(review): the implementation excerpt in this file that uses `data`
// and `length` calls ::sendto() towards peer_addr_/peer_port_ and keeps a
// total_nsent counter; the exact return contract is not visible here -
// confirm against the full definition.
44 virtual size_t write(
const uint8_t* data,
size_t length);
// Peer UDP port in network byte order (assigned from htons(port) in the
// constructor). open() also uses it as the local port to bind to.
50 be16_t peer_port_ = 0;
// Peer IPv4 address in network byte order (assigned from inet_addr(address)
// in the constructor); used as the sendto() destination.
51 be32_t peer_addr_ = 0;
// Timeout in microseconds. A non-zero value makes open() clear O_NONBLOCK and
// apply the value as both SO_RCVTIMEO and SO_SNDTIMEO.
// NOTE(review): zero appears to select non-blocking mode in open(); the
// `else` line itself is not visible in this listing - confirm.
52 uint32_t timeout_usec_ = 0;
// Tail of the factory function; its first signature line is not part of this
// listing (presumably Stream::create_udp - confirm).
// Allocates a UdpStream with `new` and returns the raw pointer. No owner or
// matching delete is visible here, so the caller presumably takes ownership.
58 uint32_t timeout_usec) {
59 return new UdpStream(address, port, timeout_usec);
// Constructor: starts with no socket (sockfd_ = -1) and errno_ = 0, then
// records the peer endpoint and the timeout. The remaining lines of the body
// are not part of this listing.
62UdpStream::UdpStream(
const char* address, uint16_t port, uint32_t timeout_usec)
63 : sockfd_(-1), errno_(0) {
// inet_addr() yields the IPv4 address already in network byte order.
// NOTE(review): inet_addr() returns INADDR_NONE for a malformed string and no
// check of the result is visible in this excerpt - confirm whether the input
// is validated elsewhere.
64 peer_addr_ = inet_addr(address);
// The port is converted to network byte order once, here; every later use of
// peer_port_ sees the converted value.
65 peer_port_ = htons(port);
66 timeout_usec_ = timeout_usec;
// Creates an IPv4 UDP socket, binds it to peer_port_ on any local interface,
// and then configures blocking behaviour from timeout_usec_.
// NOTE(review): many lines of this function are missing from this listing
// (the conditions guarding the error logs, the tails of the log statements,
// any early returns, the closing braces and the final use of fd), so the
// comments below describe only what the visible lines establish.
72void UdpStream::open() {
73 int fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Error log for socket creation; the guarding condition is not visible here.
76 AERROR <<
"Create socket failed, errno: " << errno <<
", "
// Despite its name, peer_sockaddr is the LOCAL address passed to bind():
// any interface, same port number as the peer.
81 struct sockaddr_in peer_sockaddr;
// NOTE(review): socklenth is not used by any line of this function that is
// visible in this listing - possibly an unused local.
82 socklen_t socklenth =
sizeof(peer_sockaddr);
83 bzero(&peer_sockaddr,
sizeof(peer_sockaddr));
84 peer_sockaddr.sin_family = AF_INET;
// Already in network byte order (converted by htons() in the constructor).
85 peer_sockaddr.sin_port = peer_port_;
// NOTE(review): assigned without htonl() here, whereas the receive path in
// this file uses htonl(INADDR_ANY). INADDR_ANY is 0, so the stored value is
// the same either way, but the two sites are inconsistent.
86 peer_sockaddr.sin_addr.s_addr = INADDR_ANY;
87 if (bind(fd, (
struct sockaddr*)&peer_sockaddr,
sizeof(peer_sockaddr)) == -1) {
// NOTE(review): peer_port_ holds the htons()-converted port, so on a
// little-endian host this message prints the byte-swapped number rather than
// the configured port; ntohs(peer_port_) would print the configured one.
88 AERROR <<
" Socket bind failed! Port " << peer_port_;
// Non-zero timeout: use a blocking socket with receive/send timeouts.
94 if (timeout_usec_ != 0) {
95 int flags = fcntl(fd, F_GETFL, 0);
// Error log for F_GETFL; the guarding condition is not visible here.
98 AERROR <<
"fcntl get flag failed, errno: " << errno <<
", "
// Clear O_NONBLOCK so that I/O on the socket blocks.
103 if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1) {
105 AERROR <<
"fcntl set block failed, errno: " << errno <<
", "
// Split the microsecond timeout into whole seconds and leftover microseconds.
110 struct timeval block_to = {timeout_usec_ / 1000000,
111 timeout_usec_ % 1000000};
// Apply the timeout to receive operations.
112 if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,
113 reinterpret_cast<char*
>(&block_to),
sizeof(block_to)) < 0) {
115 AERROR <<
"setsockopt set rcv timeout failed, errno: " << errno <<
", "
// Apply the same timeout to send operations.
120 if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO,
121 reinterpret_cast<char*
>(&block_to),
sizeof(block_to)) < 0) {
123 AERROR <<
"setsockopt set snd timeout failed, errno: " << errno <<
", "
// NOTE(review): presumably the start of the else branch (timeout_usec_ == 0);
// the `else` line is missing from this listing. The visible lines read the
// current flags and then set O_NONBLOCK.
128 int flags = fcntl(fd, F_GETFL, 0);
131 AERROR <<
"fcntl get flag failed, errno: " << errno <<
", "
136 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
138 AERROR <<
"fcntl set non block failed, errno: " << errno <<
", "
147void UdpStream::close() {
// Excerpt from the receive path. The function header is missing from this
// listing; the use of `buffer` and `max_length` suggests this is the body of
// UdpStream::read - confirm.
185 struct sockaddr_in peer_sockaddr;
// In/out length argument for recvfrom(): capacity of peer_sockaddr on entry.
186 socklen_t socklenth =
sizeof(peer_sockaddr);
187 bzero(&peer_sockaddr,
sizeof(peer_sockaddr));
// These fields are only initial contents: recvfrom() is given this struct as
// its source-address output and overwrites it with the sender's address.
188 peer_sockaddr.sin_family = AF_INET;
189 peer_sockaddr.sin_port = peer_port_;
190 peer_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
// Loop for as long as recvfrom() reports an error (negative return).
// NOTE(review): socklenth is already a socklen_t, so the
// reinterpret_cast<socklen_t*> below is redundant.
192 while ((ret = ::recvfrom(sockfd_, buffer, max_length, 0,
193 (
struct sockaddr*)&peer_sockaddr,
194 reinterpret_cast<socklen_t*
>(&socklenth))) < 0) {
// Interrupted by a signal; the handling lines are missing from this listing
// (presumably the call is retried).
195 if (errno == EINTR) {
// Errors other than EAGAIN get separate handling whose body is not visible
// here.
199 if (errno != EAGAIN) {
// Excerpt from the send path. The function header is missing from this
// listing; the use of `data` and `length` suggests this is the body of
// UdpStream::write - confirm.
// Running count of bytes sent; incremented by nsent on the last visible line.
212 size_t total_nsent = 0;
// Destination: the peer address and port captured by the constructor, both
// already in network byte order.
213 struct sockaddr_in peer_sockaddr;
214 bzero(&peer_sockaddr,
sizeof(peer_sockaddr));
215 peer_sockaddr.sin_family = AF_INET;
216 peer_sockaddr.sin_port = peer_port_;
217 peer_sockaddr.sin_addr.s_addr = peer_addr_;
// Sends the whole buffer as one datagram to the peer. The line that receives
// the return value is missing from this listing (presumably it is `nsent`).
221 ::sendto(sockfd_, data, length, 0, (
struct sockaddr*)&peer_sockaddr,
222 (socklen_t)
sizeof(peer_sockaddr));
// Interrupted by a signal; the handling lines are not visible here.
224 if (errno == EINTR) {
// EPIPE / ECONNRESET are handled separately from other errors, and EAGAIN is
// excluded from the generic error branch; neither branch body is visible.
228 if (errno == EPIPE || errno == ECONNRESET) {
231 }
else if (errno != EAGAIN) {
239 total_nsent += nsent;
static Stream * create_udp(const char *address, uint16_t port, uint32_t timeout_usec=1000000)
virtual bool Disconnect()
virtual size_t read(uint8_t *buffer, size_t max_length)
virtual size_t write(const uint8_t *data, size_t length)