Hello,
I am new in this mailing list, I hope this is the right list to post my
issue.
Context:
I often have to deal with some code which is originally synchronous. When
we introduce an asynchronous call, all the callers must be made
asynchronous with async/await everywhere.
Let's take a concrete example:
Suppose we have an existing synchronous library which makes computation.
Suppose we can give a custom function to this library to compute a part of
the calculation. We decide that this function will call a server...
We may wish the main compute function to become asynchronous...
Now the big question:
Why isn't it possible so far to do this without changing all the functions
?? I find this really ugly, this is really difficult to make a good design.
With asymetric async calls, I doubt this is possible, so I tried
boost::context.
In my example, the "library" is represented by the function calculate which
is synchronous. It calls a function call_server which simulates the call to
a server, in fact it will just wait for 5 seconds but in a asynchronous
way.
The main function will do a computation in parallel while the server will
be called. You will find the code at the end of the mail.
This sounds too good to be true but in this case, where did I miss
something ? I haven't implemented a socket with a select but this would
work, right ?
The library can still be used in an asynchronous or synchronous context
(with no extra cost).
To help the understanding of the code, we have 3 tasks, the continuation
are stored in the tasks vector. task are identified thanks to their indice
in the vector:
- 0: main task (id_main)
- 1: reactor(id_reactor)
- 2; computation_task (id_computation)
I implement a kind of future, whose value is retrieved at the end of the
main function and which launches a task to compute the value.
Remarks:
1/ This is not optimised, we could make fewer context switches. There is no
need for a specific task for the reactor, it could be called directly.
2/ The call_server function could detect automatically the sync or async
context, if the reactor is not active (no waiting tasks to be run), then it
would run i a synchronous way.
3/ This solution is based on a singleton for the reactor, but this is what
we want,
4/ This could be certainly better coded, I am not an expert, I use to code
more in Python and wanted to use C++ to see if I could solve these designs
problems.
I am curious to read what you think about this, I certainly missed
something so please clarify and tell me if a clean solution is possible.
Thanks a lot for reading.
Regards,
Chris
#include
#include <iostream>
#include <iterator>
#include
#include <iostream>
#include <string>
#include <vector>
#include <tuple>
#include <queue>
#include
int a = 0;
int result = 0;
namespace ctx = boost::context;
std::vectorctx::continuation tasks;
std::queue<int> tasks_to_run;
std::vector<int> end_tasks; // list of tasks which ended
using clock_info = std::tuple; // task, start_time,
duration
using wait_info = std::tuple; // source, target: source wainting
for target to end
std::queue clocks; // timer to wake up tasks
std::queue waits;
const int id_main = 0;
const int id_reactor = 1;
const int id_computation = 2;
void add_clock(int task, int duration)
{
time_t start;
time(&start);
clocks.push(std::make_tuple(task, start, duration));
}
void wait_for(int source, int target) {
waits.push(std::make_tuple(source, target));
}
void end(int task) {
end_tasks.push_back(task);
}
ctx::continuation reactor(ctx::continuation && main_continuation) {
ctx::continuation task;
std::cout << "start reactor" << std::endl;
tasks.push_back(std::move(main_continuation)); // add main continuation to
list of tasks
tasks_to_run.push(id_main);
unsigned int k;
time_t end_time;
for (;;) {
if (tasks_to_run.size()) {
int id_task = tasks_to_run.front();
tasks_to_run.pop();
tasks[id_task] = tasks[id_task].resume();
}
time(&end_time);
for (k = 0; k < clocks.size(); k++) {
clock_info clock = clocks.front();
int elapsed = end_time - std::get<1>(clock);
if (elapsed > std::get<2>(clock)) {
tasks_to_run.push(std::get<0>(clock));
}
else
clocks.push(clock);
clocks.pop();
}
for (k = 0; k < waits.size(); k++) {
wait_info wait = waits.front();
if (std::find(end_tasks.begin(), end_tasks.end(), std::get<1>(wait)) !=
end_tasks.end()) {
tasks_to_run.push(std::get<0>(wait));
}
else
waits.push(wait);
waits.pop();
}
}
return std::move(main_continuation);
}
struct int_future {
std::function func;
int future_result;
int_future(std::function f) :func(f) {}
auto make_task() {
return [this](ctx::continuation && main_cont) {
tasks[id_reactor] = main_cont.resume();
future_result = this->func();
end(id_computation);
tasks[id_reactor] = tasks[id_reactor].resume();
return std::move(main_cont);
};
}
int get() {
wait_for(id_main, id_computation);
tasks[id_reactor] = tasks[id_reactor].resume();
return future_result;
}
};
int call_server() {
std::cout << "waiting for server results" << std::endl;
add_clock(id_computation, 5);
tasks[id_reactor] = tasks[id_reactor].resume();
result = 5;
std::cout << "server returned result " << result << std::endl;
return result;
}
int calculate() {
return 1 + call_server();
}
int main()
{
ctx::continuation task;
tasks.push_back(ctx::callcc(reactor));
std::cout << "launch computation task" << std::endl;
int_future fut(calculate);
tasks.push_back(ctx::callcc(fut.make_task()));
tasks_to_run.push(id_computation);
tasks_to_run.push(id_main);
tasks[id_reactor] = tasks[id_reactor].resume();
std::cout << "main task running" << std::endl;
for (int j = 0; j < 10; ++j) {
std::cout << a << " ";
}
std::cout << std::endl;
std::cout << "wait for future" << std::endl;
std::cout << "future result:" << fut.get() << std::endl;
}