libsocket
dgramoverstream.cpp
Go to the documentation of this file.
1 #include <string.h>
2 #include <unistd.h>
3 #include <string>
4 
5 /*
6  The committers of the libsocket project, all rights reserved
7  (c) 2016, dermesser <lbo@spheniscida.de>
8 
9  Redistribution and use in source and binary forms, with or without
10  modification, are permitted provided that the following conditions are met:
11 
12  1. Redistributions of source code must retain the above copyright notice,
13  this list of conditions and the following disclaimer.
14  2. Redistributions in binary form must reproduce the above copyright notice,
15  this list of conditions and the following disclaimer in the documentation
16  and/or other materials provided with the distribution.
17 
18  THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS “AS IS” AND ANY
19  EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22  DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 
29 */
30 
38 #include <dgramoverstream.hpp>
39 #include <exception.hpp>
40 
41 namespace libsocket {
42 
43 dgram_over_stream::dgram_over_stream(stream_client_socket socket)
44  : inner(std::unique_ptr<stream_client_socket>(
45  new stream_client_socket(std::move(socket)))) {
46  enable_nagle(false);
47 }
48 
49 dgram_over_stream::dgram_over_stream(
50  std::unique_ptr<stream_client_socket> inner_)
51  : inner(std::move(inner_)) {
52  enable_nagle(false);
53 }
54 
66 void dgram_over_stream::enable_nagle(bool enabled) const {
67  int enabled_ = int(!enabled);
68  inner->set_sock_opt(IPPROTO_TCP, TCP_NODELAY, (const char*)&enabled_,
69  sizeof(int));
70 }
71 
72 ssize_t dgram_over_stream::sndmsg(const std::string& msg) {
73  return sndmsg(msg.c_str(), msg.size());
74 }
75 
81 ssize_t dgram_over_stream::rcvmsg(std::string* dst) {
82  uint32_t expected = receive_header();
83 
84  if (expected <= dst->size()) dst->resize(expected);
85 
86  size_t to_receive = dst->size();
87  size_t received = 0;
88 
89  while (received < to_receive) {
90  ssize_t result = receive_bytes(to_receive - received);
91 
92  if (result < 0)
93  throw socket_exception(
94  __FILE__, __LINE__,
95  "dgram_over_stream::rcvmsg(): Could not receive message!",
96  false);
97 
98  dst->replace(received, result, RECV_BUF);
99 
100  received += (size_t)result;
101  }
102 
103  // Consume remaining frame that doesn't fit into dst.
104  ssize_t rest = expected - to_receive;
105  while (rest > 0) {
106  rest -= receive_bytes(rest);
107  }
108 
109  return received;
110 }
111 
117 ssize_t dgram_over_stream::sndmsg(const std::vector<uint8_t>& msg) {
118  return sndmsg(static_cast<const void*>(msg.data()), msg.size());
119 }
120 
129 ssize_t dgram_over_stream::rcvmsg(std::vector<uint8_t>* dst) {
130  uint32_t expected = receive_header();
131 
132  if (expected <= dst->size()) dst->resize(expected);
133 
134  size_t to_receive = dst->size();
135  size_t received = 0;
136  std::vector<uint8_t>::iterator dst_iter = dst->begin();
137 
138  while (received < to_receive) {
139  ssize_t result = receive_bytes(to_receive - received);
140 
141  if (result < 0)
142  throw socket_exception(
143  __FILE__, __LINE__,
144  "dgram_over_stream::rcvmsg(): Could not receive message!",
145  false);
146 
147  for (ssize_t i = 0; i < result; i++, dst_iter++)
148  *dst_iter = RECV_BUF[i];
149 
150  received += result;
151  }
152 
153  // Consume remaining frame that doesn't fit into dst.
154  ssize_t rest = expected - to_receive;
155  while (rest > 0) {
156  rest -= receive_bytes(rest);
157  }
158 
159  return received;
160 }
161 
167 ssize_t dgram_over_stream::sndmsg(const void* buf, size_t len) {
168  encode_uint32(uint32_t(len), prefix_buffer);
169  ssize_t result = inner->snd(prefix_buffer, FRAMING_PREFIX_LENGTH, 0);
170 
171  if (result < 0) return result;
172 
173  result = inner->snd(buf, len, 0);
174 
175  if (result < 0) return result;
176 
177  return result;
178 }
179 
187 ssize_t dgram_over_stream::rcvmsg(void* dst, size_t len) {
188  uint32_t expected = receive_header();
189 
190  size_t to_receive = len < expected ? len : expected;
191  size_t received = 0;
192 
193  while (received < to_receive) {
194  ssize_t result = receive_bytes(to_receive - received);
195 
196  if (result < 0)
197  throw socket_exception(
198  __FILE__, __LINE__,
199  "dgram_over_stream::rcvmsg(): Could not receive message!",
200  false);
201 
202  memcpy(dst, RECV_BUF, result);
203  dst = static_cast<void*>(static_cast<char*>(dst) + result);
204  received += result;
205  }
206 
207  // Consume remaining frame that doesn't fit into dst.
208  ssize_t rest = expected - to_receive;
209  while (rest > 0) {
210  rest -= receive_bytes(rest);
211  }
212  return received;
213 }
214 
215 // Places up to n bytes into this->RECV_BUF.
216 ssize_t dgram_over_stream::receive_bytes(size_t n) {
217  if (n == 0) return 0;
218 
219  // Ignore rest of message.
220  ssize_t rest_len = n > RECV_BUF_SIZE ? RECV_BUF_SIZE : n;
221  size_t pos = 0;
222 
223  while (rest_len > 0) {
224  ssize_t recvd = inner->rcv(RECV_BUF + pos, rest_len, 0);
225 
226  if (recvd <= 0) return n - rest_len;
227 
228  rest_len -= recvd;
229  pos += recvd;
230  }
231 
232  return pos;
233 }
234 
241  ssize_t pos = 0;
242 
243  do {
244  ssize_t result =
245  inner->rcv(prefix_buffer + pos, FRAMING_PREFIX_LENGTH, 0);
246 
247  if (result < 0)
248  throw socket_exception(__FILE__, __LINE__,
249  "dgram_over_stream::receive_header(): Could "
250  "not receive length prefix!",
251  false);
252 
253  pos += result;
254  } while (pos < 4);
255 
256  return decode_uint32(prefix_buffer);
257 }
258 } // namespace libsocket
259 
Contains libsocket elements.
Definition: dgramclient.hpp:41
void enable_nagle(bool enable) const
Set TCP_NODELAY to !enabled on the underlying socket.
uint32_t receive_header(void)
Receive and decode length header.
ssize_t rcvmsg(void *dst, size_t len)
Receive a message and store the first len bytes into buf.
This class is instantiated and thrown when an error occurs. If there's an error somewhere in libsocke...
Definition: exception.hpp:52
ssize_t sndmsg(const void *buf, size_t len)
Send the message in buf with length len as one frame.