[asio] continuous reads into streambuf
Hello all, once again I find myself struggling with asio streambufs but wanted to ask here before chicken out to char buffers again. My problem is that I cannot read more than one protocol loop successfully. I got a TCP connection here that continuously receives messages on a simple protocol like that: header_size CRLF header (contains payload size) payload <repeat> I thought this is a prime use case for reading into a streambuf as I can easily deal with additional data being in there and the client pushing as fast as it can. Here is what I do (simplified quite a bit, sorry for copy and paste and simplify errors): (m_socket and m_streambuf are only used by one io_context, one thread, no races as far as I can tell) void read_header_size() { asio::async_read_until(m_socket, m_streambuf, "\r\n", [self](const boost::system::error_code, const std::size_t) { // < handle error skipped > std::string hsizestr; hsizestr.reserve(32); { std::istream is(&self->m_streambuf); std::getline(is, hsizestr); } std::size_t header_size = parse(hsizestr); self->read_header(header_size); }); } void read_header(const std::size_t n_header_size) { asio::async_read(m_socket, m_streambuf, boost::asio::transfer_exactly(n_header_size), [self, packet, n_header_size](const boost::system::error_code, const std::size_t) { // < handle error skipped > // Tried two versions of parsing the header (manual consume and istreams of various sorts) // manual consume: size_t payload_size = parse_header(n_streambuf.data().data(), n_header_size)) { n_streambuf.consume(n_header_size); self->read_payload(payload_size); }); } void read_payload(const std::size_t n_payload_size) { asio::async_read(m_socket, m_streambuf, boost::asio::transfer_exactly(payload_size), [self, n_packet, packet_size](const boost::system::error_code, const std::size_t) { // < handle error skipped > { std::istream is(&self->m_streambuf, std::ios::binary); is.read(target_buffer, target_buffer_size); // target buffer is elsewhere and well guarded } // read next message self->read_header_size(); }); } When I run this and make sure the client pushes at least 2 packets I can safely receive and process the first but after that the next asnc_read() in read_header never returns. As in never calls its handler. From debugging and looking at the streambuf I can reasonably assume the buffer contains all the data after the first async_read_until because the packages are very small (only about 60 bytes total). This leads me to believe the transfer_exactly don't return unless something is actually really transferred. But if such is the case, why does the first loop succeed? I am quite lost here. If anybody has some suggestion it is much appreciated. Thanks a bunch! Stephan
On Thu, May 24, 2018 at 7:59 AM, Stephan Menzel via Boost-users
I am quite lost here. If anybody has some suggestion it is much appreciated.
I have no idea why your code is malfunctioning because I am not familiar with using std streams with asio (although I know asio pretty well, asio::basic_streambuf in particular). However, let me propose a wild idea. Set your current code aside for the moment and reimplement it a different way. Instead of doing three asynchronous reads, call async_read_some once into your basic_streambuf. Use a reasonably large size in the call to prepare (say, 2048 or higher). When you have data in your buffer, inspect the contents and determine if you have a complete message. If not, then keep looping and calling async_read_some until your buffer has an entire message. Then extract the message, consume the bytes used, and repeat. When extracting the bytes do not use asynchronous I/O. What, you ask, is the purpose of all this? Calls to asynchronous initiating functions are VERY expensive. Calling async_read_until to get some data, then async_read to get the header, then async_read to get the body, is going to kill performance. Instead, you should be using async_read_some to transfer into your streambuf, all the existing data that has already been received into the socket's kernel buffer. It is possible that you will get more than one message into your buffer when you do this (if your protocol is full-duplex). This is great when it happens because you are getting the most work out of each call to async_read_some (which as I stated, is quite expensive). In the absence of any other information about why your current code is malfunctioning, it is possible that this reimplementation will not only make your code perform better and become easier to reason about, but might also solve your unknown bug. Regards
Hello Vinnie, thanks for your reply On Thu, May 24, 2018 at 5:22 PM, Vinnie Falco via Boost-users < boost-users@lists.boost.org> wrote:
However, let me propose a wild idea. Set your current code aside for the moment and reimplement it a different way. Instead of doing three asynchronous reads, call async_read_some once into your basic_streambuf. Use a reasonably large size in the call to prepare (say, 2048 or higher). When you have data in your buffer, inspect the contents and determine if you have a complete message. If not, then keep looping and calling async_read_some until your buffer has an entire message. Then extract the message, consume the bytes used, and repeat. When extracting the bytes do not use asynchronous I/O.
Not a wild idea at all. Yet a see a two problems with it. This pretty much means taking over what asio calls 'composed operations' myself. Instead of letting asio do read_some calls and figure out when I have, say, a line completely read (until CRL), I read and figure it out myself. Which is very much what I wanted to avoid by going the streambuf approach in the first place. The second issue to me is the inspection of the contents. The packets are of variable size and can theoretically (though not practically) range up to about 64k. I don't know how large the payload is until I read and parsed the header. Which means I would hav to do just that: Inspect the contents of the buffer. Which I have no idea how to do. Over the years, this is not the first time I'm trying this approach and I always folded when it came to inspecting the contents of a streambuf. I simply don't know how this can be done. I can't even reliably do it in the debugger. Sure, I see data but which is input and which is output and where are all those position markers and what would I really get if I read... I guess you are right though. An new implementation using fixed buffers would probably solve the bug as well. As it always did in the past. I guess I just wanted to be fancy again. Thanks for your suggestion... Stephan
On Thu, May 24, 2018 at 10:49 AM, Stephan Menzel via Boost-users
This pretty much means taking over what asio calls 'composed operations'
You don't have to write a composed operation and accompanying initiating function, you can do this right in the code you already have. Just replace `async_read_until` with `stream.async_read_some` and put that in a loop.
...I always folded when it came to inspecting the contents of a streambuf. I simply don't know how this can be done.
There are a few approaches. The easiest solution is to convert the readable bytes of the stream buffer to a pair of iterators using these asio utility functions, and apply std algorithms (or your own) to that range: https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/buffers_... https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/buffers_... If you are using Boost.Asio 1.67, you can adapt a std::string into a dynamic buffer using dynamic_string_buffer: https://www.boost.org/doc/libs/1_67_0/doc/html/boost_asio/reference/dynamic_... Once you have the message in a std::string it should be pretty straightforward to use string algorithms on it. Hope this helps!
On 25/05/2018 02:59, Stephan Menzel wrote:
once again I find myself struggling with asio streambufs but wanted to ask here before chicken out to char buffers again. My problem is that I cannot read more than one protocol loop successfully. [...] asio::async_read_until(m_socket, m_streambuf, "\r\n", [...] asio::async_read(m_socket, m_streambuf, boost::asio::transfer_exactly(n_header_size),
Mixing read_until and read on the same socket is problematic, because the way they actually behave -- while not actually wrong -- is not what you first expect. There are two key bits of information you need to realise the problem: 1. async_read_until actually calls async_read_some under the hood to read an arbitrary amount of data from the socket. All the data is stored in the streambuf and only a *subset* of that length is returned to indicate where the terminator was found. 2. async_read just calls async_read_some directly with the specified length request. In particular, even if #1 already read the entire message into the streambuf, #2 will ignore that and still wait for the specified number of new incoming bytes -- even if you are reading into the same streambuf. The solution to this is pretty straightforward, once you realise that you need to do it: Instead of calling async_read with the total number of bytes you are expecting in the message body, first check how many bytes are already available in the streambuf (via size()), and only issue a read for the balance (and if the balance is 0 or negative, then obviously don't issue the read at all). In an ideal world, async_read would do this for you, but it doesn't. (And it does make sense that it doesn't, once you think about it -- otherwise multiple consecutive reads without consuming the data would just return the same data over and over instead of appending it.)
Mere moments ago, quoth I:
Instead of calling async_read with the total number of bytes you are expecting in the message body, first check how many bytes are already available in the streambuf (via size()), and only issue a read for the balance (and if the balance is 0 or negative, then obviously don't issue the read at all).
Also, don't forget to call consume on the streambuf once you have finished using a chunk of data. (ie. after you have read the header size, you need to consume all those bytes so that the remaining size() no longer includes them, before you calculate the balance.)
On Fri, May 25, 2018 at 1:44 AM, Gavin Lambert via Boost-users < boost-users@lists.boost.org> wrote:
Mixing read_until and read on the same socket is problematic, because the way they actually behave -- while not actually wrong -- is not what you first expect.
There are two key bits of information you need to realise the problem:
1. async_read_until actually calls async_read_some under the hood to read an arbitrary amount of data from the socket. All the data is stored in the streambuf and only a *subset* of that length is returned to indicate where the terminator was found.
2. async_read just calls async_read_some directly with the specified length request.
In particular, even if #1 already read the entire message into the streambuf, #2 will ignore that and still wait for the specified number of new incoming bytes -- even if you are reading into the same streambuf.
This! That was the key right here. I had this hunch but failed to confirm it or know how to prevent it. Thanks a bunch. I can't believe I have never read this crucial bit of info anywhere in the asio docs. I took your and Vinnie's advice and boiled this down to fewer async ops still reading into the streambuf and scratched the initial read_until, leaving only async_read. And it behaves nicely now. Looking at all my past failures with similar streambuf based approaches I think it may very well be that I always had this mix of the two operations. This should be prominently placed in the asio docs. So, for anybody who comes across this, the key elements are: * You can re-use the same streambuf for many reads and writes as long as you DO NOT MIX async_read() and async_read_until() * streambuf.size() gives you the data available for extraction * when extracting data, either read from streambuf.data() and then consume() the data you have read or use a stream that does the consume for you * when writing data into it either write into buffers given by prepare() and then call commit() or use a stream that does the commit for you Thanks again, Stephan
On 05/25/2018 06:40 AM, Stephan Menzel via Boost-users wrote:
On Fri, May 25, 2018 at 1:44 AM, Gavin Lambert via Boost-users Looking at all my past failures with similar streambuf based approaches I think it may very well be that I always had this mix of the two operations. This should be prominently placed in the asio docs.
Would it be possible for you to post a modified snippet of your code showing your new way of doing things?
So, for anybody who comes across this, the key elements are:
* You can re-use the same streambuf for many reads and writes as long as you DO NOT MIX async_read() and async_read_until() * streambuf.size() gives you the data available for extraction * when extracting data, either read from streambuf.data() and then consume() the data you have read or use a stream that does the consume for you * when writing data into it either write into buffers given by prepare() and then call commit() or use a stream that does the commit for you--
Raymond Burkholder ray@oneunified.net https://blog.raymond.burkholder.net -- This message has been scanned for viruses and dangerous content by MailScanner, and is believed to be clean.
On Fri, May 25, 2018 at 2:49 PM, Raymond Burkholder via Boost-users < boost-users@lists.boost.org> wrote:
Would it be possible for you to post a modified snippet of your code showing your new way of doing things?
I have changed many things and that wasn't real code. But the easiest way
to transform my example would be to change the header size. I went from
having a CRLF limited line to a 64 bit integer, which I serialize right
into the buffer. This way I always know how large it is and don't need
read_until anymore. Like this:
void read_header_size() {
asio::async_read(m_socket, m_streambuf, asio::transfer_at_least(8),
[self](const boost::system::error_code, const std::size_t) {
// < handle error skipped >
boost::uint64_t header_size = 0;
std::istream is(&self->m_streambuf, std::ios::binary);
is.read(reinterpret_cast
On Fri, May 25, 2018 at 8:12 AM, Stephan Menzel via Boost-users
Gave my packet a method to de-serialize itself from a streambuf and return the number of missing bytes or 0 if ready.
That sounds better :) I do the same thing here: https://github.com/boostorg/beast/blob/develop/include/boost/beast/websocket... Regards
participants (4)
-
Gavin Lambert
-
Raymond Burkholder
-
Stephan Menzel
-
Vinnie Falco