VTK/ThreadedStreamingPipeline
The current design of the VTK pipeline is comprehensive: it supports streaming of structured data and allows users to develop unstructured data streaming (see streaming, VTK's Pipeline). However, it is very difficult (if not impossible) to have the modules of a pipeline execute simultaneously in a multi-threaded environment (as opposed to dividing the data into smaller pieces and running them through multiple instances of that pipeline). This is because in a demand-driven pipeline, when a consumer demands data, it has to wait (and lock) until all of its upstream modules have updated. This serves only a single-threaded updating mechanism well. To make the pipeline threaded, an event-driven model has to be added: a module can process data as soon as all of its inputs arrive, and it signals its downstream modules to execute when it is done processing.
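To make the event-driven idea concrete, here is a minimal sketch in plain C++ threads. It is not VTK code: PieceChannel and RunEventDrivenPipeline are hypothetical names invented for illustration, and pieces are reduced to plain integers. Each module runs on its own thread, starts working as soon as a piece arrives, and wakes its downstream neighbor when it has produced output.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper (not a VTK class): a blocking channel between two modules.
class PieceChannel {
public:
  // Hand a piece to the downstream module and wake it up.
  void Send(int piece) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(piece);
    }
    ready_.notify_all();
  }
  // Tell the downstream module that no more pieces will arrive.
  void Close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    ready_.notify_all();
  }
  // Block until a piece arrives; return false once the channel is closed and empty.
  bool Receive(int &piece) {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) return false;
    piece = queue_.front();
    queue_.pop();
    return true;
  }
private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<int> queue_;
  bool closed_ = false;
};

// Runs source -> filter -> sink, with every module on its own thread.
std::vector<int> RunEventDrivenPipeline(int pieceCount) {
  assert(pieceCount >= 0);
  PieceChannel sourceToFilter;
  PieceChannel filterToSink;
  std::vector<int> results;

  std::thread source([&] {
    for (int i = 0; i < pieceCount; ++i) sourceToFilter.Send(i);
    sourceToFilter.Close();
  });
  std::thread filter([&] {
    int piece = 0;
    // Executes as soon as an input piece is available, then signals the sink.
    while (sourceToFilter.Receive(piece)) filterToSink.Send(piece * piece);
    filterToSink.Close();
  });
  std::thread sink([&] {
    int piece = 0;
    while (filterToSink.Receive(piece)) results.push_back(piece);
  });

  source.join();
  filter.join();
  sink.join();
  return results;
}
```

Calling RunEventDrivenPipeline(4) squares the pieces 0 to 3 while the source is still free to produce the next piece, which is the overlap a purely demand-driven update cannot provide.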
Below are the details (still very brief and incomplete; for now they serve only as notes for the meeting).
vtkThreadedStreamingPipeline
This is the executive, inheriting from vtkCompositeDataPipeline, that adds support for multi-threaded streaming using the event-driven model while still fitting into the demand-driven pipeline of VTK. Since there is going to be a subset of event-driven modules inside a demand-driven pipeline, the executive must be informed of the start (generator) and end (merger) modules of the subset that will be executed in event-driven mode. At run time, it checks the IS_STREAMING_GENERATOR() and IS_STREAMING_MERGER() keys of an algorithm to acquire this information.
(more information to come soon)
Streaming Generators
In this pipeline, the sources (stream generators) control the stream, rather than the sinks as in vtkStreamingDemandDrivenPipeline. Here is an example from vtkImageDataStreamGenerator.
<source lang="cpp">
if (request->Has(vtkThreadedStreamingPipeline::REQUEST_STREAM_START())) {
  vtkThreadedStreamingPipeline *exec =
    vtkThreadedStreamingPipeline::SafeDownCast(this->GetExecutive());
  this->StreamRestart();
  while (this->StreamNext()) {
    this->StreamRetrieve(outputVector);
    exec->DonePiece();
  }
}
</source>
A simple for loop can control the stream in the same way:
<source lang="cpp">
if (request->Has(vtkThreadedStreamingPipeline::REQUEST_STREAM_START())) {
  vtkThreadedStreamingPipeline *exec =
    vtkThreadedStreamingPipeline::SafeDownCast(this->GetExecutive());
  for (int i = 0; i < 10; i++) {
    // Generate data of the current piece
    ...
    exec->DonePiece();
  }
}
</source>
Here the job of DonePiece() is to signal the downstream modules that the generator's output is ready for them to process. DonePiece() then blocks until the downstream modules are done with the data, and only after that does the generator move on to the next step.
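The blocking behavior of DonePiece() can be sketched with a mutex and a condition variable. This is only an illustration of the handshake described above, not the VTK implementation: PieceHandshake, Consume, Finish, and RunGenerator are hypothetical names, and a piece is reduced to a single integer.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the executive's signaling; not the VTK implementation.
class PieceHandshake {
public:
  // Generator side: publish a piece, then block until downstream has consumed it.
  void DonePiece(int piece) {
    std::unique_lock<std::mutex> lock(mutex_);
    piece_ = piece;
    hasPiece_ = true;
    changed_.notify_all();
    changed_.wait(lock, [this] { return !hasPiece_; });
  }
  // Generator side: announce that the stream has ended.
  void Finish() {
    std::lock_guard<std::mutex> lock(mutex_);
    finished_ = true;
    changed_.notify_all();
  }
  // Downstream side: wait for a piece, record it, and release the generator.
  // Returns false once the stream has ended.
  bool Consume(std::vector<int> &log) {
    std::unique_lock<std::mutex> lock(mutex_);
    changed_.wait(lock, [this] { return hasPiece_ || finished_; });
    if (!hasPiece_) return false;
    log.push_back(piece_);
    hasPiece_ = false;
    changed_.notify_all();
    return true;
  }
private:
  std::mutex mutex_;
  std::condition_variable changed_;
  int piece_ = 0;
  bool hasPiece_ = false;
  bool finished_ = false;
};

// Plays the role of a generator emitting pieceCount pieces.
std::vector<int> RunGenerator(int pieceCount) {
  PieceHandshake handshake;
  std::vector<int> consumed;
  std::thread downstream([&] {
    while (handshake.Consume(consumed)) {
    }
  });
  for (int i = 0; i < pieceCount; ++i) {
    handshake.DonePiece(i);
    // DonePiece() returned, so the downstream thread has already taken piece i.
    assert(consumed.size() == static_cast<std::size_t>(i + 1));
  }
  handshake.Finish();
  downstream.join();
  return consumed;
}
```

Because the generator waits inside DonePiece(), it can safely reuse its output buffer for the next piece, at the cost of never running more than one piece ahead of its consumers.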
Streaming Modules
The full signature of DonePiece() used above is: <source lang="cpp"> void DonePiece(bool Pull=true, bool Push=true); </source> If Pull is true, the call also signals the upstream modules (not applicable to generators) to become active and produce more output; if Push is true, it signals the downstream modules to process what the current module has. In a way, this allows both pull and push for modules, but it applies only to event-driven modules. With this, we can also write a streamer that accumulates several pieces before passing data downstream. Here is an example of a streaming module that requires 2 pieces before moving forward:
<source lang="cpp">
if (request->Has(vtkThreadedStreamingPipeline::REQUEST_DATA())) {
  vtkThreadedStreamingPipeline *exec =
    vtkThreadedStreamingPipeline::SafeDownCast(this->GetExecutive());
  // Accumulate the data
  ...
  this->AccumPieces++;
  // Always ask upstream for more; push downstream only on every second piece.
  bool Pull = true;
  bool Push = false;
  if (this->AccumPieces == 2) {
    Push = true;
    this->AccumPieces = 0;
  }
  exec->DonePiece(Pull, Push);
}
</source>
Note: if DonePiece() is not called explicitly in a streaming module, the executive will call DonePiece(true, true).
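The effect of choosing Push per piece can be checked with a small single-threaded model. This is a sketch under simplifying assumptions, not VTK code: AccumulatingModule and AccumulateInPairs are hypothetical names, pieces are integers, "accumulating" means summing, and the returned flag stands for the Push argument that would be passed to DonePiece(Pull, Push).

```cpp
#include <cassert>
#include <vector>

// Hypothetical model (not VTK code) of a module that pushes every Nth piece.
class AccumulatingModule {
public:
  explicit AccumulatingModule(int requiredPieces)
    : requiredPieces_(requiredPieces) {
    assert(requiredPieces > 0);
  }
  // Accumulate one piece and return the Push flag for this step.
  bool ReceivePiece(int value) {
    sum_ += value;
    ++accumPieces_;
    if (accumPieces_ < requiredPieces_) {
      return false;  // Pull only: keep asking upstream for more
    }
    pushed_.push_back(sum_);  // what the downstream modules would see
    sum_ = 0;
    accumPieces_ = 0;
    return true;
  }
  const std::vector<int> &Pushed() const { return pushed_; }
private:
  int requiredPieces_;
  int accumPieces_ = 0;
  int sum_ = 0;
  std::vector<int> pushed_;
};

// Feeds the pieces through a module that needs 2 pieces per push.
std::vector<int> AccumulateInPairs(const std::vector<int> &pieces) {
  AccumulatingModule module(2);
  for (int piece : pieces) {
    module.ReceivePiece(piece);
  }
  return module.Pushed();
}
```

With five input pieces the model pushes twice and leaves the last piece accumulated but unpushed, so a real module of this kind would need some policy for a trailing partial group.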
Streaming Mergers
Stream mergers are expected to handle two requests, REQUEST_STREAM_START() and REQUEST_STREAM_MERGE():
<source lang="cpp">
if (request->Has(vtkThreadedStreamingPipeline::REQUEST_STREAM_START())) {
  // prepare to merge to output
  this->StreamRestart();
} else if (request->Has(vtkThreadedStreamingPipeline::REQUEST_STREAM_MERGE())) {
  // merge current input to the output
  this->StreamMergeCurrentInput(outputVector);
}
</source>
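The two-request protocol of a merger can be modeled without VTK as follows. The names Request, PieceMerger, and MergeStream are hypothetical, and "merging" is assumed here to mean appending each incoming piece to the output; a real merger would define its own merge operation.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for REQUEST_STREAM_START() and REQUEST_STREAM_MERGE().
enum class Request { StreamStart, StreamMerge };

// Hypothetical merger model (not VTK code): appends each piece to its output.
class PieceMerger {
public:
  void ProcessRequest(Request request, const std::vector<int> &currentInput) {
    if (request == Request::StreamStart) {
      // prepare to merge to output
      output_.clear();
    } else if (request == Request::StreamMerge) {
      // merge current input to the output
      output_.insert(output_.end(), currentInput.begin(), currentInput.end());
    }
  }
  const std::vector<int> &Output() const { return output_; }
private:
  std::vector<int> output_;
};

// Sends one start request followed by one merge request per piece.
std::vector<int> MergeStream(const std::vector<std::vector<int>> &pieces) {
  PieceMerger merger;
  merger.ProcessRequest(Request::StreamStart, {});
  for (const std::vector<int> &piece : pieces) {
    merger.ProcessRequest(Request::StreamMerge, piece);
  }
  return merger.Output();
}
```

Resetting the output on the start request means the same merger can be reused for a second stream without carrying over pieces from the first.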