Consider the producer-consumer model:
producer -+- consumer1
+- consumer2
+- consumer3
+- consumer4
+- consumer5
The producer periodly push items into a queue, and those consumers wait
on that queue for items
It's easy to implement in a synchronous, multi-threaded environment: the
producer just locks the queue and push into it, and notify one consumer
through a condition variable. Consumers waiting on that condition
variable will be woken one by one, popping items and consume them
How to do those in a asynchronous environment?
Consumers async_pop() on that queue, posting their completion tokens
into the executor. When there are items available, the executor notify
one consumer by calling its completion token
Is it right?
How do the producer tell the executor "The asynchronous operation is
done, You should call that completion token"?
My implemention of a possible async queue (based on timers):
#include
#include <deque>
#include <iostream>
using namespace std::literals;
namespace io = boost::asio;
using io::awaitable;
auto& uawait = io::use_awaitable;
template<class D>
awaitable<void> delay(const D& dur) {
io::steady_timer tm{ co_await io::this_coro::executor };
tm.expires_after(dur);
co_await tm.async_wait(uawait);
}
template<class T>
class aqueue {
std::deque<T> mq;
public:
using reference = T&;
using const_reference = const T&;
size_t size() const noexcept { return mq.size(); }
bool empty() const noexcept { return mq.empty(); }
awaitable<T> async_pop() {
while (empty()) {
co_await delay(1ms);
}
auto t = std::move(mq.front());
mq.pop_front();
co_return t;
}
awaitable<void> async_push(T t) {
mq.push_back(std::move(t));
co_return;
}
};
class consumer {
public:
const int id;
aqueue<int>& queue;
consumer(int _i, aqueue<int>& _q) : id{ _i }, queue{ _q } {}
awaitable<void> operator()() {
std::cout << "Consumer " << id << " started\n";
auto i = co_await queue.async_pop();
std::cout << "Consumer " << id << " got " << i << '\n';
}
};
int main() {
io::io_context ctx;
aqueue<int> aq;
auto producer = [&]() -> awaitable<void> {
std::cout << "Producer started\n";
for (int i{}; i != 5; ++i) {
co_await delay(1s);
co_await aq.async_push(i);
}
};
io::co_spawn(ctx, producer, io::detached);
for (int i{}; i != 5; ++i)
io::co_spawn(ctx, consumer{ i, aq }, io::detached);
std::cout << "RUN\n";
ctx.run();
return 0;
}
Is it a proper way to do so using timers? I think there should be a
better way