Stjepan Rajko wrote: Hey Stjepan,
I want to use dataflow.signals, which is built on top of thread_safe_signals.
Do you mean that you would like to use it, or are using it, on top of thread_safe_signals?
I'm using dataflow. I just wanted to point out that knowledge of dataflow is not needed.
Each call of a producer (each signal) is put into a threadpool. Now I don't want to execute the whole chain of data processing at once, but just one chain link at a time, and then put a new task into the threadpool. The goal is that I can put rather complex and long-running dataflow chains into the threadpool, divided into many pieces, so the threadpool can frequently decide which task is most important.
This is a very nice use case. Are you using any particular threadpool implementation?
Yes, I'm using the proposed threadpool implementation from Oliver Kowalke, so I can chain tasks by using futures.
So how can I alter the call of the next signal in a signal network/chain?
I can see two ways of doing this. One is to leave the signal/connection semantics the same but change the semantics of the receiving component, so that the receiver adds a task to the threadpool upon receiving the signal (and makes sure that the threadpool task either reports back when it is done, or knows how to launch the next threadpool component). The syntax would maybe look something like this:
    // the components are task launchers
    launcher<task1> component_a(some_threadpool);
    launcher<task2> component_b(some_threadpool);

    // connect them
    component_a >>= component_b;

    // start the processing chain
    component_a(data_for_task_a);
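To make the first approach a bit more concrete, here is a minimal sketch using only the standard library. Everything in it is an assumption made for illustration: `simple_pool` is a hypothetical single-queue stand-in for a real threadpool (it is not Oliver Kowalke's library), `launcher` is a simplified, non-template version of the launcher idea, and `connect()` stands in for `>>=`. None of these names exist in Dataflow.Signals.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for a threadpool: tasks are queued and then run one
// at a time by run_all(). A real pool would run them on worker threads.
class simple_pool {
public:
    void submit(std::function<void()> task) {
        queue_.push_back(std::move(task));
        ++submitted_;
    }
    void run_all() {
        while (!queue_.empty()) {
            std::function<void()> task = std::move(queue_.front());
            queue_.pop_front();
            task();  // a task may submit further tasks
        }
    }
    int submitted() const { return submitted_; }
private:
    std::deque<std::function<void()>> queue_;
    int submitted_ = 0;
};

// Hypothetical launcher: receiving a value does not run the work directly.
// It posts one task to the pool; when that task finishes, it forwards the
// result to the connected launcher, which posts a task of its own.
class launcher {
public:
    launcher(simple_pool& pool, std::function<int(int)> work)
        : pool_(pool), work_(std::move(work)) {}

    void connect(launcher& next) { next_ = &next; }  // stands in for >>=

    void operator()(int input) {
        pool_.submit([this, input] {
            last_result_ = work_(input);
            if (next_) (*next_)(last_result_);
        });
    }
    int last_result() const { return last_result_; }
private:
    simple_pool& pool_;
    std::function<int(int)> work_;
    launcher* next_ = nullptr;
    int last_result_ = 0;
};

struct chain_result { int value; int tasks; };

// Builds a two-link chain, runs it, and reports the final value together
// with the number of tasks that went through the pool.
chain_result run_launcher_chain(int input) {
    simple_pool pool;
    launcher component_a(pool, [](int x) { return x + 1; });
    launcher component_b(pool, [](int x) { return x * 2; });
    component_a.connect(component_b);

    component_a(input);             // queues only the first link
    assert(pool.submitted() == 1);  // the second link is not queued yet
    pool.run_all();                 // running link A queues link B
    return {component_b.last_result(), pool.submitted()};
}
```

Calling `run_launcher_chain(3)` sends each chain link through the pool as its own task, so a real pool would get a scheduling decision between the links.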
The other way is to change the semantics of connecting, so that the connection takes care of entering the receiving component into the threadpool. The syntax would be:
    // the tasks themselves are the components
    task1 component_a(some_threadpool);
    task2 component_b(some_threadpool);

    // connect the tasks directly
    component_a >>= component_b;

    // start the processing chain
    component_a(data_for_task_a);
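The second approach could be sketched roughly as follows, again with the standard library only. The names are all hypothetical: `task_queue` is a bare queue standing in for a threadpool, `component` is a plain processing step that knows nothing about pools, and `connect_direct()` / `connect_async()` stand in for two kinds of connection. This is an illustration of the idea, not how Dataflow.Signals implements connections.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for a threadpool: just a queue of pending tasks.
using task_queue = std::deque<std::function<void()>>;

// A plain component: does its work, then emits the result on its output.
struct component {
    explicit component(std::function<int(int)> w) : work(std::move(w)) {}

    void operator()(int input) {
        last_result = work(input);
        if (output) output(last_result);
    }

    std::function<int(int)> work;
    std::function<void(int)> output;  // set by one of the connect functions
    int last_result = 0;
};

// Normal connection: the receiver runs immediately, inside the sender's call.
void connect_direct(component& from, component& to) {
    from.output = [&to](int v) { to(v); };
}

// Async connection: the hand-off to the receiver is queued as its own task.
void connect_async(component& from, task_queue& pool, component& to) {
    from.output = [&pool, &to](int v) { pool.push_back([&to, v] { to(v); }); };
}

struct demo_result { int value; std::size_t queued_after_first_call; };

// Chain a -> b (direct) -> c (async) and run it to completion.
demo_result run_mixed_chain(int input) {
    task_queue pool;
    component a([](int x) { return x + 1; });
    component b([](int x) { return x * 2; });
    component c([](int x) { return x - 3; });
    connect_direct(a, b);
    connect_async(b, pool, c);

    a(input);                     // runs a and b right away; c is only queued
    assert(c.last_result == 0);   // c has not run yet
    std::size_t queued = pool.size();

    while (!pool.empty()) {       // drain the queue, as a pool worker would
        std::function<void()> task = std::move(pool.front());
        pool.pop_front();
        task();
    }
    return {c.last_result, queued};
}
```

The components stay unchanged here; only the kind of connection decides whether the next link runs inline or becomes a separate task.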
I wouldn't mind putting together a little example of something like this. Do you have a preference for either of the above approaches? The first one would fit neatly into the existing dataflow.signals network, and for the other one we would have to implement a new type of connection. Not sure which one would be easier.
I think I would prefer something more like the second approach, where the operators are altered. I have this in mind: you should just be able to use another type of connection between components, an async type, which uses a threadpool. This way you are still able to connect components the normal way, so they get executed in sequence within one task (which would be useful, e.g., when the components execute quickly).

    MyComponentA componentA;
    MyComponentB componentB;

    // Well, this would be nice, but I'm not sure if it is possible?
    // Overload the >>= operator (and the | operator, of course)
    componentA >>=(pool) componentB;

    // A submit function, similar to invoke
    // All parts of the chain have to be added to the threadpool now
    submit(componentA, pool);

You will need the submit function because a chained submit in the threadpool library works like this:

    tp::task< int > t1( pool.submit( boost::bind( add_fn, 1, 2) ) );
    tp::task< int > t2( pool.chained_submit( boost::bind( add_fn, t1.get_future(), 3), t1) );

So until you add the first component to a threadpool, no task object is created, and therefore you are not able to chain-submit the rest of the components in the dataflow. This approach would also make it possible to add different chain links to different pool objects (handy for resource management) or tie them directly to the prior chain link.

If the overloaded >>= syntax isn't possible, would an extra filter be possible instead?

    ComponentA >>= delay(pool) >>= ComponentB;

Maybe "delay" isn't an adequate name for it, but I can't think of another at the moment.

An example would be really great! Do you know the proposed threadpool library?

Kind Regards
Manuel
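For readers without the proposed threadpool library at hand, the shape of the chained submit above can be approximated with `std::async` and `std::future` from the standard library. This is only an analogy under stated assumptions: `chained_add()` is a hypothetical helper, `add_fn` is assumed to add two integers, and the second task here simply blocks on the first task's future, which does not reproduce whatever scheduling `chained_submit` performs in the proposed library.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Assumed behaviour of add_fn from the snippet above: plain integer addition.
int add_fn(int a, int b) { return a + b; }

// Runs add_fn(1, 2) as a first task, then add_fn(<first result>, 3) as a
// second task that receives the first task's future and waits on it.
int chained_add() {
    std::future<int> f1 = std::async(std::launch::async, add_fn, 1, 2);

    std::future<int> f2 = std::async(
        std::launch::async,
        [](std::future<int> previous) { return add_fn(previous.get(), 3); },
        std::move(f1));  // the second task takes ownership of the first future

    assert(f2.valid());
    return f2.get();
}
```

As in the quoted snippet, the second task cannot be set up until the first one exists, which is the reason a `submit` step for the first chain link is needed before the rest can be chained.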