VTK/Streaming
Streaming pieces
Streaming a dataset in a fixed number of pieces is straightforward. Here is an example:
alg->UpdateInformation();
vtkStreamingDemandDrivenPipeline* exec =
  vtkStreamingDemandDrivenPipeline::SafeDownCast(alg->GetExecutive());
if (!exec || exec->GetMaximumNumberOfPieces() < 10)
{
  // No streaming executive, or the pipeline cannot produce 10 pieces.
  die();
}
for (int i = 0; i < 10; i++)
{
  // Arguments: output port 0, piece i, 10 pieces in total, 0 ghost levels.
  exec->SetUpdateExtent(0, i, 10, 0);
  alg->Update();
  // do something with alg->GetOutputDataObject(0)
}
This example streams a pipeline that ends with the algorithm alg in 10 pieces. The best example of streaming pieces is vtkPolyDataMapper:
void vtkPolyDataMapper::Render(vtkRenderer *ren, vtkActor *act)
{
  // ...
  nPieces = this->NumberOfPieces * this->NumberOfSubPieces;
  for (int i = 0; i < this->NumberOfSubPieces; i++)
  {
    // If there is more than one piece, render in a loop.
    currentPiece = this->NumberOfSubPieces * this->Piece + i;
    input->SetUpdateExtent(currentPiece, nPieces, this->GhostLevel);
    this->RenderPiece(ren, act);
  }
}
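The index arithmetic in the loop above can be checked in isolation. The following is a minimal sketch in plain C++ with no VTK dependency; the PieceSettings struct and the RequestedPieces() and TotalPieces() helpers are hypothetical names introduced only for illustration and are not part of vtkPolyDataMapper.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for the three mapper settings used in the loop.
struct PieceSettings
{
  int Piece;             // piece assigned to this mapper
  int NumberOfPieces;    // number of pieces the data is divided into
  int NumberOfSubPieces; // number of sub-pieces each piece is streamed in
};

// Total number of pieces requested from the pipeline (nPieces above).
int TotalPieces(const PieceSettings& s)
{
  return s.NumberOfPieces * s.NumberOfSubPieces;
}

// Piece indices (out of TotalPieces) that the loop requests, in order.
std::vector<int> RequestedPieces(const PieceSettings& s)
{
  assert(s.NumberOfPieces > 0 && s.NumberOfSubPieces > 0);
  assert(s.Piece >= 0 && s.Piece < s.NumberOfPieces);
  std::vector<int> pieces;
  for (int i = 0; i < s.NumberOfSubPieces; i++)
  {
    // Same formula as currentPiece in the mapper excerpt.
    pieces.push_back(s.NumberOfSubPieces * s.Piece + i);
  }
  return pieces;
}
```

For example, with Piece = 1, NumberOfPieces = 2 and NumberOfSubPieces = 3, the pipeline is asked for 6 pieces in total and this mapper requests pieces 3, 4 and 5, so each mapper covers a contiguous block of sub-pieces.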
For a good example of streaming extents, see vtkMemoryLimitImageDataStreamer and its superclass vtkImageDataStreamer.
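To give a feel for what streaming extents involves, here is a minimal sketch of cutting a structured extent into sub-extents along one axis. It is plain C++ and a simplified stand-in only: the Extent alias and the SplitAlongZ() and IsEmpty() helpers are hypothetical, and the splitting strategy VTK actually uses may differ.

```cpp
#include <array>
#include <cassert>

// {xmin, xmax, ymin, ymax, zmin, zmax}, all bounds inclusive.
using Extent = std::array<int, 6>;

// Hypothetical helper: sub-extent for `piece` out of `numPieces`,
// cutting only along z and leaving x and y untouched.
Extent SplitAlongZ(const Extent& whole, int piece, int numPieces)
{
  assert(numPieces > 0 && piece >= 0 && piece < numPieces);
  const int nz = whole[5] - whole[4] + 1; // number of z slices
  Extent sub = whole;
  sub[4] = whole[4] + nz * piece / numPieces;
  sub[5] = whole[4] + nz * (piece + 1) / numPieces - 1;
  return sub;
}

// A sub-extent is empty when any max bound is below its min bound.
// This happens when there are more pieces than z slices.
bool IsEmpty(const Extent& e)
{
  return e[1] < e[0] || e[3] < e[2] || e[5] < e[4];
}
```

A streaming loop would call SplitAlongZ() once per piece, request that sub-extent from upstream, and skip any piece for which IsEmpty() is true.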
Streaming time steps
This example demonstrates how a filter can stream multiple time steps. The example we will use to demonstrate this functionality is vtkTemporalStatistics.
Streaming happens over multiple executions of the filter. Therefore, we need a way of keeping track of which time step we are currently processing. For this purpose, vtkTemporalStatistics uses a data member called CurrentTimeIndex, which is initialized to 0 in the constructor.
vtkTemporalStatistics::vtkTemporalStatistics()
{
  this->CurrentTimeIndex = 0;
}
In RequestUpdateExtent(), we request the time step at the current time index. The index starts at 0 and is incremented by RequestData() once streaming starts.
int vtkTemporalStatistics::RequestUpdateExtent(
  vtkInformation *vtkNotUsed(request),
  vtkInformationVector **inputVector,
  vtkInformationVector *vtkNotUsed(outputVector))
{
  vtkInformation *inInfo = inputVector[0]->GetInformationObject(0);
  double *inTimes = inInfo->Get(vtkStreamingDemandDrivenPipeline::TIME_STEPS());
  if (inTimes)
  {
    inInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_TIME_STEPS(),
                &inTimes[this->CurrentTimeIndex], 1);
  }
  return 1;
}
In RequestData(), we first process the current time step. We then increment the current time index and, if there are more time steps to process, set CONTINUE_EXECUTING in the request. This key tells the executive to perform another REQUEST_UPDATE_EXTENT pass and another REQUEST_DATA pass, which leads to another call to RequestUpdateExtent() and RequestData() with the next time index. The cycle repeats until the filter removes CONTINUE_EXECUTING from the request.
int vtkTemporalStatistics::RequestData(vtkInformation *request,
                                       vtkInformationVector **inputVector,
                                       vtkInformationVector *outputVector)
{
  vtkInformation *inInfo = inputVector[0]->GetInformationObject(0);
  vtkInformation *outInfo = outputVector->GetInformationObject(0);
  vtkDataObject *input = vtkDataObject::GetData(inInfo);
  vtkDataObject *output = vtkDataObject::GetData(outInfo);
  if (this->CurrentTimeIndex == 0)
  {
    // First execution, initialize arrays.
    this->InitializeStatistics(input, output);
  }
  else
  {
    // Subsequent execution, accumulate new data.
    this->AccumulateStatistics(input, output);
  }
  this->CurrentTimeIndex++;
  if (this->CurrentTimeIndex
      < inInfo->Length(vtkStreamingDemandDrivenPipeline::TIME_STEPS()))
  {
    // There is still more to do.
    request->Set(vtkStreamingDemandDrivenPipeline::CONTINUE_EXECUTING(), 1);
  }
  else
  {
    // We are done. Finish up.
    this->PostExecute(input, output);
    request->Remove(vtkStreamingDemandDrivenPipeline::CONTINUE_EXECUTING());
    this->CurrentTimeIndex = 0;
  }
  return 1;
}
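The control flow between the executive and the filter can be illustrated without VTK. The sketch below is a toy model only: Request, ToyTemporalMean and RunToyPipeline() are hypothetical names, the "executive" is reduced to a do/while loop, and each time step is a single number whose mean is computed. It is not how vtkTemporalStatistics or the VTK executive is implemented, but it follows the same initialize, accumulate, continue or finish pattern.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the request; only the one flag matters here.
struct Request
{
  bool ContinueExecuting = false;
};

// Toy filter that consumes one time step per execution.
class ToyTemporalMean
{
public:
  // Plays the role of RequestUpdateExtent(): which time step to ask for.
  std::size_t RequestUpdateExtent() const { return this->CurrentTimeIndex; }

  // Plays the role of RequestData(): process one step, then either ask
  // for another pass or finish up and reset for the next run.
  void RequestData(Request& request, double input, std::size_t numTimeSteps)
  {
    if (this->CurrentTimeIndex == 0)
    {
      this->Sum = input; // first execution, initialize
    }
    else
    {
      this->Sum += input; // subsequent execution, accumulate
    }
    this->CurrentTimeIndex++;
    if (this->CurrentTimeIndex < numTimeSteps)
    {
      request.ContinueExecuting = true; // there is still more to do
    }
    else
    {
      this->Mean = this->Sum / static_cast<double>(numTimeSteps);
      request.ContinueExecuting = false;
      this->CurrentTimeIndex = 0;
    }
  }

  double GetMean() const { return this->Mean; }

private:
  std::size_t CurrentTimeIndex = 0;
  double Sum = 0.0;
  double Mean = 0.0;
};

// Toy "executive": repeats both passes while the filter asks to continue.
// If `passes` is non-null, it receives the number of executions performed.
double RunToyPipeline(const std::vector<double>& timeSeries, int* passes = nullptr)
{
  assert(!timeSeries.empty());
  ToyTemporalMean filter;
  Request request;
  int count = 0;
  do
  {
    const std::size_t index = filter.RequestUpdateExtent();
    filter.RequestData(request, timeSeries[index], timeSeries.size());
    count++;
  } while (request.ContinueExecuting);
  if (passes)
  {
    *passes = count;
  }
  return filter.GetMean();
}
```

Calling RunToyPipeline() on a series of four values executes the filter four times, once per time step, and only the last execution produces the final result. Resetting the index at the end is what allows the same filter to be run again from the first time step.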