[dataflow.signals][thread_safe_signals] How to alter the calling of chained signals
Hi! I want to use dataflow.signals, which is built on top of thread_safe_signals. Each call of a producer (each signal) is put into a threadpool. Now I don't want to execute the whole chain of data processing at once, but just one chain link at a time, and then put a new task into the threadpool. The goal is that I can put rather complex and long-running dataflow chains into the threadpool, divided into many pieces, so the threadpool can frequently decide which task is most important. So how can I alter the call of the next signal in a signal network/chain? Kind regards, Manuel
On Sun, Aug 24, 2008 at 8:36 AM, Manuel Jung
Hi!
Hello,
I want to use dataflow.signals, which is built on top of thread_safe_signals.
Do you mean that you would like to use it, or are using it, on top of thread_safe_signals?
Each call of a producer (each signal) is put into a threadpool. Now I don't want to execute the whole chain of data processing at once, but just one chain link at a time, and then put a new task into the threadpool. The goal is that I can put rather complex and long-running dataflow chains into the threadpool, divided into many pieces, so the threadpool can frequently decide which task is most important.
This is a very nice use case. Are you using any particular threadpool implementation?
So how can i alter the call of the next signal in a signal network/chain?
I can see two ways of doing this. One is to leave the signal/connection semantics the same but change the semantics of the receiving component, so that the receiver adds a task to the threadpool upon receiving the signal (and makes sure that the threadpool task either reports back when it is done, or knows how to launch the next threadpool component). The syntax would maybe look something like this:

// the components are task launchers
launcher<task1> component_a(some_threadpool);
launcher<task2> component_b(some_threadpool);

// connect them
component_a >>= component_b;

// start the processing chain
component_a(data_for_task_a);

The other way is to change the semantics of connecting, so that the connection takes care of entering the receiving component into the threadpool. The syntax would be:

// the tasks themselves are the components
task1 component_a(some_threadpool);
task2 component_b(some_threadpool);

// connect the tasks directly
component_a >>= component_b;

// start the processing chain
component_a(data_for_task_a);

I wouldn't mind putting together a little example of something like this. Do you have a preference for either of the above approaches? The first one would fit neatly into the existing dataflow.signals network, and for the other one we would have to implement a new type of connection. Not sure which one would be easier. Stjepan
Stjepan Rajko wrote: Hey Stjepan,
I want to use dataflow.signals, which is build on top of thread_safe_signals.
Do you mean that you would like to use it, or are using it, on top of thread_safe_signals?
I'm using dataflow. I just wanted to point out that knowledge of dataflow is not needed.
Each call of a producer (each signal) is put into a threadpool. Now I don't want to execute the whole chain of data processing at once, but just one chain link at a time, and then put a new task into the threadpool. The goal is that I can put rather complex and long-running dataflow chains into the threadpool, divided into many pieces, so the threadpool can frequently decide which task is most important.
This is a very nice use case. Are you using any particular threadpool implementation?
Yes, I'm using the proposed threadpool implementation from Oliver Kowalke, so I can chain tasks by using futures.
So how can i alter the call of the next signal in a signal network/chain?
I can see two ways of doing this. One is to leave the signal/connection semantics the same but change the semantics of the receiving component, so that the receiver adds a task to the threadpool upon receiving the signal (and makes sure that the threadpool task either reports back when it is done, or knows how to launch the next threadpool component). The syntax would maybe look something like this:
// the components are task launchers
launcher<task1> component_a(some_threadpool);
launcher<task2> component_b(some_threadpool);
// connect them
component_a >>= component_b;
// start the processing chain
component_a(data_for_task_a);
The other way is to change the semantics of connecting, so that the connection takes care of entering the receiving component into the threadpool. The syntax would be:
// the tasks themselves are the components
task1 component_a(some_threadpool);
task2 component_b(some_threadpool);
// connect the tasks directly
component_a >>= component_b;
// start the processing chain
component_a(data_for_task_a);
I wouldn't mind putting together a little example of something like this. Do you have a preference for either of the above approaches? The first one would fit neatly into the existing dataflow.signals network, and for the other one we would have to implement a new type of connection. Not sure which one would be easier.
I think I would prefer something more like the second approach, where the operators are altered. I have this in mind: you should just be able to use another type of connection of components, an async type, which uses a threadpool. This way you are still able to connect components the normal way so they get executed sequentially (which would be useful, e.g., when the components execute rather fast).

MyComponentA componentA;
MyComponentB componentB;
// Well, this would be nice, but I'm not sure if it is possible?
// Overload the >>= operator (and the | operator of course)
componentA >>=(pool) componentB;
// A submit function, similar to invoke
// All parts of the chain have to be added to the threadpool now
submit(componentA, pool);

You will need the submit function because a chained submit in the threadpool library works like this:

tp::task< int > t1( pool.submit( boost::bind( add_fn, 1, 2) ) );
tp::task< int > t2( pool.chained_submit( boost::bind( add_fn, t1.get_future(), 3), t1) );

So until you add the first component to a threadpool, no task object is created, and therefore you are not able to chain-submit the rest of the components in the dataflow. This approach would also make it possible to add different chain links to different pool objects (handy for resource management) or tie them directly to the prior chain link.

If the overloaded >>= syntax isn't possible, an extra filter would be possible too:

ComponentA >>= delay(pool) >>= ComponentB;

Maybe "delay" isn't an adequate name for it, but I can't think of another at the moment. An example would be really great! Do you know the proposed threadpool library? Kind regards, Manuel
On Sun, Aug 24, 2008 at 11:51 AM, Manuel Jung
I wouldn't mind putting together a little example of something like this. Do you have a preference for either of the above approaches? The first one would fit neatly into the existing dataflow.signals network, and for the other one we would have to implement a new type of connection. Not sure which one would be easier.
I think I would prefer something more like the second approach, where the operators are altered. I have this in mind: you should just be able to use another type of connection of components, an async type, which uses a threadpool. This way you are still able to connect components the normal way so they get executed sequentially (which would be useful, e.g., when the components execute rather fast).
OK, I thought about this some... Unfortunately, the Dataflow library has some fundamental limitations when it comes to creating connections (you can't provide a parameter for the connection, e.g., to specify that it should be immediate vs. using a threadpool), so the only way to change the type of the connection is to change the type of the components being connected. This will make it harder to implement the second approach, which aims to leave the components intact. Here are some examples that implement new connection types allowing consumers to track their producers:

http://svn.boost.org/svn/boost/sandbox/SOC/2007/signals/libs/dataflow/exampl...
http://svn.boost.org/svn/boost/sandbox/SOC/2007/signals/libs/dataflow/exampl...
An example would be really great! Do you know the proposed threadpool library?
I just downloaded it and need to study it a bit. When I figure out how to use it, I'll work on the example. Kind regards, Stjepan
On Mon, Aug 25, 2008 at 11:40 AM, Stjepan Rajko
On Sun, Aug 24, 2008 at 11:51 AM, Manuel Jung
wrote: An example would be really great! Do you know the proposed threadpool library?
I just downloaded it and need to study it a bit. When I figure out how to use it, I'll work on the example.
OK, I have a working example. You can find it in the boost sandbox:
http://svn.boost.org/svn/boost/sandbox/SOC/2007/signals/libs/dataflow/exampl...
The user code looks like this:
typedef tp::pool<
    tp::fixed,
    tp::unbounded_channel< tp::fifo >
> threadpool_type;
threadpool_type pool(tp::max_poolsize(5));
typedef boost::signals::storage
Stjepan Rajko wrote: Hi, here are some of my thoughts. Still not very much time; more around the weekend.
On Mon, Aug 25, 2008 at 11:40 AM, Stjepan Rajko
wrote: On Sun, Aug 24, 2008 at 11:51 AM, Manuel Jung
wrote: An example would be really great! Do you know the proposed threadpool library?
I just downloaded it and need to study it a bit. When I figure out how to use it, I'll work on the example.
OK, I have a working example. You can find it in the boost sandbox:
http://svn.boost.org/svn/boost/sandbox/SOC/2007/signals/libs/dataflow/exampl... I checked out the whole dataflow from sandbox. Example compiles and runs fine.
The user code looks like this:
typedef tp::pool< tp::fixed, tp::unbounded_channel< tp::fifo > > threadpool_type;
threadpool_type pool(tp::max_poolsize(5));
typedef boost::signals::storage source_type;
typedef boost::signals::function filter_type;

// our components
source_type source(1);
filter_type increase(inc_fn);
filter_type increase2(inc_fn);
filter_type increase3(inc_fn);
filter_type increase4(inc_fn);
// our network
// increase >>= increase2 will be in its own thread
// increase3 will be in its own thread
// increase4 will be in its own thread
source
    | (make_delay(pool, increase) >>= increase2)
    | (make_delay(pool, increase3) >>= make_delay(pool, increase4));
// submit the first task
submit(pool, source);
This seems very usable. I also like that you are able to add filters to different pools.
Like you said, delay might not be the best name, but works for now. A couple of comments:
* The example is hard-wired to a void(int) signature. It wouldn't be hard to generalize, if there is interest.
I need to have it generalized. It would be great if you could do it; otherwise, I'll try at the weekend.
* The example above uses Boost.Signals (not thread_safe_signals). This is unsafe, but in my experience as long as the signals get connected before the threading starts, it works ok. A couple of minor things need to be addressed with thread_safe_signals before it will work as a drop-in replacement with Dataflow.Signals - I'll see if I can get them integrated.
Hmm. thread_safe_signals is needed too. There will be new signal connections after the threading starts.
* I don't use chained_submit - instead, the delay object will do a regular submit when it gets its input. This was more in line with the push-oriented design of Boost.Signals. To use chained_submits and futures would require either using a pull-based network (each component taking in the input as the argument and returning the result), or propagating the final result back through return values.
That is ok with me. I would prefer all the tasks to be added at the beginning (because of easier priority handling of the queue), but I can do it this way too, I think.
Thoughts?
Seems very great so far. This way you can do late-evaluated dataflow networks :-). Just create a pool and make a dataflow network (even a cyclic one! :-)) and you could evaluate it step by step, depending on the threadpool's queueing policy. Thanks for what you did so far :-). I like the dataflow lib. Can I help writing some code, the generalization for all data types or something else? I just don't know the inside of the dataflow library that well; it would take much longer if I tried to do it... but I'm of course willing to help. Maybe another example?
Stjepan
Kind Regards Manuel
On Thu, Aug 28, 2008 at 1:19 PM, Manuel Jung
I checked out the whole dataflow from sandbox. Example compiles and runs fine.
Great!
[snip]
// our network
// increase >>= increase2 will be in its own thread
// increase3 will be in its own thread
// increase4 will be in its own thread
source
    | (make_delay(pool, increase) >>= increase2)
    | (make_delay(pool, increase3) >>= make_delay(pool, increase4));
// submit the first task
submit(pool, source);
This seems very usable. I also like that you are able to add filters to different pools.
Yeah, that turned out to be a nice advantage.
Like you said, delay might not be the best name, but works for now. A couple of comments:
* The example is hard-wired to a void(int) signature. It wouldn't be hard to generalize, if there is interest.
I need to have it generalized. It would be great if you could do it; otherwise, I'll try at the weekend.
It just needs slight tweaks to things that are already there. I might not find time until Sunday, but once I get to it, it shouldn't take me long.
Hmm. thread_safe_signals is needed too. There will be new signal connections after the threading starts.
Frank Mori Hess has integrated a change that I believe is the only change necessary to make thread_safe_signals work with Dataflow.Signals. When I work on the generalization, I will also test it with thread_safe_signals.
* I don't use chained_submit - instead, the delay object will do a regular submit when it gets its input. This was more in line with the push-oriented design of Boost.Signals. To use chained_submits and futures would require either using a pull-based network (each component taking in the input as the argument and returning the result), or propagating the final result back through return values.
That is ok with me. I would prefer all the tasks to be added at the beginning (because of easier priority handling of the queue), but I can do it this way too, I think.
To do the chaining at the beginning, we would need to have access to the network graph. We can have that using the Dataflow.Blueprint layer, so maybe it's something we can explore in the future.
Seems very great so far. This way you can do late-evaluated dataflow networks :-). Just create a pool and make a dataflow network (even a cyclic one! :-)) and you could evaluate it step by step, depending on the threadpool's queueing policy.
Yeah, this was a really good (and concrete) suggestion, thanks for bringing it up. I'm excited about the cyclic possibilities too :-)
Thanks for what you did so far :-). I like the dataflow lib. Can I help writing some code, the generalization for all data types or something else? I just don't know the inside of the dataflow library that well; it would take much longer if I tried to do it... but I'm of course willing to help. Maybe another example?
I really appreciate your offer to help. The Dataflow.Signals layer is coming up for review on September 1st, so it would be great if we could continue developing and discussing this example through the review period. As someone who has used / is using the lib (or is at least looking at using it), your feedback would be very much appreciated and helpful during the review!

If you're interested in joining the development, I have recently started a rewrite of the Dataflow library (the rewrite is to address some things that I already know I want to redesign, as well as any feedback received during the review). I am doing the rewrite in a very step-by-step fashion, starting with the library foundations and only moving on when things are properly designed and documented. I can keep you informed of the progress if you'd like, and you would be more than welcome to join in at any point. Best, Stjepan
On Thu, Aug 28, 2008 at 9:10 PM, Stjepan Rajko
On Thu, Aug 28, 2008 at 1:19 PM, Manuel Jung
wrote: * The example is hard-wired to a void(int) signature. It wouldn't be hard to generalize, if there is interest.
I need to have it generalized. It would be great if you could do it; otherwise, I'll try at the weekend.
It just needs slight tweaks to things that are already there. I might not find time until Sunday, but once I get to it, it shouldn't take me long.
I have it generalized now. The only downside is that the signature has to be specified as a part of the make_async call (I renamed delay to async). We can remove that requirement for certain components in the future. I have committed the changes to the sandbox (the example now also uses thread_safe_signals). I haven't tested it extensively, so there are probably problems to work out. If you run into any, please let me know. Kind regards, Stjepan
Stjepan Rajko wrote:
On Thu, Aug 28, 2008 at 9:10 PM, Stjepan Rajko
wrote: On Thu, Aug 28, 2008 at 1:19 PM, Manuel Jung
wrote: * The example is hard-wired to a void(int) signature. It wouldn't be hard to generalize, if there is interest.
I need to have it generalized. It would be great if you could do it; otherwise, I'll try at the weekend.
It just needs slight tweaks to things that are already there. I might not find time until Sunday, but once I get to it, it shouldn't take me long.
I have it generalized now. The only downside is that the signature has to be specified as a part of the make_async call (I renamed delay to async). We can remove that requirement for certain components in the future.
I have committed the changes to the sandbox (the example now also uses thread_safe_signals). I haven't tested it extensively, so there are probably problems to work out. If you run into any, please let me know.
Kind regards,
Stjepan
Hey,

The current implementation does not cover priority or smart threadpools. I need to submit a priority type (for instance just an int) with the first submit, and this value needs to be submitted every time make_async submits a new task. This priority value changes with every piece of data submitted to the dataflow network. And it is very important: you won't need possible breakpoints in your dataflow network (the make_async calls) if there is no priority system. (Well, it would still make sense if you split signals with the "|" operator, so the rest of the dataflow could be parallelized...)

I have another thought: the make_async filter is ok as an interface to split the dataflow. But I don't like that I lose control over the task object. If I have no task object, I am not able to interrupt that task. And interruption of tasks would be very handy to make the program more stable. I could control the runtime of tasks and interrupt and kill them if they didn't respond.

Maybe this is out of the scope of dataflow.signals, but I think I would prefer an implementation of dataflow with futures, if that is possible. I think that would make the integration of the threadpool library much more natural. I'm not very familiar with futures yet, still learning how to use them best. But I will think about this and let you know if I can think of a way to do it.

I don't know how much I could contribute to your rewrite of the dataflow library, but it would still be nice if you would keep me informed. At the moment I have quite a lot of free time, but by the time the new semester starts, my time will be much shorter.

Kind regards, Manuel
On Tue, Sep 2, 2008 at 7:50 AM, Manuel Jung
The current implementation does not cover priority or smart threadpools. I need to submit a priority type (for instance just an int) with the first submit, and this value needs to be submitted every time make_async submits a new task. This priority value changes with every piece of data submitted to the dataflow network. And it is very important: you won't need possible breakpoints in your dataflow network (the make_async calls) if there is no priority system. (Well, it would still make sense if you split signals with the "|" operator, so the rest of the dataflow could be parallelized...)
I have another thought: the make_async filter is ok as an interface to split the dataflow. But I don't like that I lose control over the task object.
I am trying a new approach:

http://svn.boost.org/trac/boost/browser/sandbox/SOC/2007/signals/libs/datafl...

With this one, the pool is associated with a component, rather than a connection. That way, the task can be stored and retrieved from outside at some point. I'm not sure about one thing though - in the threadpool examples I've looked at, there is one thread that submits the tasks, and then the same thread controls the tasks. Here, we have worker threads creating subsequent tasks, so they would need to get the task object to the main controller thread somehow. Does the threadpool object provide access to all submitted and running tasks, so that maybe the main thread can just get them from there? Or, the async_component could store all created tasks internally and provide access to them.
If I have no task object, I am not able to interrupt that task. And interruption of tasks would be very handy to make the program more stable. I could control the runtime of tasks and interrupt and kill them if they didn't respond.
I can try something like this.
Maybe this is out of the scope of dataflow.signals, but I think I would prefer an implementation of dataflow with futures, if that is possible. I think that would make the integration of the threadpool library much more natural. I'm not very familiar with futures yet, still learning how to use them best. But I will think about this and let you know if I can think of a way to do it.
I think the futures interface might not align the best with Dataflow.Signals, but we could try to put together a new Dataflow framework that would be specific to the Threadpool library and use futures.
I don't know how much I could contribute to your rewrite of the dataflow library, but it would still be nice if you would keep me informed. At the moment I have quite a lot of free time, but by the time the new semester starts, my time will be much shorter.
I will keep you informed. Best, Stjepan
Hey!
On Tue, Sep 2, 2008 at 7:50 AM, Manuel Jung
wrote: The current implementation does not cover priority or smart threadpools. I need to submit a priority type (for instance just an int) with the first submit, and this value needs to be submitted every time make_async submits a new task. This priority value changes with every piece of data submitted to the dataflow network. And it is very important: you won't need possible breakpoints in your dataflow network (the make_async calls) if there is no priority system. (Well, it would still make sense if you split signals with the "|" operator, so the rest of the dataflow could be parallelized...)
I have another thought: the make_async filter is ok as an interface to split the dataflow. But I don't like that I lose control over the task object.
I am trying a new approach:
http://svn.boost.org/trac/boost/browser/sandbox/SOC/2007/signals/libs/datafl...

I have looked at the new example. I think it is a good approach to make an async component from a normal component. The first example with the inc works fine, but the 3rd (the 2nd is disabled, but I'm looking forward to that kind...) seems not to work right? My output is e.g.:

adding task
tick 1...
adding task
tick 2...
adding task
tick 3...
adding task
tick 1...
adding task
tick 2...
adding task

But since the network is just "tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;", the last "tick 2..." is one too many, isn't it?
With this one, the pool is associated with a component, rather than a connection. That way, the task can be stored and retrieved from outside at some point. I'm not sure about one thing though - in the threadpool examples I've looked at, there is one thread that submits the tasks, and then the same thread controls the tasks. Here, we have worker threads creating subsequent tasks, so they would need to get the task object to the main controller thread somehow. Does the threadpool object provide access to all submitted and running tasks, so that maybe the main thread can just get them from there? Or, the async_component could store all created tasks internally and provide access to them.
Hm, I'm not sure about this; I will try to investigate it.
If I have no task object, I am not able to interrupt that task. And interruption of tasks would be very handy to make the program more stable. I could control the runtime of tasks and interrupt and kill them if they didn't respond.
I can try something like this.
Maybe this is out of the scope of dataflow.signals, but I think I would prefer an implementation of dataflow with futures, if that is possible. I think that would make the integration of the threadpool library much more natural. I'm not very familiar with futures yet, still learning how to use them best. But I will think about this and let you know if I can think of a way to do it.
I think the futures interface might not align the best with Dataflow.Signals, but we could try to put together a new Dataflow framework that would be specific to the Threadpool library and use futures.
I don't know how much I could contribute to your rewrite of the dataflow library, but it would still be nice if you would keep me informed. At the moment I have quite a lot of free time, but by the time the new semester starts, my time will be much shorter.
I will keep you informed.
Great, Greetings Manu
Best,
Stjepan
On Sun, Sep 7, 2008 at 3:41 AM, Manuel Jung
I have looked at the new example. I think it is a good approach to make an async component from a normal component. The first example with the inc works fine, but the 3rd (the 2nd is disabled, but I'm looking forward to that kind...)
2nd example should work now (just did a commit), with the latest updates Oliver Kowalke did on the threadpool lib.
seems not to work right? My output is e.g.:
adding task
tick 1...
adding task
tick 2...
adding task
tick 3...
adding task
tick 1...
adding task
tick 2...
adding task
But since the network is just "tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;", the last "tick 2..." is one too many, isn't it?
ticker1 >>= ticker2 >>= ticker3 >>= ticker1 creates a cycle in the network. Hence, the signal will cycle through the network indefinitely (it only stops because of the shutdown). I am now writing some docs to talk about threading in Dataflow.Signals and comment on this example further. Should be done later today. Best, Stjepan
Stjepan Rajko wrote: Hi
On Sun, Sep 7, 2008 at 3:41 AM, Manuel Jung
wrote: I have looked at the new example. I think it is a good approach to make an async component from a normal component. The first example with the inc works fine, but the 3rd (the 2nd is disabled, but I'm looking forward to that kind...)
2nd example should work now (just did a commit), with the latest updates Oliver Kowalke did on the threadpool lib.
Great, will take a look at it.
seems not to work right? My output is e.g.:
adding task
tick 1...
adding task
tick 2...
adding task
tick 3...
adding task
tick 1...
adding task
tick 2...
adding task
But since the network is just "tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;", the last "tick 2..." is one too many, isn't it?
ticker1 >>= ticker2 >>= ticker3 >>= ticker1 creates a cycle in the network. Hence, the signal will cycle through the network indefinitely (it only stops because of the shutdown).
Ah... sure :-) I just got confused. Hmm, that seems like very few events in 5 seconds!
I am now writing some docs to talk about threading in Dataflow.Signals and comment on this example further. Should be done later today.
Great :-) Greetings Manuel
Best,
Stjepan
participants (2)
-
Manuel Jung
-
Stjepan Rajko