[boost::asio] Wait for multiple async_read()'s

Hi,
I'm writing a boost::asio application that communicates with multiple streams. I'd like to write data to these streams, then wait for everyone to reply before performing computation on the resulting data.
What is the best way to implement this?
The route I've ventured down involves calling async_write() on each stream, with the callback handler executing async_read(). But this is where I'm stuck. What is the best way to execute one final handler once all read operations have completed?
One solution I've found, though it feels "un-asio", is to pass the use_future option as the handler for the reads. The result is a future for each read, which I can then place into a list<future>, and have an additional io_service.post()'ed handler call get() on these futures. The only issue with this is that I must call io_service.run() from more than one thread, since the handler calling get() blocks; otherwise I can potentially deadlock the whole application.
Another alternative I can think of is that the function that schedules all the async_writes knows ahead of time the number of reads that need to be completed. Each async_read() handler would put its results into a shared list and then, if the list's size is equal to the expected number of results, fire the "completion" handler. While easy enough to implement, this seems fraught with issues. If there is more than one thread calling io_service.run(), then access to the shared list has to be synchronised. Also, it doesn't seem very generic: I have a number of places where this write n, read n, execute (or is it map, reduce?) pattern needs to happen, and I'd prefer a solution that was write once, works everywhere.
Any help would be appreciated.
- Adam

On 21 January 2017 at 13:09, Adam Zegelin
Another alternative I can think of is that the function that schedules all the async_writes knows ahead of time the number of reads that need to be completed. Each async_read() handler would put its results into a shared list and then, if the list's size is equal to the expected number of results, fire the "completion" handler. While easy enough to implement, this seems fraught with issues. If there is more than one thread calling io_service.run(), then access to the shared list has to be synchronised. Also, it doesn't seem very generic: I have a number of places where this write n, read n, execute (or is it map, reduce?) pattern needs to happen, and I'd prefer a solution that was write once, works everywhere.
I would definitely go with this approach. It's by far the simplest, and to me it seems to match an asynchronous philosophy pretty well.
You can use an io_service::strand [1] to synchronize access to the shared list easily without explicit locking. For best performance you should probably keep the handler running in the strand as short as possible and keep the parts of the read handler that don't access shared state outside of the strand.
[1] http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/reference.html#boos...
Hope this helps,
-- Maarten
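A minimal sketch of the counting approach discussed above, written against the standard library only so that it stands alone. The names `ResultJoin`, `add`, and `demo_join` are hypothetical and are not Asio APIs, and a `std::mutex` stands in for the `io_service::strand` suggested in the reply. In a real program the call to `add` would presumably sit inside each async_read() handler.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Asio): gathers `expected` results and then
// invokes `on_complete` once with all of them. It assumes add() is called
// exactly `expected` times and that `expected` is at least 1.
class ResultJoin {
public:
    using Completion = std::function<void(std::vector<std::string>)>;

    ResultJoin(std::size_t expected, Completion on_complete)
        : expected_(expected), on_complete_(std::move(on_complete)) {}

    // Called once per finished read. The mutex is an assumption standing in
    // for a strand; it only protects the shared result list.
    void add(std::string result) {
        std::vector<std::string> ready;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            results_.push_back(std::move(result));
            if (results_.size() != expected_) {
                return;  // still waiting for other replies
            }
            ready = std::move(results_);
        }
        // The completion runs outside the lock to keep the guarded part short.
        on_complete_(std::move(ready));
    }

private:
    std::size_t expected_;
    Completion on_complete_;
    std::mutex mutex_;
    std::vector<std::string> results_;
};

// Usage sketch: three simulated read handlers report in; the completion
// handler runs only after the third one.
std::size_t demo_join() {
    std::size_t seen = 0;
    ResultJoin join(3, [&seen](std::vector<std::string> all) { seen = all.size(); });
    join.add("reply from stream A");
    join.add("reply from stream B");
    assert(seen == 0);  // completion has not fired yet
    join.add("reply from stream C");
    return seen;
}
```

Because the join object owns both the counter and the list, the same class can be reused at every "write n, read n, execute" point; only the completion handler changes.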

On 21/01/17 12:09, Adam Zegelin wrote:
I'm writing a boost::asio application that communicates with multiple streams. I'd like to write data to these streams then wait for everyone to reply before performing computation on the resulting data.
What is the best way to implement this?
The route I've ventured down involves calling async_write() on each stream, with the callback handler executing async_read(). But this is where I'm stuck. What is the best way to execute one final handler once all read operations have completed?
One solution I've found, though it feels "un-asio", is to pass the use_future option as the handler for the reads. The result is a future for each read, which I can then place into a list<future>, and have an additional io_service.post()'ed handler call get() on these futures. The only issue with this is that I must call io_service.run() from more than one thread, since the handler calling get() blocks; otherwise I can potentially deadlock the whole application.
This is why futures without continuations aren't that useful; with continuations, useful helpers can be made:
http://www.boost.org/doc/libs/1_63_0/doc/html/thread/synchronization.html#th...
http://www.boost.org/doc/libs/1_63_0/doc/html/thread/synchronization.html#th...
Good luck.
Ben

2017-01-21 20:09 GMT+08:00 Adam Zegelin
I'm writing a boost::asio application that communicates with multiple streams. I'd like to write data to these streams then wait for everyone to reply before performing computation on the resulting data.
What is the best way to implement this?
The route I've ventured down involves calling async_write() on each stream, with the callback handler executing async_read(). But this is where I'm stuck. What is the best way to execute one final handler once all read operations have completed?
Coroutines may help. Take a simple echo server, for example:
https://github.com/jamboree/co2#asio-echo-server
In your case, you just need to keep the tasks in a list and 'await' their completion.

On 21-01-17 13:09, Adam Zegelin wrote:
I'm writing a boost::asio application that communicates with multiple streams. I'd like to write data to these streams then wait for everyone to reply before performing computation on the resulting data.
What is the best way to implement this?
The route I've ventured down involves calling async_write() on each stream, with the callback handler executing async_read(). But this is where I'm stuck. What is the best way to execute one final handler once all read operations have completed?
[snip]
Not sure if it's the best way, but for this typical use case I've written a combiner, which uses reference counting in a wrapped handler. Part of this io_object's signature is
    combiner_service::wrapped_handler async_combine( const boost::system::error_code& a1, const handler& handler );
and the usage is as follows:
    auto wrapped_handler = some_combiner_.async_combine( default_error_code, ..handler when all is complete... )
    for( ..0 or more items.. ) {
        some_io_object.async_do_stuff( ..., wrapped_handler );
    }
In the example above, the operation will only finish once the variable wrapped_handler goes out of scope and all copies of wrapped_handler have been called. The wrapped handler maximizes the final error code.
HtH,
Cheers,
Rutger
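The reference-counting idea can be sketched roughly as follows. This is not Rutger's combiner: `CombineState`, `WrappedHandler`, `demo_combine`, and this free-function `async_combine` are hypothetical stand-ins, plain `int` values play the role of boost::system::error_code, and the code assumes all handlers run on one thread (or one strand). As a simplification, the final handler fires when the last copy is destroyed, which approximates "all copies have been called" only if each operation drops its copy after invoking it.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Shared state kept alive by every copy of the wrapped handler.
// Assumption: the final handler does not throw, since it runs in a destructor.
struct CombineState {
    int worst_error;
    std::function<void(int)> final_handler;

    CombineState(int initial, std::function<void(int)> handler)
        : worst_error(initial), final_handler(std::move(handler)) {}

    // Runs when the last WrappedHandler copy goes away.
    ~CombineState() { final_handler(worst_error); }
};

// Copyable handler passed to each operation; copying bumps the reference count.
class WrappedHandler {
public:
    explicit WrappedHandler(std::shared_ptr<CombineState> state)
        : state_(std::move(state)) {}

    // Records the largest error code seen so far ("maximizes" the error).
    void operator()(int error) const {
        state_->worst_error = std::max(state_->worst_error, error);
    }

private:
    std::shared_ptr<CombineState> state_;
};

// Hypothetical free-function analogue of the member function in the message.
WrappedHandler async_combine(int default_error, std::function<void(int)> final_handler) {
    return WrappedHandler(
        std::make_shared<CombineState>(default_error, std::move(final_handler)));
}

// Usage sketch: three pending "operations" each hold a copy of the handler.
int demo_combine() {
    int final_error = -1;
    std::vector<WrappedHandler> pending;  // stands in for operations in flight
    {
        WrappedHandler wrapped =
            async_combine(0, [&final_error](int e) { final_error = e; });
        for (int i = 0; i < 3; ++i) {
            pending.push_back(wrapped);
        }
    }  // the original goes out of scope; three copies remain
    pending[0](0);
    pending[1](5);
    pending[2](2);
    assert(final_error == -1);  // copies are still alive, so nothing has fired
    pending.clear();            // last copy destroyed: final handler runs
    return final_error;
}
```

With this shape the code that launches the operations never counts them; the reference count does the bookkeeping, which is why a loop over "0 or more items" works unchanged.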

On 22/01/2017 01:09, Adam Zegelin wrote:
One solution I've found, though it feels "un-asio", is to pass the use_future option as the handler for the reads. The result is a future for each read, which I can then place into a list<future>, and have an additional io_service.post()'ed handler call get() on these futures. The only issue with this is that I must call io_service.run() from more than one thread, since the handler calling get() blocks; otherwise I can potentially deadlock the whole application.
You can pass a collection of futures to when_all(), which returns a future that will be completed when all the parameters are completed. You can then use future.then() to attach a continuation to this future which will execute when that occurs.

Thanks for everyone's suggestions.
On Mon, Jan 23, 2017 at 9:58 AM, Gavin Lambert
You can pass a collection of futures to when_all(), which returns a future that will be completed when all the parameters are completed.
You can then use future.then() to attach a continuation to this future which will execute when that occurs.
That's exactly what I was looking for, thank you!
I swear I read the docs for boost::future multiple times, but it looks like it's a fairly recent addition to the library (labeled experimental). Google has this tendency to link to older documentation sets (this always gets me when googling for postgres docs) and I probably Googled boost::future and ended up reading the docs from 20 years ago!
Hint to boost devs: do what postgres does and have a link to the most recent doc version at the top of every page.
On Sun, Jan 22, 2017 at 1:31 AM, TONGARI J
Coroutine may help. Take a simple echo-server for example: https://github.com/jamboree/co2#asio-echo-server In your case, you just need to keep the tasks in a list and 'await' for them to complete.
I looked into co-routines but they seem like too much magic for my taste.
Perhaps in a future project.
On Sat, Jan 21, 2017 at 11:24 PM, Maarten de Vries
I would definitely go with this approach. It's by far the simplest and to me it seems to match an asynchronous philosophy pretty well.
I got this to work, but it seems like a lot of work at every "join" point. Not to mention that it combines the concerns of the upstream and downstream read handlers: upstream has to know about downstream.
Regards,
Adam

On 25/01/2017 11:47, Adam Zegelin wrote:
I swear I read the docs for boost::future multiple times, but it looks like it's a fairly recent addition to the library (labeled experimental). Google has this tendency to link to older documentation sets (this always gets me when googling for postgres docs) and I probably Googled boost::future and ended up reading the docs from 20 years ago! Hint to boost devs: do what postgres does and have a link to the most recent doc version at the top of every page.
FWIW, the top Google hit for "boost future" links to the 1.55 docs (which do not have when_all), but there is indeed a big yellow box at the top of the page which links to the latest version. Perhaps it didn't look enough like a hyperlink so you mentally skipped it. :) Or perhaps instead of entering the page at the top you entered at one of the anchors further down? Perhaps that's an argument to style the header so that it stays stuck at the top of the window, even when scrolled down. (It also wouldn't be the first time that I've wished I didn't have to scroll to get to the navigation arrows.) Although that might annoy people on mobile browsers.

Next problem :)
asio's use_future returns a std::future. when_all() expects boost::futures.
I can't find an obvious way to convert between the two and/or get
asio's use_future to return a boost::future.
My understanding is that std::futures are pretty much a re-namespaced
boost::future (aka, boost's implementation was standardised).
My compiler (clang, Apple LLVM version 8.0.0 (clang-800.0.42.1),
bundled with Xcode 8.2.1) doesn't come with experimental/future (see
Besides re-implementing all the "magic" in asio/impl/use_future.hpp,
is there a way to mix these future types?
On Wed, Jan 25, 2017 at 1:16 PM, Gavin Lambert
FWIW, the top Google hit for "boost future" links to the 1.55 docs (which do not have when_all), but there is indeed a big yellow box at the top of the page which links to the latest version.
Perhaps it didn't look enough like a hyperlink so you mentally skipped it. :)
Now that you've told me that it's there I can't miss it. Perhaps I just mentally skipped it before, as it doesn't look much like a link and is inline with the header.

On Thu, 26 Jan 2017 at 22:41 Adam Zegelin
Next problem :)
asio's use_future returns a std::future. when_all() expects boost::futures. I can't find an obvious way to convert between the two and/or get asio's use_future to return a boost::future.
ASIO is set up to solve this: you can write use_boost_future in the same way that asio::use_future is written, but make it return a boost::future instead. You should be able to copy and paste the ASIO implementation of use_future and change the details to use boost::future instead.
A similar example can be found here, which adapts a custom future implementation into Chris' std::executors reference implementation. This is by the same author as ASIO, so it uses a similar technique. The code may even work as is with just a change of the namespace from std::experimental to boost::asio.
https://github.com/cdglove/daily-future/blob/master/include/daily/future/use...
Good Luck!
-- chris
participants (7)
Adam Zegelin
Ben Pope
Chris Glover
Gavin Lambert
Maarten de Vries
Rutger ter Borg