[vtkusers] Help with data parallelism!

D.P. Gavidia Simonetti Daniela.GavidiaSimonetti at student.uva.nl
Mon Apr 21 19:03:41 EDT 2003


Hi, 
I've been trying to generate a cutting plane of a volume in parallel using threads. I modified the example ParallelIso.cxx, which generates isosurfaces using data parallelism. In each thread I'm doing the following:
thread 1 - numProcs:
- read structured points -> subsample -> create cutting plane -> put PolyData ouput in Output port
thread 0:
- read structured points -> subsample -> create cutting plane -> append own PolyData output and output from other threads (received through InputPort) with AppendPolyData -> map, update and render
The problem is that when I run on 3 or more threads, I get the plane that I want plus many random polygons all around it. I don't know where these come from. I put some snapshots and my code online:
http://www.geocities.com/dgavidia2003/parallel.html
You can try the code with ironProt.vtk
Have any of you calculated a cutting plane in parallel? I'm using a computer with 4 processors, so I'd like to parallelize this and get some speed-up. 
Do any of you know if vtkCutter supports parallel streaming?
If any of you know what I'm doing wrong or have any suggestions on parallelizing the cutting plane, please let me know.
Thanks in advance.

Daniela Gavidia

// This will be called by all processes
void MyMain( vtkMultiProcessController *controller, void *arg )
{

  vtkStructuredPointsReader *reader;
  vtkExtractVOI *voi;
  vtkPlane *cutPlane = vtkPlane::New();
  vtkCutter *planeCut = vtkCutter::New();

  float *n;
  int myid, numProcs;
  char* fname = reinterpret_cast<char*>(arg);
    
  // define a normal for the cutting plane
  n = (float *)malloc(3*sizeof(float));
  n[0] = 0.0;
  n[1] = 0.0;
  n[2] = 100.0; 

  // Obtain the id of the running process and the total
  // number of processes
  myid = controller->GetLocalProcessId();
  numProcs = controller->GetNumberOfProcesses();
    
  // Create the reader, the data file name might have
  // to be changed depending on where the data files are.

  reader = vtkStructuredPointsReader::New();
  reader->SetFileName(fname);
  reader->Update();

  voi = vtkExtractVOI::New();
  voi->SetInput(reader->GetOutput());
  voi->SetSampleRate(2,2,2);
  voi->Update();
  float *center = (float *)calloc(3, sizeof(float));

  center = reader->GetOutput()->GetCenter();
  cutPlane->SetOrigin(center);
  cutPlane->SetNormal(n[0],n[1],n[2]);
  planeCut->SetCutFunction(cutPlane);
  planeCut->SetInput(voi->GetOutput());

  if ((myid != 0))
    {
      printf("My id = %d\n", myid);
    // If I am not the root process

    // Satellite process! Send data through port.
    vtkOutputPort *upPort = vtkOutputPort::New();
    
    // connect the port to the output of the pipeline
    upPort->SetInput(planeCut->GetOutput());
    // Multiple ports can go through the same connection.
    // This is used to differentiate ports
    upPort->SetTag(PORT_TAG);

    // Loop which processes RMI requests. 
    // Use vtkMultiProcessController::BREAK_RMI_TAG to break it.
    upPort->WaitForUpdate();
    
    // We are done. Clean up.
    upPort->Delete();
    upPort = NULL;
    }
  else
    {
    // If I am the root process

    int i, j;
    vtkAppendPolyData *app = vtkAppendPolyData::New();
    vtkInputPort *downPort;
    vtkRenderer *ren = vtkRenderer::New();
    vtkRenderWindow *renWindow = vtkRenderWindow::New();
    vtkRenderWindowInteractor *iren = vtkRenderWindowInteractor::New();
    vtkDataSetMapper *mapper = vtkDataSetMapper::New();
    vtkActor *actor = vtkActor::New();
    vtkTimerLog *timer = vtkTimerLog::New();


    // Add my pipeline's output to the append filter
    app->AddInput(planeCut->GetOutput());

    // ###################### important ####################
    // # This tells the append filter to request pieces from
    // # each of its inputs.  Since each of its inputs comes from
    // # a different process,  each process generates a separate 
    // # piece of the data (data parallelism).
    // # If this is not used, all processes will iso-surface
    // # all the data.
      app->ParallelStreamingOn();
    
    // This is the main thread: Collect the data and render it.
    for (i = 1; i < numProcs; ++i)
      {
	printf("downPort %d\n",i);
      downPort = vtkInputPort::New();
      downPort->SetRemoteProcessId(i);

      // Multiple ports can go through the same connection.
      // This is used to differentiate ports
      downPort->SetTag(PORT_TAG);
      //downPort->GetPolyDataOutput()->Update();
      app->AddInput(downPort->GetPolyDataOutput());

      // Reference already incremented by AddInput(). Delete()
      // will only decrement the count, not destroy the object.
      // The ports will be destroyed when the append filter
      // goes away.
      downPort->SetDoUpdateInformation(0);
      downPort->Delete();
      downPort = NULL;
      }

    // Create the rendering part of the pipeline
    renWindow->AddRenderer(ren);
    iren->SetRenderWindow(renWindow);
    ren->SetBackground(1.0, 0.9, 0.9);
    renWindow->SetSize( 400, 400);

    // Create white to black colormap
    vtkLookupTable *lbw = vtkLookupTable::New();
    // white to black
    lbw->SetHueRange(0.0, 0.0);
    lbw->SetSaturationRange(0.0, 0.0);
    lbw->SetValueRange(1.0, 0.0);

    float *range = (float *)calloc(2, sizeof(float));
    reader->GetOutput()->GetPointData()->GetScalars()->GetRange(range);
    mapper->SetInput(app->GetOutput());
    mapper->SetScalarRange(range);
    mapper->SetLookupTable(lbw);
    actor->SetMapper(mapper);

    ren->AddActor(actor);
  
    // Time the rendering. Note that the execution on all processes
    // start only after Update()
    timer->StartTimer();
    app->Update();
    timer->StopTimer();

    cerr << "Update  took " << timer->GetElapsedTime() 
           << " seconds\n";
      
    // now render the results
    timer->StartTimer();
    renWindow->Render();
    timer->StopTimer();     
    cerr << "Render() took " << timer->GetElapsedTime() 
           << " seconds\n";

    printf("Before triggerRMI\n");
    // Tell the other processors to stop processing RMIs.
    for (i = 1; i < numProcs; ++i)
      {
      controller->TriggerRMI(i, vtkMultiProcessController::BREAK_RMI_TAG); 
      }
    printf("After triggerRMI\n");    

    iren->Start();
    // Clean up
    app->Delete();
    ren->Delete();
    renWindow->Delete();
    iren->Delete();
    mapper->Delete();
    actor->Delete();
    timer->Delete();
    }
    printf("Before process cleanup\n");  
  // clean up objects in all processes.
  reader->Delete();
}


int main( int argc, char* argv[] )
{
  vtkMultiProcessController *controller;

  // Note that this will create a vtkMPIController if MPI
  // is configured, vtkThreadedController otherwise.
  controller = vtkMultiProcessController::New();

  controller->Initialize(&argc, &argv);

  // Name of StructuredPoints data file
  char *fname = "/home/dgavidia/threads/obj_394.vtk";
  controller->SetSingleMethod(MyMain, reinterpret_cast<void*>(fname));
  int nprocs = atoi(argv[1]);
  printf("Number of Proceses: %d\n", nprocs);

  // When using MPI, the number of processes is determined
  // by the external program which launches this application.
  // However, when using threads, we need to set it ourselves.
  if (controller->IsA("vtkThreadedController"))
    {
    // Set the number of processes to 2 for this example.
    controller->SetNumberOfProcesses(nprocs);
    } 
  controller->SingleMethodExecute();
  printf("Return to main\n");
  delete[] fname;

  controller->Finalize();
  controller->Delete();

  return 0;
}




More information about the vtkusers mailing list