Boost ASIO: Problems building a conditional variable using io_context / strand
On 23/05/2023 10:39, Suraaj K S wrote:
Using this, I wanted to build a 'memory checker'. It has two functions, `request_space` and `free_space`. A coroutine calls `request_space`, which may block if there is no space left. Meanwhile, another thread or coroutine can call `free_space`, which will resume blocked coroutines if possible.
Coroutines and mutexes are mortal enemies; they should not be mixed. Holding locks across async boundaries, especially, is a big no-no. Usually the best way to manage coroutines is to not schedule them until they have useful work to do. If you want something more like traditional threading primitives for coroutines instead, consider Boost.Fiber.
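For comparison, a minimal sketch of the Boost.Fiber route (fiber-aware primitives suspend the calling fiber, not the thread, so no io_context tricks are needed):

#include <boost/fiber/all.hpp>
#include <iostream>

int main() {
  boost::fibers::mutex m;
  boost::fibers::condition_variable cv;
  bool ready = false;

  boost::fibers::fiber waiter([&] {
    std::unique_lock<boost::fibers::mutex> lk{m};
    cv.wait(lk, [&] { return ready; }); // suspends this fiber, not the thread
    std::cout << "woken" << std::endl;
  });

  boost::fibers::fiber notifier([&] {
    {
      std::lock_guard<boost::fibers::mutex> lg{m};
      ready = true;
    }
    cv.notify_one(); // wakes the waiting fiber
  });

  waiter.join();
  notifier.join();
}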
Finally, somewhat more surprisingly, if I replace `post(strand_, std::move(posted_lambda));` with `post(pending_queue_, std::move(posted_lambda));`, things seem to work. However, the Asio documentation says that only strands guarantee a FIFO execution order. I am not sure whether a plain `io_context` will work as a FIFO queue (even though it seems to in these examples).
An io_context with only a single thread will behave equivalently to posting all work via a strand (as will posting new work only in response to existing work completing, or when there is no existing work -- read up about "implicit strand"). However, if you block that thread, nothing will be able to progress, possibly including the work needed to unblock the thread, leading to deadlock.
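To illustrate with a trivial, self-contained example (single-threaded, so the implicit-strand FIFO guarantee holds):

#include <boost/asio.hpp>
#include <iostream>

int main() {
  boost::asio::io_context ioc;
  // All handlers are posted before run() is called, and run() is invoked on
  // exactly one thread, so they execute one at a time in FIFO order -- an
  // implicit strand, with no explicit strand object needed.
  for (int i = 0; i < 3; ++i)
    boost::asio::post(ioc, [i] { std::cout << "handler " << i << "\n"; });
  ioc.run(); // prints handler 0, 1, 2 in order
}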
I encountered this problem too. Chris recommended using a steady_timer to construct an async semaphore. I wasn't happy with that, so I wrote an async_semaphore type. Klemens then picked up on that and refined it: https://github.com/klemens-morgenstern/sam
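For reference, a minimal sketch of that steady_timer trick (my own reconstruction, not code from sam): the expiry is set so far in the future that the timer never fires on its own; waiters suspend in async_wait(), and notify_one() uses cancel_one() to complete exactly one pending wait with operation_aborted.

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace asio = boost::asio;

class async_condition {
public:
  explicit async_condition(asio::io_context &ioc) : timer_(ioc) {
    // A deadline that never arrives: the timer only "fires" when cancelled.
    timer_.expires_at(asio::steady_timer::time_point::max());
  }

  // Suspends the calling coroutine until notify_one() cancels the wait.
  void wait(asio::yield_context yield) {
    boost::system::error_code ec;
    timer_.async_wait(yield[ec]); // ec == operation_aborted on wake-up
  }

  // Completes exactly one pending async_wait, waking one waiter.
  void notify_one() { timer_.cancel_one(); }

private:
  asio::steady_timer timer_;
};

On Tue, 23 May 2023 at 00:44, Suraaj K S via Boost-users <boost-users@lists.boost.org> wrote: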
Hi all,
In Boost.Asio, I realized that there was no easy way to have something that resembles a condition variable.
However, I found that I could get something very similar using stackful coroutines (https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/spaw...), while using an io_context as a 'queue' for completion tokens. Below is my approach:
*#include "asio_types.hpp"#include <atomic>#include <iostream>class async_pending_queue {public: async_pending_queue() : pending_handlers_(0), strand_(pending_queue_), wg_(asio::make_work_guard(pending_queue_)) {} template <typename CompletionToken> auto async_submit( CompletionToken &&token, std::function
atomic_action = [] {}) { auto init = [this, &atomic_action](auto completion_handler) { auto posted_lambda = [handler = std::move(completion_handler), this]() mutable { pending_handlers_--; asio_sys_err ec; handler(ec); }; post(strand_,std::move(posted_lambda)); pending_handlers_++; atomic_action(); }; return asio::async_initiate (init, token); } int pending_count() { return pending_handlers_.load(); } // It may not run 1 and run 0 bool try_run_one() { auto cnt = pending_queue_.poll_one(); std::cout << "completion token result" << cnt << std::endl; bool ret = (cnt == 1); return ret; }private: std::atomic<unsigned int> pending_handlers_; asio_ioctx pending_queue_; asio_ioctx::strand strand_; decltype(asio::make_work_guard(pending_queue_)) wg_;};* Here, one simply uses calls `my_async_pending_queue.async_submit(yield)`, if calling from a stackful coroutine. The coroutine can be continued by calling `my_async_pending_queue.try_run_one()`. Using this, I wanted to build a 'memory checker'. It has two functions -> `request_space` and `free_space`. A coroutine calls `request_space`, which may block if there is no space left. Meanwhile, another thread / coroutine can call `free_space`, which will run blocked coroutines if possible.
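First, to show the intended use of `async_pending_queue` in isolation, here is a minimal hypothetical example (it assumes the class behaves as intended; as described below, in practice it does not quite):

#include "async_pending_queue.hpp"
#include <boost/asio/spawn.hpp>
#include <iostream>

int main() {
  asio_ioctx ioctx;
  async_pending_queue q;

  asio::spawn(ioctx, [&](yield_context yield) {
    std::cout << "waiting" << std::endl;
    q.async_submit(yield); // parks this coroutine on the pending queue
    std::cout << "resumed" << std::endl;
  });

  // Runs after the coroutine has suspended: release one waiter.
  asio::post(ioctx, [&] { q.try_run_one(); });
  ioctx.run();
}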
I built a toy memory checker wrapper as follows:
#ifndef MEM_CHECK_HPP
#define MEM_CHECK_HPP

#include <cassert>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace asio = boost::asio;
using asio_ioctx = asio::io_context;
using asio_sys_err = boost::system::error_code;
using asio::yield_context;

#include "async_pending_queue.hpp"

class MemoryChecker {
public:
  using bytes_cnt = size_t;
  MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem = 1024,
                bytes_cnt initial_fill = 0);
  // This requests some space, and possibly yields
  void request_space(bytes_cnt cnt, yield_context yield);
  void free_space(bytes_cnt cnt);

private:
  bytes_cnt get_available_mem();

  const bytes_cnt total_mem_;
  bytes_cnt mem_used_;
  std::queue<bytes_cnt> request_queue_;
  async_pending_queue request_routines_;
  std::mutex lock_;
  asio_ioctx::strand fifo_strand_;
  asio_ioctx &completion_ioctx_;
};

inline MemoryChecker::MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem,
                                    bytes_cnt initial_fill)
    : total_mem_(total_mem), mem_used_(initial_fill), request_queue_{},
      request_routines_{}, lock_{}, fifo_strand_(ioctx),
      completion_ioctx_(ioctx) {}

inline MemoryChecker::bytes_cnt MemoryChecker::get_available_mem() {
  assert(total_mem_ >= mem_used_);
  return total_mem_ - mem_used_;
}

inline void MemoryChecker::request_space(bytes_cnt cnt, yield_context yield) {
  // if (cnt > total_mem_) throw logic_error
  lock_.lock();
  assert(cnt <= total_mem_);
  assert(cnt > 0);
  if (request_queue_.empty()) {
    assert(request_routines_.pending_count() == 0);
    if (get_available_mem() >= cnt) { // We bypass the pending queue
      mem_used_ += cnt;
      lock_.unlock();
      return;
    }
  }
  assert(request_queue_.size() == request_routines_.pending_count());
  std::cout << "Pushing " << cnt << std::endl;
  request_queue_.push(cnt);
  auto wg = asio::make_work_guard(completion_ioctx_);
  request_routines_.async_submit(yield, [this] { lock_.unlock(); });
  auto oldest_req{request_queue_.front()};
  assert(cnt == oldest_req);
  request_queue_.pop();
  mem_used_ += cnt;
  assert(request_queue_.size() == request_routines_.pending_count());
  asio::post(fifo_strand_, yield);
}

inline void MemoryChecker::free_space(bytes_cnt cnt) {
  std::lock_guard<std::mutex> lg{lock_};
  mem_used_ -= cnt;
  // Here, we own the lock, and free as many coroutines as we can
  while (true) {
    if (request_queue_.size() == 0) {
      std::cout << "No pending requests. Bailing" << std::endl;
      break;
    }
    assert(request_queue_.size() == request_routines_.pending_count());
    auto oldest_req{request_queue_.front()};
    auto available_mem{get_available_mem()};
    if (available_mem < oldest_req) {
      std::cout << "Oldest request is larger than available_mem. Bailing"
                << std::endl;
      break;
    }
    assert(request_routines_.try_run_one() == true);
  }
}

#endif /* MEM_CHECK_HPP */

Here is a test program that can run it:
*#include "mem_check.hpp"#include <thread>#include
constexpr size_t mc_size{4};asio_ioctx ioctx;size_t total{0};MemoryChecker mc{ioctx, mc_size};void requestor_coroutine(size_t rq,yield_context yield) { asio::steady_timer t(ioctx); while (true) { total += rq; mc.request_space(rq, yield); std::cout << "Got requested space "; asio_sys_err ec; }}int main() { asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(1,yield); }); asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(2,yield); }); asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(3,yield); }); asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(4,yield); }); std::thread t([] { ioctx.run(); }); while (true) { getchar(); std::cout << total << std::endl; if (total > 0) { std::cout << "freeing" << std::endl; mc.free_space(1); total -= 1; } } t.join();}* Finally, the problem we face is as follows. When we run the program, the assertion `total_mem >= mem_used_` fails. On some further investigation, I realized that our completion token was being called even when we do not call `try_run_one`, which was very weird.
Finally, somewhat more surprisingly, if I replace `post(strand_, std::move(posted_lambda));` with `post(pending_queue_, std::move(posted_lambda));`, things seem to work. However, the Asio documentation says that only strands guarantee a FIFO execution order. I am not sure whether a plain `io_context` will work as a FIFO queue (even though it seems to in these examples).
Any inputs would be helpful. I am happy to hear what the problem in this implementation is, as well as about other implementations (for example, using a std::queue as a proper queue instead of this io_context hack).
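Roughly, the std::queue alternative I have in mind would park the type-erased completion handlers themselves in a queue, something like the following untested sketch (it uses `asio::any_completion_handler`, available in recent Boost releases; a real version should also dispatch each handler through its associated executor instead of invoking it directly):

#include <boost/asio.hpp>
#include <boost/asio/any_completion_handler.hpp>
#include <mutex>
#include <queue>

namespace asio = boost::asio;
using asio_sys_err = boost::system::error_code;

class handler_queue {
public:
  template <typename CompletionToken>
  auto async_submit(CompletionToken &&token) {
    return asio::async_initiate<CompletionToken, void(asio_sys_err)>(
        [this](auto handler) {
          // Type-erase the suspended completion handler and park it.
          std::lock_guard<std::mutex> lg{m_};
          handlers_.push(std::move(handler));
        },
        token);
  }

  // Pops and resumes one waiter; returns false if none was pending.
  bool try_run_one() {
    asio::any_completion_handler<void(asio_sys_err)> h;
    {
      std::lock_guard<std::mutex> lg{m_};
      if (handlers_.empty())
        return false;
      h = std::move(handlers_.front());
      handlers_.pop();
    }
    std::move(h)(asio_sys_err{}); // resume outside the lock
    return true;
  }

private:
  std::mutex m_;
  std::queue<asio::any_completion_handler<void(asio_sys_err)>> handlers_;
};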
This question has also been posted here: https://stackoverflow.com/questions/76310252/c-boost-asio-building-a-conditi... .
Thanks,
Suraaj
participants (3)
- Gavin Lambert
- Richard Hodges
- Suraaj K S