Maybe I'm making it too easy for myself, but I'd see every pipeline stage as a scheduler — say, for Boost.Asynchronous, a work-stealing threadpool scheduler (with one or more threads). Every stage gets a job that transforms its input data and then posts, to the queue of the next stage's scheduler, a functor performing the next stage's transformation, and so on. Then I'd create a composite scheduler in one line of code to make sure work stealing happens between the stages, and that would be it for the infrastructure.
Purely for fun, I spent a few minutes writing such a pipeline: a
simple version using one thread per stage, then one with work stealing.
There is a ton of stuff to improve (for example, strings should be moved
rather than copied), but I hope you get the idea.
Now one "just" needs to write the syntactic sugar to have beautiful
pipelines.
Cheers,
Christophe
The simple version is:
#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>
#include
(1));
auto scheduler2 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
auto scheduler3 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
// Job for the first stage: keep only items that fully match `re` and hand
// each match to the second stage, which runs on scheduler2. Every stage posts
// the next stage's work to the next scheduler, so the three stages run on
// three separate single-threaded pools.
auto grep = [scheduler2,scheduler3](std::string const& re,std::string const& item)
{
    // NOTE(review): the regex is rebuilt for every item; if this ever becomes
    // hot, compile it once and share it between jobs instead.
    std::regex regex(re);
    if (std::regex_match(item, regex))
    {
        // Job for the second stage: trim surrounding whitespace, then post the
        // printing job (third stage) to scheduler3.
        auto trim = [scheduler3](std::string const& item)
        {
            std::string tc(boost::algorithm::trim_copy(item));
            // 3rd stage job: print the trimmed item.
            boost::asynchronous::post_future(scheduler3,
                [tc](){std::cout << "->" << tc << std::endl;});
        };
        // `item` is a const reference, so std::move could only copy anyway;
        // the binder stores its own copy of the string for the posted job.
        auto trim_ = std::bind(trim, item);
        boost::asynchronous::post_future(scheduler2, trim_);
    }
};
// Feed every input line to the first stage. The loop variable is a copy that
// is moved into the binder, so each posted job owns its own string.
for(auto s : input)
{
    auto grep_error = std::bind(grep, "Error.*", std::move(s));
    boost::asynchronous::post_future(scheduler1, grep_error);
}
}
};
int main()
{
    // Sample log lines fed through the pipeline.
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
    };
    pipeline p;
    p.process(input);
    // We are going to shut down: per the author, destroying the pipeline makes
    // the schedulers block until all posted work is completely done.
    return 0;
}
And the stealing version is:
#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>
#include
(1));
auto scheduler2 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::stealing_multiqueue_threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
auto scheduler3 = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::stealing_multiqueue_threadpool_scheduler< boost::asynchronous::lockfree_queue<>
(1));
// composite pool made of the previous pools // keeping it alive will ensure automatic work-stealing between pools. m_composite = boost::asynchronous::create_shared_scheduler_proxy( new boost::asynchronous::composite_threadpool_scheduler<> (scheduler1,scheduler2,scheduler3)); } void process(std::vectorstd::string const& input) { auto grep = [this](std::string const& re,std::string const& item) { std::regex regex(re); if (std::regex_match(item, regex)) { auto trim = [this](std::string const& item) { std::string tc(boost::algorithm::trim_copy(item)); // post to third pool of composite boost::asynchronous::post_future(m_composite, [tc](){std::cout << "->" << tc << endl;},"",3); }; auto trim_ = std::bind(trim, std::move(item)); // post to second pool of composite boost::asynchronous::post_future(m_composite, trim_,"",2); } }; for(auto s : input) { auto grep_error = std::bind(grep, "Error.*", std::move(s)); // post to first pool of composite boost::asynchronous::post_future(m_composite, grep_error,"",1); } } private: boost::asynchronous::any_shared_scheduler_proxy<> m_composite; }; int main() { std::vectorstd::string input = { "Error: foobar", "Error. foo", " Warning: barbaz", "Notice: qux", "\tError: abc" }; pipeline p; p.process(input); // we are going to shutdown, schedulers will all block until completely done return 0; }