GSoC: Boost.Pipeline -- scheduling of segments
Hi, I'm working on the GSoC Pipeline project, which is based on the N3534 [1] proposal. Work in progress can be found on GitHub [2]. A simple example of using the pipeline:

pipeline::from(input_container) | transformation1 | t2 | t3 | output_container;

When running this pipeline, items are read from `input_container`, processed by the transformations, and written to `output_container`. Each segment should run in parallel; that is the point of the pipeline. However, scheduling this work is not trivial. Quoting the proposal:

"The current pipeline framework uses one thread for each stage of the pipeline. To limit the use of resources, it should be possible to run with fewer threads, using work-stealing or work-sharing techniques."

We are wondering if we could improve on this. Let's assume a thread pool of a single thread and the example pipeline above. On run(), ideally the thread would read *some* items from `input_container`, apply the transformations to them, and push the results to `output_container`. That is: spend some time on each transformation, then yield and pick up the next one. However, doing this implies that the transformations are reentrant. This additional constraint must be considered carefully. This behavior is implemented in the `development` branch [2].

Aside from the two solutions above, Vicente J. Botet Escriba brought in the following idea: let the threads work on a single transformation until the queue gets closed (no more input), then move to the next one. This is easy to implement and scales to as many threads as there are segments. On the other hand, it kills the performance of the "online use case":

pipeline::from(read_message_from_socket) | process_message | send_message;

It's not deterministic when the first segment will end, so the pipeline will hang and won't produce any output. Also, in a slightly less strict scenario, where the end of input is specified, the latency could simply be too high.

To summarize, the following options are on the table:

1. Dedicate a thread to each segment (what to do with a fixed-size thread pool?)
2. Constrain the transformations to be reentrant.
3. Run each transformation as long as there is input to be processed, from beginning to end.

We kindly ask the recipients to share their ideas or opinions, ideally with a matching use case. A single-threaded sketch of the intended semantics follows below.

Thanks,
Benedek

[1]: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2013/n3534.html
[2]: https://github.com/erenon/pipeline/tree/development
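For reference, a minimal single-threaded sketch of what the example pipeline computes; the transformations are placeholders (as in the example above), and none of the library's concurrency machinery appears here:

#include <vector>

// Placeholder 1:1 transformations, standing in for transformation1..t3 above.
int transformation1(int x) { return x + 1; }
int t2(int x) { return x * 2; }
int t3(int x) { return x - 3; }

int main()
{
    std::vector<int> input_container = {1, 2, 3, 4};
    std::vector<int> output_container;

    // Running the pipeline is semantically equivalent to this loop; the
    // library's job is to execute the stages concurrently, connected by
    // queues, while preserving these results.
    for (int item : input_container)
        output_container.push_back(t3(t2(transformation1(item))));
}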
On 1 Jul 2014 at 23:24, Benedek Thaler wrote:
It's not deterministic when the first segment will end, the pipeline will hang and won't provide any output. Also, in a slightly less strict scenario, when the end of input is specified, the latency could be just too high.
To summarize, the following options are on the table:
1. Dedicate a thread to each segment (what to do with a fixed-size thread pool?) 2. Constrain the transformations to be reentrant. 3. Run each transformation as long as there is input to be processed, from beginning to end.
We kindly ask the recipients to share their ideas or opinions, ideally with a matching use case.
Mmm, you're getting into needing to implement a demand-driven, transformation-based data stream framework, something I suspected would always happen with this proposal. In short, either you make all your processors coroutineable and resumable - which is a ton of work - or you implement a full data stream implementation, which is also a ton of work. If you go with your approach 2 you'll murder reusability; with approach 3 you'll use a ton of RAM.

Given yours is a GSoC and needs to be done sooner rather than later, I'd actually implement a dynamic thread pool which scales up the worker threads to whatever is the minimum set of blocking processors. Or borrow the threading facilities in HPX and port your code over to HPX completely; then your threading and blocking problems go away, because HPX has implemented the data stream framework for you. You'll obviously lose C++ genericity if you do this, though.

Niall

--
ned Productions Limited Consulting
http://www.nedproductions.biz/
http://ie.linkedin.com/in/nialldouglas/
2014-07-02 1:37 GMT+02:00 Niall Douglas
In short, either you make all your processors coroutineable and resumable - which is a ton of work.
A chain of symmetric coroutines already models a dataflow, and with fibers you get synchronization primitives on top - I assume integrating that isn't too hard.
Thanks for all the responses so far!

*Niall*: I'm afraid using HPX is out of scope in the context of a Boost library. If all else fails, I'll fall back to a simple implementation (e.g. 1). IMO, this design decision will determine whether this library becomes a great tool or just something nice.

*Adam*: I'm not sure how this smart queue would solve the problem of reentrancy. Currently, if a thread returns from a transformation, it can never be resumed. This makes yielding hard or impossible. If we make reentrancy mandatory, then we face the next challenge, which you described above, IIUC. I think the key points here are: (1) A task can yield and can point to another task which is waited upon. (2) A task (but not the thread) can "block" on the upstream queue if it's empty. If this isn't clear, I'll describe these at greater length.

*Oliver*: I considered using coroutines, however I don't clearly see how exactly this would solve the problem. IIUC, coroutines provide a way for a single thread to execute multiple methods. I'd need multiple threads executing multiple methods in a similar fashion. I'll continue thinking about this. I couldn't find it in the docs. Does Coroutine have any drawbacks?

Thanks,
Benedek
2014-07-02 10:50 GMT+02:00 Benedek Thaler
*Oliver*: I considered using coroutines, however I don't clearly see how exactly this would solve the problem. IIUC, coroutines provide a way for a single thread to execute multiple methods. I'd need multiple threads executing multiple methods in a similar fashion. I'll continue thinking about this.
1.) 1 thread -> 1 method
- coroutines not required

2.) 1 thread -> n methods (coroutines or fibers)
- each coroutine/fiber could execute one method
- multiple coroutines/fibers executed in one thread == thread executes multiple methods
- you need to know when a method has to be executed/resumed -> synchronization classes + scheduler required -> fibers (boost.fiber)
- you could chain multiple threads (thread-pool?) with channels/queues together => fixed-sized thread-pool running an arbitrary amount of methods
I couldn't find it in the docs. Does Coroutine have any drawbacks?
it depends on what you want to achieve
On 2 Jul 2014 at 10:50, Benedek Thaler wrote:
Thanks for all the responses so far!
*Niall*: I'm afraid using HPX is out of scope in the context of a Boost library. If all else fails, I'll fall back to a simple implementation (e.g. 1). IMO, this design decision will determine whether this library becomes a great tool or just something nice.
*Adam*: I'm not sure how this smart queue would solve the problem of reentrancy. Currently, if a thread returns from a transformation, it can never be resumed. This makes yielding hard or impossible.
I was suggesting that the thread would grab a single piece of data from a queue, do a single transformation, and then return to where it would make a scheduling decision about where to go next. Then it would once again grab one piece of data and do one transformation before returning to scheduling. I'm not suggesting that we would interrupt a thread in the middle of a stage... That would require writing most of an operating system kernel :)

The tricky part is knowing where there's work to be done. The "smart queues" would have to update some stats in the scheduler on every push, and potentially wake up threads. It's certainly a non-trivial task.

There's also the issue of the multi-in (N:1 & N:M) transformations, i.e. the ones that take input queues. They would either have to take a whole thread, provide some kind of yield, or something else. I'd probably ignore this for now (just consume a thread) and focus on the simple 1:1 and 1:N cases, as they naturally return after each transformation. A sketch of such a dispatch loop follows below.

Let me know if that makes sense.

Adam

If we make reentrancy mandatory, then we face the next challenge, which you
described above, IIUC. I think the key points here are: (1) A task can yield and can point to another task which is waited upon. (2) A task (but not the thread) can "block" on the upstream queue if it's empty. If this isn't clear, I'll describe these at greater length.
*Oliver*: I considered using coroutines, however I don't clearly see how exactly this would solve the problem. IIUC, coroutines provide a way for a single thread to execute multiple methods. I'd need multiple threads executing multiple methods in a similar fashion. I'll continue thinking about this.
I couldn't find it in the docs. Does Coroutine have any drawbacks?
Thanks, Benedek
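A minimal sketch of the dispatch loop Adam describes - a worker grabs one item, applies one transformation, and returns to a scheduling decision. All names are hypothetical, and plain std::deque stands in for the thread-safe "smart queues" a real implementation would need:

#include <deque>
#include <functional>
#include <vector>

// Hypothetical stage: a 1:1 transformation plus its upstream queue.
struct stage
{
    std::function<int(int)> transform;
    std::deque<int> input;   // this stage's upstream queue
    std::deque<int>* output; // downstream queue (nullptr for the sink)
};

// A worker repeatedly picks a stage that has pending input, processes
// exactly one item, then returns to the scheduling decision.
void run_worker(std::vector<stage>& stages)
{
    for (bool did_work = true; did_work;)
    {
        did_work = false;
        for (auto& s : stages) // scheduling decision: first stage with work
        {
            if (s.input.empty()) continue;
            int item = s.input.front(); // grab one piece of data
            s.input.pop_front();
            int result = s.transform(item); // do one transformation
            if (s.output) s.output->push_back(result);
            did_work = true;
            break; // back to the scheduling decision
        }
    }
}

int main()
{
    std::deque<int> sink;
    std::vector<stage> stages(2);
    stages[0].transform = [](int x) { return x + 1; };
    stages[0].output = &stages[1].input;
    stages[1].transform = [](int x) { return x * 2; };
    stages[1].output = &sink;
    stages[0].input = {1, 2, 3};
    run_worker(stages); // sink ends up holding 4, 6, 8
}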
*Niall*: I'm afraid using HPX is out of scope in the context of a Boost library. If all else fails, I'll fall back to a simple implementation (e.g. 1). IMO, this design decision will determine whether this library becomes a great tool or just something nice.
*Adam*: I'm not sure how this smart queue would solve the problem of reentrancy. Currently, if a thread returns from a transformation, it can never be resumed. This makes yielding hard or impossible.
I was suggesting that the thread would grab a single piece of data from a queue, do a single transformation, and then return to where it would make a scheduling decision about where to go next. Then it would once again grab one piece of data and do one transformation before returning to scheduling.
I'm not suggesting that we would interrupt a thread in the middle of a stage... That would require writing most of an operating system kernel :)
The tricky part is knowing where there's work to be done. The "smart queues" would have to update some stats in the scheduler on every push, and potentially wake up threads. It's certainly a non-trivial task.
There's also the issue of the multi-in (N:1 & N:M) transformations, i.e. that take input queues. They would either have to take a whole thread, provide some kind of yield, or something else. I'd probably ignore this for now (just consume a thread) and focus on the simple 1:1 and 1:N cases as they naturally return after each transformation.
All of that functionality is available from HPX. It gives you work-queue-based scheduling (with work stealing) of suspendable/resumable threads (i.e. supporting yield) with very little overhead. It also manages the 1:N or N:M threading for you.

You could try building all of your functionality on top of HPX first. This could allow you to figure out the actual underlying mechanisms your library would rely on. Later on you can move it to Boost after all of the required functionality has been accepted there.

Regards Hartmut
---------------
http://boost-spirit.com
http://stellar.cct.lsu.edu
On Thu, Jul 3, 2014 at 2:19 PM, Hartmut Kaiser
All of that functionality is available from HPX. It gives you work-queue based scheduling (with work stealing) of suspend-able/resume-able threads (i.e. supporting yield) with very little overhead. It also manages the 1:N or N:M threading for you.
You could try building all of your functionality on top of HPX first. This could allow you to figure out the actual underlying mechanisms your library would rely on. Later on you can move it to Boost after all of the required functionality has been accepted there.
Thanks for the further encouragement. I'll definitely take a closer look at HPX in the future. However, the GSoC deadline is closing in; I have to find a simpler solution for now. Hopefully, having a more solid base will enable us to build something truly great later.

Thanks,
Benedek
On 4 Jul 2014 at 11:55, Benedek Thaler wrote:
Thanks for the further encouragement. I'll definitely take a closer look at HPX in the future. However, the GSoC deadline is closing in; I have to find a simpler solution for now. Hopefully, having a more solid base will enable us to build something truly great later.
This is why I originally suggested HPX for this. Hartmut, who is one of the oldest core Boost people still here, did a really great job on HPX, which is pretty much the final say on how to do task execution at large scale. A Pipelines implementation matching your proposal wouldn't be trivial on HPX, but it would be tractable before GSoC ends.

Your only alternative to deliver before GSoC ends, I suspect, is to fire a thread at every component in the pipeline, as without async i/o you can't leverage coroutines. As you've probably realised by now, pipeline processors need to communicate in two directions, so end-stage processors need to be able to tell the first data source "I need one more byte or else I can't continue" and such. Thing is, that one extra byte at the end of the pipeline may turn into KB of extra bytes as each processor imposes its own needs on the forward propagation. This is the kind of stuff HPX takes care of for you.

Niall

--
ned Productions Limited Consulting
http://www.nedproductions.biz/
http://ie.linkedin.com/in/nialldouglas/
All of that functionality is available from HPX. It gives you work-queue based scheduling (with work stealing) of suspend-able/resume-able threads (i.e. supporting yield) with very little overhead. It also manages the 1:N or N:M threading for you.
You could try building all of your functionality on top of HPX first. This could allow you to figure out the actual underlying mechanisms your library would rely on. Later on you can move it to Boost after all of the required functionality has been accepted there.
Thanks for the further encouragement. I'll definitely take a closer look at HPX in the future. However, the GSoC deadline is closing in; I have to find a simpler solution for now. Hopefully, having a more solid base will enable us to build something truly great later.
I understand your situation, and everything very much depends on your GSoC mentor.
From my perspective however, even if your GSoC is related to Boost, it is not necessarily required for your project to produce a ready-made Boost library. If in the end you came up with a functional interface and a proof-of-concept implementation, combined with a clear understanding of what facilities you would need in Boost or in the library, then you would have achieved your goal. HPX is an existing library which could help you reach that goal, as you wouldn't have to worry about the underlying functionality. But that's just my - possibly biased - 2c.
Regards Hartmut --------------- http://boost-spirit.com http://stellar.cct.lsu.edu
Hartmut Kaiser
There's also the issue of the multi-in (N:1 & N:M) transformations, i.e. that take input queues. They would either have to take a whole thread, provide some kind of yield, or something else. I'd probably ignore this for now (just consume a thread) and focus on the simple 1:1 and 1:N cases as they naturally return after each transformation.
All of that functionality is available from HPX. It gives you work-queue based scheduling (with work stealing) of suspend-able/resume-able threads (i.e. supporting yield) with very little overhead. It also manages the 1:N or N:M threading for you.
You could try building all of your functionality on top of HPX first. This could allow you to figure out the actual underlying mechanisms your library would rely on. Later on you can move it to Boost after all of the required functionality has been accepted there.
There is also Christophe Henry's Boost.Asynchronous [0]. Just like the HPX guys, he spent a lot of time thinking about this kind of problem, and from what I understand, his solution is reasonable. The interface is still a little rough around the edges, but that can be changed. Advantage over HPX: it is a library already targeted at becoming part of Boost one day.

As for the original question, I think option one (dedicate a thread to each segment) is fine for now. Maybe it can be implemented with a pseudo task abstraction that can be exchanged with a real task abstraction (or whatever HPX or Boost.Asynchronous call it) once it is available. I would consider a fixed-size threadpool with not enough threads to run all segments a runtime error (or compile-time, if the threadpool allows for this) - for now.

[0] https://github.com/henry-ch/asynchronous
On 7 Jul 2014 at 8:35, Sebastian Schaetz wrote:
There is also Christophe Henry's Boost.Asynchronous. Just like the HPX guys, he spent a lot of time thinking about this kind of problem, and from what I understand, his solution is reasonable. The interface is still a little rough around the edges, but that can be changed. Advantage over HPX: it is a library already targeted at becoming part of Boost one day.
I agree with all of this assessment. I would add that I somewhat struggle to see how his "go it alone" approach fits snugly with what WG21 have already agreed is going to happen (his Preface in particular is stale) - I should worry his approach may become orphaned. Also, though he has taken time and care to have his approach fit somewhat with other Boost libraries present and future, I find the depth of that part somewhat lacking. None of this is insurmountable in a peer review, of course. It looks to be a great library.

Vicente, if you're reading, you really should have a look at the proposed Boost.Asynchronous. He's already done a lot of your todo list for Boost.Thread.
As for the original question, I think option one (dedicate a thread to each segment) is fine for now. Maybe it can be implemented with a pseudo task abstraction that can be exchanged with a real task abstraction (or whatever HPX or Boost.Asynchronous call it) once it is available. I would consider a fixed-size threadpool with not enough threads to run all segments a runtime-error (or compile-time if the threadpool allows for this) - for now.
My only qualm, really, is that the proposed Boost.Asynchronous does not appear to me to implement pull-push pipeline management, which is the hard part of a Pipelines implementation. I am under the possibly mistaken understanding that HPX does.

Niall

--
ned Productions Limited Consulting
http://www.nedproductions.biz/
http://ie.linkedin.com/in/nialldouglas/
My only qualm, really, is that proposed Boost.Asynchronous does not appear to me to implement pull-push pipeline management which is the hard part of a Pipelines implementation. I am under the possibly mistaken understanding that HPX does.
What do you mean by pull-push?

Maybe I'm making it too easy for myself, but I'd see every pipeline stage as a scheduler - say, for Asynchronous, a stealing threadpool scheduler (with one or more threads) - with every stage getting a job transforming input data and posting to the queue of the next scheduler a functor doing the next stage's transformation, etc. Then I'd create a composite in one line of code to make sure work-stealing happens, and that would be it for the infrastructure.

Am I missing something?

Cheers,
Christophe
Maybe I'm making it myself too easy but I'd see every pipeline stage as a scheduler, say, for Asynchronous a stealing threadpool scheduler(with one or more threads), every stage getting a job transforming input data and posting to the queue of the next scheduler a functor doing the next stage transformation, etc. Then I'd create a composite in one line of code to make sure work-stealing happens and that would be it for the infrastructure.
Purely for the fun of it, it took me a few minutes to write such a pipeline: a simple version using a thread for every stage, then one with work stealing. There are tons of things to improve - for example, strings should be moved - but I hope you get the idea. Now one "just" needs to write the syntactic sugar to have beautiful pipelines.

Cheers,
Christophe
The simple version is:
#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>
// (the Boost.Asynchronous and Boost.StringAlgo includes were stripped
//  by the mailing list archive)

struct pipeline
{
    void process(std::vector<std::string> const& input)
    {
        // one single-threaded scheduler per stage
        auto scheduler1 = boost::asynchronous::create_shared_scheduler_proxy(
            new boost::asynchronous::threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>>(1));
        auto scheduler2 = boost::asynchronous::create_shared_scheduler_proxy(
            new boost::asynchronous::threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>>(1));
        auto scheduler3 = boost::asynchronous::create_shared_scheduler_proxy(
            new boost::asynchronous::threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>>(1));

        // job for first stage
        auto grep = [scheduler2, scheduler3](std::string const& re,
                                             std::string const& item)
        {
            std::regex regex(re);
            if (std::regex_match(item, regex))
            {
                // job for second stage
                auto trim = [scheduler3](std::string const& item)
                {
                    std::string tc(boost::algorithm::trim_copy(item));
                    // 3rd stage job, cout
                    boost::asynchronous::post_future(scheduler3,
                        [tc](){ std::cout << "->" << tc << std::endl; });
                };
                auto trim_ = std::bind(trim, std::move(item));
                boost::asynchronous::post_future(scheduler2, trim_);
            }
        };
        for (auto s : input)
        {
            auto grep_error = std::bind(grep, "Error.*", std::move(s));
            boost::asynchronous::post_future(scheduler1, grep_error);
        }
    }
};

int main()
{
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
    };
    pipeline p;
    p.process(input);
    // we are going to shut down; schedulers will all block until completely done
    return 0;
}
And the stealing version is:
#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>
// (the Boost.Asynchronous and Boost.StringAlgo includes were stripped
//  by the mailing list archive)

struct pipeline
{
    pipeline()
    {
        // one single-threaded stealing scheduler per stage
        auto scheduler1 = boost::asynchronous::create_shared_scheduler_proxy(
            new boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>>(1));
        auto scheduler2 = boost::asynchronous::create_shared_scheduler_proxy(
            new boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>>(1));
        auto scheduler3 = boost::asynchronous::create_shared_scheduler_proxy(
            new boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>>(1));

        // composite pool made of the previous pools
        // keeping it alive will ensure automatic work-stealing between pools.
        m_composite = boost::asynchronous::create_shared_scheduler_proxy(
            new boost::asynchronous::composite_threadpool_scheduler<>
                    (scheduler1, scheduler2, scheduler3));
    }

    void process(std::vector<std::string> const& input)
    {
        auto grep = [this](std::string const& re, std::string const& item)
        {
            std::regex regex(re);
            if (std::regex_match(item, regex))
            {
                auto trim = [this](std::string const& item)
                {
                    std::string tc(boost::algorithm::trim_copy(item));
                    // post to third pool of composite
                    boost::asynchronous::post_future(m_composite,
                        [tc](){ std::cout << "->" << tc << std::endl; }, "", 3);
                };
                auto trim_ = std::bind(trim, std::move(item));
                // post to second pool of composite
                boost::asynchronous::post_future(m_composite, trim_, "", 2);
            }
        };
        for (auto s : input)
        {
            auto grep_error = std::bind(grep, "Error.*", std::move(s));
            // post to first pool of composite
            boost::asynchronous::post_future(m_composite, grep_error, "", 1);
        }
    }

private:
    boost::asynchronous::any_shared_scheduler_proxy<> m_composite;
};

int main()
{
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
    };
    pipeline p;
    p.process(input);
    // we are going to shut down; schedulers will all block until completely done
    return 0;
}
Maybe I'm making it myself too easy but I'd see every pipeline stage as a scheduler, say, for Asynchronous a stealing threadpool scheduler (with one or more threads), every stage getting a job transforming input data and posting to the queue of the next scheduler a functor doing the next stage transformation, etc. Then I'd create a composite in one line of code to make sure work-stealing happens and that would be it for the infrastructure.
Purely for the fun of it, it took me a few minutes to write such a pipeline: a simple version using a thread for every stage, then one with work stealing. There are tons of things to improve - for example, strings should be moved - but I hope you get the idea. Now one "just" needs to write the syntactic sugar to have beautiful pipelines.
Purely for fun as well, and for the sake of completeness (as HPX was mentioned here before), here is Christophe's code in HPX:

[The HPX example code was stripped by the mailing list archive.]
On 8 Jul 2014 at 21:23, christophe.j.henry@googlemail.com wrote:
My only qualm, really, is that proposed Boost.Asynchronous does not appear to me to implement pull-push pipeline management which is the hard part of a Pipelines implementation. I am under the possibly mistaken understanding that HPX does.
What do you mean with pull-push? Maybe I'm making it myself too easy but I'd see every pipeline stage as a scheduler, say, for Asynchronous a stealing threadpool scheduler(with one or more threads), every stage getting a job transforming input data and posting to the queue of the next scheduler a functor doing the next stage transformation, etc. Then I'd create a composite in one line of code to make sure work-stealing happens and that would be it for the infrastructure.
Am I missing something?
We almost certainly are meaning different things. I meant something like:

auto p = Paragraphs << TextFile << HTMLFile << "http://www.boost.org/"
Paragraphs::Words::iterator it=p[4].match("Niall");
while(it) std::cout << "'Niall' found at offset " << it->offset << std::endl;

Here a pipeline extracting paragraphs from the text content of the HTML file from boost.org is constructed on the first line. No processing actually happens yet. The second line configures more of the pipeline: an iterator matching all instances of the word 'Niall' in the fifth paragraph of the extracted text. Still no processing happens. The third line loops the iterator, printing the offsets of 'Niall' in the fifth paragraph.

At this point Paragraphs issues a pull forward to TextFile saying it wants the fifth paragraph, TextFile tells HTMLFile it only wants the fifth <p>...</p>, and HTMLFile only bothers fetching as much of the content as will supply the fifth <p>. It then pushes whatever extra content it retrieved back down the pipeline, as maybe empty paragraphs are to be disregarded, and only the lower-stage processors will know anything about that. That's what I meant by push-pull pipeline management.

Oh, and it should cope with live content changes, so if the source HTML changes, the entire pipeline is flushed of state until the next pull, or is dynamically updated with the new content and all current code using it is told.

Niall

--
ned Productions Limited Consulting
http://www.nedproductions.biz/
http://ie.linkedin.com/in/nialldouglas/
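A minimal sketch of the demand-driven idea Niall describes, under the assumption of a simple pull interface (nothing here is from Boost.Pipeline, Boost.Asynchronous, or HPX; std::optional is used for brevity, although this thread predates C++17):

#include <cctype>
#include <functional>
#include <iostream>
#include <optional>
#include <string>

// Hypothetical pull-based stage: the downstream asks, the upstream
// produces. Nothing runs until pull() is called at the end of the chain.
template <typename T>
using puller = std::function<std::optional<T>()>;

// Wrap a transformation so that it pulls from its upstream on demand.
template <typename T>
puller<T> transform_stage(puller<T> upstream, std::function<T(T)> f)
{
    return [=]() -> std::optional<T> {
        auto item = upstream();         // demand propagates upstream
        if (!item) return std::nullopt; // upstream exhausted
        return f(*item);
    };
}

int main()
{
    int counter = 0;
    puller<std::string> source = [&]() -> std::optional<std::string> {
        if (counter >= 3) return std::nullopt;
        return "paragraph " + std::to_string(counter++);
    };
    auto upper = transform_stage<std::string>(source, [](std::string s) {
        for (auto& c : s) c = std::toupper(c);
        return s;
    });
    // Only now does any work happen, one item per pull:
    while (auto p = upper()) std::cout << *p << '\n';
}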
We almost certainly are meaning different things. I meant something like:
auto p = Paragraphs << TextFile << HTMLFile << "http://www.boost.org/"
Paragraphs::Words::iterator it=p[4].match("Niall");
while(it) std::cout << "'Niall' found at offset " << it->offset << std::endl;
Nice. I'm pretty sure my example can be augmented by a pipeline stage in the form of a tcp scheduler pushing into an HTML file stage, etc. But don't you think it's a bit too much for a GSoC project, and that we should set realistic goals a student can achieve?

Cheers,
Christophe
Purely for the fun of it, it took me a few minutes to write such a pipeline: a simple version using a thread for every stage, then one with work stealing. There are tons of things to improve - for example, strings should be moved - but I hope you get the idea. Now one "just" needs to write the syntactic sugar to have beautiful pipelines.
Well, if it's possible to use as many threads as there are segments (transformations), then I think it's a much easier problem. Making it possible for a thread to alternate between segments is the interesting part. (Please correct me if I misunderstood your solution.)

Regarding work stealing: I don't know how it works, but is the order of the input stable across segments?
We almost certainly are meaning different things. I meant something like:
auto p = Paragraphs << TextFile << HTMLFile << "http://www.boost.org/"
Paragraphs::Words::iterator it=p[4].match("Niall");
while(it) std::cout << "'Niall' found at offset " << it->offset << std::endl;
I think this demand-driven computing is not something I'm aiming at, but it can be approximated. The goal of my design is to provide a low-latency pipeline with acceptable throughput (or a high-throughput one with acceptable latency); a data-driven nature serves this better, IMO.

Hartmut, thanks for the HPX example, I haven't got the time to analyze it yet.

Currently, I'm experimenting with coroutines. I think (hope) there is a way we could provide an interface like this:

void duplicate(int input, queue_back<int>& output)
{
    output.push_or_yield(input);
    output.push_or_yield(input);
}

push_or_yield enqueues the element, or, if the queue is full, the coroutine yields and tries to enter the monitor of the downstream task. If it's already taken, it picks another task. If there is no such task, it blocks until a task becomes available (or spins on the previous task a bit). I think this would have nice (configurable) latency characteristics. A sketch of how push_or_yield could be emulated follows below.

Thanks for the intense discussion,
Benedek
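A minimal sketch of how push_or_yield could be emulated on top of a bounded queue with a non-blocking try_push. All names here are hypothetical; the cooperative yield is modeled by returning control to the caller (the scheduler) instead of suspending a coroutine:

#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical bounded queue with a non-blocking try_push.
template <typename T>
struct bounded_queue
{
    std::size_t capacity;
    std::deque<T> items;

    bool try_push(T v)
    {
        if (items.size() >= capacity) return false; // full: caller must yield
        items.push_back(std::move(v));
        return true;
    }
};

// A task that cooperatively retries a failed push. With coroutines the
// "yield" would suspend mid-transformation; without them, the task has to
// remember what is left to push so the scheduler can re-run it later.
struct duplicate_task
{
    bounded_queue<int>& output;
    std::vector<int> pending; // items not yet pushed downstream

    bool resume() // returns true once all pending items were pushed
    {
        while (!pending.empty())
        {
            if (!output.try_push(pending.back()))
                return false; // queue full: yield; scheduler picks another task
            pending.pop_back();
        }
        return true;
    }
};

int main()
{
    bounded_queue<int> downstream{1, {}};
    duplicate_task task{downstream, {7, 7}};
    while (!task.resume())            // a real scheduler would run other
        downstream.items.pop_front(); // tasks here to drain the queue
}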
2014-07-09 19:33 GMT+02:00 Benedek Thaler
Currently, I'm experimenting with coroutines, I think (hope) there is a way we could provide an interface like this:
void duplicate(int input, queue_back<int>& output)
{
    output.push_or_yield(input);
    output.push_or_yield(input);
}
push_or_yield enqueues the element, or if the queue is full: the coroutine yields and tries to enter the monitor of the downstream task. If it's already taken, pick another task. If there is no such task, block until a task becomes available. (or spin on the previous task a bit)
That's exactly what boost.fiber provides (fibers are coroutines + a scheduler + synchronization classes). A fiber gets suspended if it calls bounded_queue<>::push() and the queue is full, or (un)bounded_queue<>::pop() if the queue is empty. The hosting thread is not blocked and can execute other tasks in the meantime. A sketch of this pattern follows below.
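A minimal sketch of that pattern using today's Boost.Fiber API, where buffered_channel plays the role of the bounded_queue mentioned above (the 2014-era class names differed):

#include <boost/fiber/all.hpp>
#include <iostream>

int main()
{
    // push() suspends the producer fiber when the channel is full;
    // pop() suspends the consumer when it is empty. The hosting thread
    // keeps running whichever fiber is ready.
    boost::fibers::buffered_channel<int> queue{2}; // capacity: power of two

    boost::fibers::fiber producer([&queue] {
        for (int i = 0; i < 5; ++i)
            queue.push(i); // suspends this fiber if the queue is full
        queue.close();
    });

    boost::fibers::fiber consumer([&queue] {
        int item;
        while (queue.pop(item) == boost::fibers::channel_op_status::success)
            std::cout << "consumed " << item << '\n'; // stage transformation
    });

    producer.join();
    consumer.join();
}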
Well, if it's possible to use as many threads as there are segments (transformations), then I think it's a much easier problem. Making it possible for a thread to alternate between segments is the interesting part. (Please correct me if I misunderstood your solution.)
The simple solution is using one thread per segment, though it can be 1..n. I'm unsure what you mean by migrating: in this first solution, a thread takes care of only one segment, so no thread handles tasks from another segment. Work stealing would do that.
Regarding work stealing: I don't know how it works, but is the order of the input stable across segments?
No. If you have more than one thread per segment, or use work stealing (where a thread from another segment which has nothing to do at some point will steal work from another segment, according to the order in which the schedulers are defined), or a pool of threads, then it is possible that input data gets processed in a different order than it was read. This is not specific to work stealing; it likely also applies to any solution where more than one thread handles a segment. It might or might not be a problem, but the performance is likely to be higher, even if you need to reorder data afterwards to stay stable. A reordering sketch follows below.

HTH,
Christophe
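A generic sketch of such reordering (not taken from Boost.Asynchronous): tag each item with its input sequence number and hold results back at the sink until all of their predecessors have been emitted:

#include <cstddef>
#include <iostream>
#include <map>
#include <string>

// Reordering sink: results may arrive in any order, each tagged with the
// sequence number it had in the input.
class reordering_sink
{
    std::size_t next_ = 0;                    // next sequence number to emit
    std::map<std::size_t, std::string> held_; // out-of-order results

public:
    void deliver(std::size_t seq, std::string result)
    {
        held_.emplace(seq, std::move(result));
        // emit as long as the next expected item is available
        while (!held_.empty() && held_.begin()->first == next_)
        {
            std::cout << held_.begin()->second << '\n';
            held_.erase(held_.begin());
            ++next_;
        }
    }
};

int main()
{
    reordering_sink sink;
    sink.deliver(1, "second"); // arrives early, held back
    sink.deliver(0, "first");  // emits "first", then "second"
    sink.deliver(2, "third");
}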
Hartmut, thanks for the HPX example, I haven't got the time to analyze it yet.
Currently, I'm experimenting with coroutines, I think (hope) there is a way we could provide an interface like this:
void duplicate(int input, queue_back<int>& output)
{
    output.push_or_yield(input);
    output.push_or_yield(input);
}
push_or_yield enqueues the element, or if the queue is full: the coroutine yields and tries to enter the monitor of the downstream task. If it's already taken, pick another task. If there is no such task, block until a task becomes available. (or spin on the previous task a bit)
I think this would have nice (configurable) latency characteristics.
To explain things: HPX creates a coroutine (i.e. an hpx::thread) for each hpx::async. The returned future can be used to synchronize with the thread's execution. The overhead of one such thread is in the range of 700-900ns, so you can easily spawn fairly small amounts of work (i.e. segments) and still be efficient. Creating millions (literally!) of such threads is not a problem either.

HPX implements almost all of the standard TSs related to concurrency and parallelism (N4088, N4104, N4107 - see here: http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/), which makes the returned futures very versatile and - together with the other proposed extensions - composable in many contexts.

One added benefit of HPX is that all of this works across machines. You can have the same functionality as outlined in a distributed application.

Regards Hartmut
---------------
http://boost-spirit.com
http://stellar.cct.lsu.edu
The overhead of one such thread is in the range of 700-900ns
Do you have numbers for specific architectures (ABIs)?
These numbers are for Sandy Bridge x64, depending on the number of cores used concurrently (e.g. the difference is caused by contention effects in the scheduler). We have numbers for Xeon Phi, which are worse, but that's because this architecture runs at a slower clock rate, among other architecture-specific things. We have it running on ARM32 and BlueGene/Q (Power) as well, but I don't have any numbers available at this point.

Regards Hartmut
---------------
http://boost-spirit.com
http://stellar.cct.lsu.edu
Hi,

I've put together some slides to show what I'm thinking of and to make sure we are on the same page: http://www.slideshare.net/erenon/boostpipeline-scheduling-of-segments-36832455

The point here: assuming a pipeline, one can use extra information to make some educated guesses regarding scheduling - yield to the offending queue (upstream if input is empty, downstream if output is full), or choose a specific segment to optimize latency. Probably this can be achieved using fibers, or even with coroutines. Due to the complexity of HPX, I'm not sure how it compares.

Benedek
I've put together some slides to show what I'm thinking of and to make sure we are on the same page: http://www.slideshare.net/erenon/boostpipeline-scheduling-of-segments-36832455
The point here: assuming a pipeline, one can use extra information to make some educated guesses regarding scheduling - yield to the offending queue (upstream if input is empty, downstream if output is full), or choose a specific segment to optimize latency.
Probably this can be achieved using fibers, or even with coroutines.
Due to the complexity of HPX, I'm not sure how it compares.
Huh? Complexity? It's 100% API compatible with C++11 and various proposals for C++14/17. I'd like to know more about why you think HPX is complex. Regards Hartmut --------------- http://boost-spirit.com http://stellar.cct.lsu.edu
I've put together some slides to show what I'm thinking of and to make sure we are on the same page: http://www.slideshare.net/erenon/boostpipeline-scheduling-of-segments-36832455
The point here: assuming a pipeline, one can use extra information to make some educated guesses regarding scheduling - yield to the offending queue (upstream if input is empty, downstream if output is full), or choose a specific segment to optimize latency.
Probably this can be achieved using fibers, or even with coroutines.
Due to the complexity of HPX, I'm not sure how it compares.

Huh? Complexity? It's 100% API compatible with C++11 and various proposals for C++14/17. I'd like to know more about why you think HPX is complex.
I added a much simplified implementation of what you described in your slides. Most of the complexity lies in the operator&, not in the HPX code itself.

[The HPX example code was stripped by the mailing list archive.]
On Thu, Jul 10, 2014 at 5:42 PM, Hartmut Kaiser
I've put together some slides to show what I'm thinking of and to make sure we are on the same page: http://www.slideshare.net/erenon/boostpipeline-scheduling-of-segments-36832455
The point here: assuming a pipeline, one can use extra information to make some educated guesses regarding scheduling - yield to the offending queue (upstream if input is empty, downstream if output is full), or choose a specific segment to optimize latency.
Probably this can be achieved using fibers, or even with coroutines.
Due to the complexity of HPX, I'm not sure how it compares.

Huh? Complexity? It's 100% API compatible with C++11 and various proposals for C++14/17. I'd like to know more about why you think HPX is complex.

I added a much simplified implementation of what you described in your slides. Most of the complexity lies in the operator&, not in the HPX code itself. [snip]
Thanks for showing this sample, it's definitely very clean! By complexity, I meant internal details I couldn't cover yet, not something silly in the API.

Three things come to my mind regarding this example:
- It **seems** every segment creates a future instead of pushing the result into a bounded queue. Possibly the latter would be faster.
- The segments never yield. Is there a complete scheduler underneath?
- By launching tasks using hpx::async, it seems the scheduler can't have any notion of the pipeline, which makes efficient scheduling harder.

Sorry for my ignorance, I can't answer these questions just by glancing at the docs.

Thanks,
Benedek
I've put together some slides to show what I'm thinking of and make sure we are on the same page: http://www.slideshare.net/erenon/boostpipeline-scheduling-of- segments- 36832455
The point here: assuming a pipeline, one can use extra information to make some educated guesses regarding scheduling - yield to the offending queue (upstream if input is empty, downstream if output is full), or choose a specific segment to optimize latency.
Probably this can be achieved using fibers, or even with coroutines.
Due to the complexity of HPX, I'm not sure how it compares.

Huh? Complexity? It's 100% API compatible with C++11 and various proposals for C++14/17. I'd like to know more about why you think HPX is complex.

I added a much simplified implementation of what you described in your slides. Most of the complexity lies in the operator&, not in the HPX code itself. [snip]
Thanks for showing this sample, it's definitely very clean! By complexity, I meant internal details I couldn't cover yet, not something silly in the API.
Three things come to my mind regarding this example: - It **seems** every segment creates a future instead of pushing the result into a bounded queue. Possibly the latter would be faster.
Possibly. This depends on the amount of work you have in each segment. As said, the overhead of one async/future is in the range of microseconds, so it will be efficient (in terms of parallel efficiency) once you have a certain amount of work. At the same time, if you have segments which don't have sufficient work, it might not make sense to overlap those in the first place; you could simply return a ready future (like the read() step I showed). But in the end you'll have to measure for your use cases.
- The segments never yield. Is there a complete scheduler underneath?
Yes. The segments could yield if needed.
- By launching tasks using hpx::async, it seems the scheduler can't have any notion of the pipeline, which makes efficient scheduling harder.
Well, it depends on your scheduler. In HPX the default scheduler (others can be used) is based on a FIFO queue with work-stealing from the back-end (one queue per core) for load balancing across all used cores.
Sorry for my ignorance, I can't answer these questions just by glancing at the docs.
You might not find those answers in the docs anyways ;)
From my perspective, having the segments return a future has many advantages:
a) you can utilize the asynchronous nature of future-based continuations without effort
b) this composes well with other asynchronous parallelization techniques (like when_all, when_any, etc.)
c) it generally allows for hiding raw threads from the user (which is always good) and helps transform concurrency into parallelism

Regards Hartmut
---------------
http://boost-spirit.com
http://stellar.cct.lsu.edu
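A minimal sketch of this future-based chaining using plain std::async; hpx::future additionally offers non-blocking composition such as .then() and when_all(), which this sketch only approximates (the stage functions are placeholders):

#include <future>
#include <iostream>
#include <string>

// Placeholder stage functions standing in for real pipeline segments.
std::string read_input()                { return "error: foobar"; }
std::string transform(std::string s)    { s[0] = 'E'; return s; }
void        write_output(std::string s) { std::cout << s << '\n'; }

int main()
{
    // Each stage returns a future; chaining them expresses the pipeline.
    // std::future lacks .then(), so each continuation waits explicitly;
    // with hpx::future this chain would be fully non-blocking:
    // read.then(transform).then(write_output).
    std::future<std::string> read =
        std::async(std::launch::async, read_input);

    std::future<std::string> transformed = std::async(std::launch::async,
        [f = std::move(read)]() mutable { return transform(f.get()); });

    std::future<void> written = std::async(std::launch::async,
        [f = std::move(transformed)]() mutable { write_output(f.get()); });

    written.get(); // synchronize with the whole chain
}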
On 9 Jul 2014 at 18:45, christophe.j.henry@googlemail.com wrote:
We almost certainly are meaning different things. I meant something like:
auto p = Paragraphs << TextFile << HTMLFile << "http://www.boost.org/"
Paragraphs::Words::iterator it=p[4].match("Niall");
while(it) std::cout << "'Niall' found at offset " << it->offset << std::endl;
Nice.
Yeah I implemented one of those a decade ago for my next gen platform. Thing is, Carl Sassenrath has us all beat on that technology, it's basically REBOL. And he did it a lot better than I did (mine was clunky).
I'm pretty sure my example can be augmented by a pipeline stage in form of a tcp scheduler pushing into a HTML file stage etc. But don't you think it's a bit too much for a GSoC project and we should set realistic goals a student can achieve?
Oh without doubt that's way beyond a GSoC. It took me over a year working on it full time. I was simply illustrating we were talking about different things, that's all. Niall -- ned Productions Limited Consulting http://www.nedproductions.biz/ http://ie.linkedin.com/in/nialldouglas/
On Thu, Jul 3, 2014 at 2:27 AM, Adam Berkan
I was suggesting that the thread would grab a single piece of data from a queue, do a single transformation, and then return to where it would make a scheduling decision about where to go next. Then it would once again grab one piece of data and do one transformation before returning to scheduling.
In the case of 1:1 transformations, I think this is done in the `development` branch, although the really interesting issues are not taken care of... ->
I'm not suggesting that we would interrupt a thread in the middle of a stage... That would require writing most of an operating system kernel :)
The tricky part is knowing where there's work to be done. The "smart queues" would have to update some stats in the scheduler on every push, and potentially wake up threads. It's certainly a non-trivial task.
-> like this. Now I think we are on the same page regarding such queues or a scheduling facility.
There's also the issue of the multi-in (N:1 & N:M) transformations, i.e. that take input queues. They would either have to take a whole thread, provide some kind of yield, or something else. I'd probably ignore this for now (just consume a thread) and focus on the simple 1:1 and 1:N cases as they naturally return after each transformation.
I think only 1:1 is an easy case. Even 1:N has an issue: consider a transformation which duplicates its input. What if the second element cannot be pushed to the downstream queue because it's full? The transformation should yield (and not spin). Thanks, Benedek
2014-07-04 11:52 GMT+02:00 Benedek Thaler
What if the second element cannot be pushed to the downstream queue because it's full? The transformation should yield (and not spin).
That problem could be solved by fibers: if a fiber tries to enqueue an item into a bounded queue and the queue is full, the fiber is suspended and the thread can resume another fiber (executing another transformation). If items are removed from the bounded queue, the waiting fiber gets notified and will be resumed, i.e. it is able to enqueue its item.
On Wed, Jul 2, 2014 at 1:55 AM, Oliver Kowalke
2014-07-02 1:37 GMT+02:00 Niall Douglas
:
In short, either you make all your processors coroutineable and resumable - which is a ton of work.
A chain of symmetric coroutines already models a dataflow
You don't even need symmetric coroutines; asymmetric coroutines model that just fine - see: https://github.com/boostorg/coroutine/blob/master/example/cpp03/asymmetric/c...

A coroutine implementation of pipelines becomes syntactic sugar, a facile way to construct a chain that you can already build (admittedly somewhat verbosely). A sketch of such a chain follows below.
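A minimal sketch of such a chain using Boost.Coroutine2, the successor of the Boost.Coroutine library linked above; the stage logic is made up for illustration:

#include <boost/coroutine2/coroutine.hpp>
#include <iostream>
#include <string>

using coro = boost::coroutines2::coroutine<std::string>;

int main()
{
    // Source stage: an asymmetric pull-type coroutine producing items.
    coro::pull_type source([](coro::push_type& sink) {
        sink("Error: foobar");
        sink("Notice: qux");
        sink("Error: abc");
    });

    // Filter stage: pulls from the source, pushes matching items onward.
    // Each stage suspends while its downstream consumes, so the chain
    // behaves like a pipeline driven by the final consumer.
    coro::pull_type filter([&source](coro::push_type& sink) {
        for (std::string const& item : source)
            if (item.compare(0, 5, "Error") == 0)
                sink(item);
    });

    for (std::string const& item : filter)
        std::cout << "-> " << item << '\n';
}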
Hello Benedek,
2014-07-01 23:24 GMT+02:00 Benedek Thaler
1. Dedicate a thread to each segment (what to do with a fixed-size thread pool?) 2. Constrain the transformations to be reentrant. 3. Run each transformation as long as there is input to be processed, from beginning to end.
You could use boost.fiber (std::thread-like API) to run multiple tasks (transformations) inside a single thread concurrently. At least with fibers you can use a fixed size threadpool without overloading/blocking the pool with too many tasks.
On 2 Jul 2014 at 7:51, Oliver Kowalke wrote:
You could use boost.fiber (std::thread-like API) to run multiple tasks (transformations) inside a single thread concurrently. At least with fibers you can use a fixed size threadpool without overloading/blocking the pool with too many tasks.
How would that help him with the blocking from the i/o? The coroutines need to fire every time a chunk comes in from the i/o source, but what is a chunk? Each processor will define a chunk differently, and that means each section of the pipeline needs to carry state. Niall -- ned Productions Limited Consulting http://www.nedproductions.biz/ http://ie.linkedin.com/in/nialldouglas/
2014-07-02 15:20 GMT+02:00 Niall Douglas
How would that help him with the blocking from the i/o?
non-blocking IO (maybe with boost.asio)
The coroutines need to fire every time a chunk comes in from the i/o source, but what is a chunk?
it depends on the algorithm - if the chunk of bytes contains a complete set of data it can be pushed along the pipeline - otherwise the chunk can be stored in a local variable
Each processor will define a chunk differently, and that means each section of the pipeline needs to carry state.
The state could be stored in a local variable: for instance, a local buffer is filled by an I/O op until enough bytes have arrived and the message can be parsed from the buffer. A sketch follows below.
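A minimal sketch of that buffering, assuming newline-delimited messages purely for illustration:

#include <iostream>
#include <string>
#include <vector>

// Per-stage reassembly state: chunks arrive in arbitrary sizes, and a
// message is only pushed along the pipeline once it is complete.
class message_assembler
{
    std::string buffer_; // carries partial-message state between chunks

public:
    // Feed one I/O chunk; returns any complete messages it finished.
    std::vector<std::string> feed(std::string const& chunk)
    {
        buffer_ += chunk;
        std::vector<std::string> complete;
        std::string::size_type pos;
        while ((pos = buffer_.find('\n')) != std::string::npos)
        {
            complete.push_back(buffer_.substr(0, pos));
            buffer_.erase(0, pos + 1);
        }
        return complete;
    }
};

int main()
{
    message_assembler assembler;
    for (auto chunk : {"Error: fo", "obar\nNot", "ice: qux\n"})
        for (auto const& msg : assembler.feed(chunk))
            std::cout << "message: " << msg << '\n';
}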
participants (8)

- Adam Berkan
- Benedek Thaler
- christophe.j.henry@googlemail.com
- Hartmut Kaiser
- Nat Goodspeed
- Niall Douglas
- Oliver Kowalke
- Sebastian Schaetz