[Opengeoscience-developers] OpenGeoscience branch, test-mpi-stream, updated. 97672ee258fc9c11221e787d6fc15d5d0cbf5807

Ben Burnett bburnett at poly.edu
Fri Mar 8 17:51:57 EST 2013


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "OpenGeoscience".

The branch, test-mpi-stream has been updated
       via  97672ee258fc9c11221e787d6fc15d5d0cbf5807 (commit)
       via  375b62d6d7d6e0c7f8a4244538a4cc28cfbda657 (commit)
       via  e94d26fffb236126b70849d62993552effc3e074 (commit)
       via  40bd8d06338b9f596045e9e17a39055bf56ccfa5 (commit)
       via  7f91ee4b8ddb7d89f2ea7bcc49923ae436e8bc69 (commit)
       via  60083eb99d470a00ebef09b68cf1bf854023b01b (commit)
       via  ec96a5d69d972437e3371e6eccf049e46675b4d1 (commit)
       via  d6e0f37d6a191b930907c52de66138177776154d (commit)
       via  e8f1834140920aaebaa19f44f40957a673ba9581 (commit)
       via  597c3cb5db990496edbbf33853d68109b2fd2f48 (commit)
       via  4790a51068213c566f1d880e13ba9d3d890b1250 (commit)
       via  3754f834428aa9a448156b1645cdef308effe881 (commit)
       via  a882e6b5d1094a76cdc340f3967f1f5966fb2241 (commit)
       via  9230d31c46f8963d4f9a8bb8c3589769fc31d076 (commit)
       via  93a69f52b63babe3e9642de2b7de5885f961ba6b (commit)
       via  ac5f6fe510bcb782ad39852048e4562440a5bf4b (commit)
       via  d1410c19b0c64d99e709ead16467e3466bd0b0e8 (commit)
       via  df4d6c04de3f0ba69968efd94ffafd5b459e40d9 (commit)
       via  47c6fdc45819e689355385a5b95d15500fe6b333 (commit)
       via  d5d02452db746b4dccf15838bfdea8b5c95bb406 (commit)
       via  2a9f3465e2f164b9e7cb848947d40e45fcd11a2b (commit)
       via  1bc586ea2e6eb2e090671c0d5f2e4e00f807c7e5 (commit)
       via  c6e36c118843dd9f45c34564e664118585fc90d8 (commit)
       via  18ff87b9f943a659be590098e2ac674161ccfcb4 (commit)
      from  38ffba64e072f76e0ac712c1fa1455696c4e4d11 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- Log -----------------------------------------------------------------
http://public.kitware.com/gitweb?p=OpenGeoscience/opengeoscience.git;a=commitdiff;h=97672ee258fc9c11221e787d6fc15d5d0cbf5807
commit 97672ee258fc9c11221e787d6fc15d5d0cbf5807
Merge: 375b62d e94d26f
Author:     Ben Burnett <bburnett at poly.edu>
AuthorDate: Fri Mar 8 17:47:48 2013 -0500
Commit:     Ben Burnett <bburnett at poly.edu>
CommitDate: Fri Mar 8 17:47:48 2013 -0500

    Merge branch 'master' into test-mpi-stream


http://public.kitware.com/gitweb?p=OpenGeoscience/opengeoscience.git;a=commitdiff;h=375b62d6d7d6e0c7f8a4244538a4cc28cfbda657
commit 375b62d6d7d6e0c7f8a4244538a4cc28cfbda657
Author:     Ben Burnett <bburnett at poly.edu>
AuthorDate: Fri Mar 8 17:41:58 2013 -0500
Commit:     Ben Burnett <bburnett at poly.edu>
CommitDate: Fri Mar 8 17:41:58 2013 -0500

    Streaming data via websockets WIP
    
    Changed streaming strategy a little.
    
    Master process just finds out the bounds for each sub region
    and sends those to workers.
    
    Workers read the data file directly based on bounds. Currently
    performing a time average on the 'clt' cdmsVariable from 'clt.nc'
    file.
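
As a minimal sketch of the bounds-splitting step described above (not the committed code itself), the following Python 3 snippet mirrors the `get2DBins` helper from this commit's `ws/stream.py`. The snake_case names, the `to_slices` helper, and the 25 x 12 grid are hypothetical and used only for illustration. The bounds are inclusive, so they are converted to half-open slices before indexing.

```python
def get_2d_bins(x, y, bin_size_x, bin_size_y):
    """Split a 2D index space into bins.

    Returns a list of inclusive index bounds (x1, x2, y1, y2).
    """
    bins = []
    for i1 in range(0, len(x), bin_size_x):
        # Clamp the last bin so it never runs past the end of the axis.
        i2 = min(i1 + bin_size_x, len(x)) - 1
        for j1 in range(0, len(y), bin_size_y):
            j2 = min(j1 + bin_size_y, len(y)) - 1
            bins.append((i1, i2, j1, j2))
    return bins


def to_slices(bounds):
    """Convert inclusive bounds into half-open Python slices (hypothetical helper)."""
    i1, i2, j1, j2 = bounds
    return slice(i1, i2 + 1), slice(j1, j2 + 1)


# Hypothetical coordinate axes: 25 latitude points, 12 longitude points.
lat = list(range(25))
lon = list(range(12))
bins = get_2d_bins(lat, lon, 10, 10)

# Count how many grid cells the bins cover once converted to slices.
covered = 0
for b in bins:
    lat_slice, lon_slice = to_slices(b)
    covered += len(lat[lat_slice]) * len(lon[lon_slice])
```

If the inclusive bounds are used directly as slice endpoints, and the variable follows ordinary Python slice semantics, the last row and column of each bin are left out, which is why the sketch adds one to the upper index.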
    
    TODO:
    - plot region, save to jpg, encode in base64, send to master process,
      which sends it to the browser.
    - find a way to plot the data to a memory buffer that can be encoded
      as jpg
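
The base64 step in the TODO items above could look roughly like the following Python 3 sketch. It assumes the image bytes are already in an in-memory buffer. How the plot gets rendered into that buffer is left open here, and the bytes below are a placeholder, not a real JPEG. The helper names are hypothetical. The `'<index>,<payload>'` text format follows the message the master sends to the browser in this commit's `ws/stream.py`.

```python
import base64
import io


def encode_buffer(buf):
    """Return the buffer's full contents as an ASCII base64 string."""
    return base64.b64encode(buf.getvalue()).decode("ascii")


def make_message(region_index, payload):
    """Build a '<index>,<payload>' text message for the browser."""
    return "%d,%s" % (region_index, payload)


# Placeholder bytes standing in for an image rendered to memory.
buf = io.BytesIO()
buf.write(b"\xff\xd8 placeholder bytes, not a real JPEG \xff\xd9")

encoded = encode_buffer(buf)
message = make_message(3, encoded)
```

Because the base64 alphabet contains no comma, the receiver can split the message on the first comma to recover the region index and the payload.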
    
    Right now the workers just send back random UUID file path strings as
    they finish.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6ab4a13..ba937c5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -108,16 +108,18 @@ set(JS_DOCUMENT_FILES
 set(SOURCES
     ogsctl
     ogs.py
-    websocket_chat.py
-    websocket_pi.py
-    websocket_stream.py
-    cdmsProcess.py
-    cpi.py
+    ws/__init__.py
+    ws/chat.py
+    ws/pi.py
+    ws/stream.py
+    ws/workers/cdmsProcess.py
+    ws/workers/cpi.py
     web/index.html
     web/lib/app.js
     web/lib/gl-matrix.js
     web/lib/glUtils.js
     web/lib/sylvester.js
+    tmp
 )
 
 if(DEPLOY_TEST_SERVICES)
diff --git a/cdmsProcess.py b/cdmsProcess.py
deleted file mode 100644
index fbeb0ac..0000000
--- a/cdmsProcess.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from array import array
-from mpi4py import MPI
-try:
-    import cPickle as pickle
-except:
-    import pickle
-
-try:
-    comm = MPI.Comm.Get_parent()
-    rank = comm.Get_rank()
-    size = comm.Get_size()
-
-    print 'Child rank: %d     size: %d' % (rank,size)
-
-    comm.Barrier()
-    region = comm.recv()
-
-    print 'Child %d recieved %s' % (rank, region.typecode())
-
-    data = array('c',pickle.dumps(region))
-    comm.Send(data)
-
-    print 'sent data to master'
-
-    comm.Barrier()
-    comm.Disconnect()
-
-except Exception, err:
-    import traceback
-    tb = traceback.format_exc()
-    print tb
-    try:
-        comm.Disconnect()
-    except:
-        pass
diff --git a/ogs.py b/ogs.py
index 92e9c98..e62738a 100644
--- a/ogs.py
+++ b/ogs.py
@@ -7,9 +7,9 @@ import os
 
 #websocket imports
 from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
-from websocket_chat import ChatRoot
-from websocket_pi import PiRoot
-from websocket_stream import StreamRoot
+from ws.chat import ChatRoot
+from ws.pi import PiRoot
+from ws.stream import StreamRoot
 
 current_dir = os.path.dirname(os.path.abspath(__file__))
 
diff --git a/server.conf.in b/server.conf.in
index a2f2d11..34b628d 100644
--- a/server.conf.in
+++ b/server.conf.in
@@ -14,12 +14,12 @@ tools.staticdir.dir = "data"
 
 [/chat/ws]
 tools.websocket.on: True
-tools.websocket.handler_cls: websocket_chat.ChatWebSocketHandler
+tools.websocket.handler_cls: ws.chat.ChatWebSocketHandler
 
 [/pi/ws]
 tools.websocket.on: True
-tools.websocket.handler_cls: websocket_pi.PiWebSocketHandler
+tools.websocket.handler_cls: ws.pi.PiWebSocketHandler
 
 [/stream/ws]
 tools.websocket.on: True
-tools.websocket.handler_cls: websocket_stream.StreamWebSocketHandler
\ No newline at end of file
+tools.websocket.handler_cls: ws.stream.StreamWebSocketHandler
\ No newline at end of file
diff --git a/ws/__init__.py b/ws/__init__.py
new file mode 100644
index 0000000..01145f2
--- /dev/null
+++ b/ws/__init__.py
@@ -0,0 +1,2 @@
+#make folder a module
+pass
diff --git a/websocket_chat.py b/ws/chat.py
similarity index 100%
rename from websocket_chat.py
rename to ws/chat.py
diff --git a/websocket_pi.py b/ws/pi.py
similarity index 96%
rename from websocket_pi.py
rename to ws/pi.py
index d0fc5ff..abb7d90 100644
--- a/websocket_pi.py
+++ b/ws/pi.py
@@ -23,9 +23,9 @@ class PiWebSocketHandler(WebSocket):
 
             self.processing = True
             import ogs
-            thisdir = ogs.current_dir
-            cherrypy.log("Dir %s" % thisdir)
-            exec_py = os.path.join(thisdir, 'cpi.py')
+            deploydir = ogs.current_dir
+            cherrypy.log("Dir %s" % deploydir)
+            exec_py = os.path.join(deploydir, 'ws/workers/cpi.py')
             cherrypy.log("Path %s" % exec_py)
             comm = MPI.COMM_SELF.Spawn(sys.executable,
                                        args=[exec_py],
diff --git a/websocket_stream.py b/ws/stream.py
similarity index 63%
rename from websocket_stream.py
rename to ws/stream.py
index dd838a8..9cb0179 100644
--- a/websocket_stream.py
+++ b/ws/stream.py
@@ -19,6 +19,37 @@ from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
 from ws4py.websocket import WebSocket
 from ws4py.messaging import TextMessage
 
+def get2DBins(x,y,binSizeX,binSizeY):
+    """Splits 2D region into bins
+
+    Arguments
+    x -> list of x coordinates
+    y -> list of y coordinates
+    binSizeX -> number of x coordinates in each bin
+    binSizeY -> number of y coordinates in each bin
+
+    returns: list of tuples, each containing the bounds indexes for bins
+        in the format [(x1,x2,y1,y2),...]
+    """
+
+    result = []
+    xlength = len(x)
+    ylength = len(y)
+
+    i=0
+    for i1 in range(0,xlength,binSizeX):
+        i2 = i1+binSizeX-1
+        if i2 >= xlength:
+            i2 = xlength-1
+        for j1 in range(0,ylength,binSizeY):
+            j2 = j1+binSizeY-1
+            if j2 >= ylength:
+                j2 = ylength-1
+            result.append((i1,i2,j1,j2))
+
+    return result
+
+
 class StreamWebSocketHandler(WebSocket):
 
     def received_message(self, m):
@@ -28,109 +59,88 @@ class StreamWebSocketHandler(WebSocket):
 
             self.processing = True
             import ogs
-            thisdir = ogs.current_dir
-            exec_py = os.path.join(thisdir, 'cdmsProcess.py')
-            data_file = os.path.join(thisdir, 'data/clt.nc')
+            deploydir = ogs.current_dir
+            cherrypy.log("Dir %s" % deploydir)
+            exec_py = os.path.join(deploydir, 'ws/workers/cdmsProcess.py')
+            data_file = os.path.join(deploydir, 'data/clt.nc')
 
-            #read data file
+            cherrypy.log('read data file')
             f = cdms2.open(data_file)
-            u = f['u']
+            clt = f['clt']
 
-            #split data into regions
+            cherrypy.log('split data into regions')
             #need some hueristics to determine size of bins
-            #just split to 10x10 for test
-            latticks = u.getLatitude().getValue()
-            latsize = len(latticks)
-            latstep = int(latsize / 10)
-
-            lonticks = u.getLongitude().getValue()
-            lonsize = len(lonticks)
-            lonstep = int(lonsize / 10)
-
-            #do the actual splitting using cdmsVar.subRegion
-            i=0
-            regions = []
-            while i < latsize:
-                i2 = i + latstep-1
-                if i2 >= latsize:
-                    i2 = latsize-1
-                j=0
-                while j < lonsize:
-                    j2 = j + lonstep-1
-                    if j2 >= lonsize:
-                        j2 = lonsize-1
-                    regions.append(
-                        u.subRegion(
-                            latitude=(latticks[i],latticks[i2]),
-                            longitude=(lonticks[j],lonticks[j2])))
-                    j = j2+1
-                i = i2 + 1
+            #just split into 10x10 regions for test
+            latCoords = clt.getLatitude().getValue()
+            lonCoords = clt.getLongitude().getValue()
+            regionIndexes = get2DBins(latCoords, lonCoords, 10, 10)
+            latBounds = [(latCoords[i1],latCoords[i2]) for (i1,i2,j1,j2) in regionIndexes]
+            lonBounds = [(lonCoords[j1],lonCoords[j2]) for (i1,i2,j1,j2) in regionIndexes]
+            latIndexs = [(i1,i2) for (i1,i2,j1,j2) in regionIndexes]
+            lonIndexs = [(j1,j2) for (i1,i2,j1,j2) in regionIndexes]
+
 
+            cherrypy.log('get cpus')
             try:
                 numcpus = determineNumberOfCPUs()
             except:
                 numcpus = 8
 
-            #spawn processes
+            cherrypy.log('spawn processes')
             comm = MPI.COMM_SELF.Spawn(sys.executable,
                                        args=[exec_py],
                                        maxprocs=numcpus)
             self.comm = comm
 
-            #set up communication objects
-            results = {}
-            requests = {}
-            sizes = {}
-            completed = {}
-            started = {}
-            newregions = {}
-            for k in range(8):
-                cherrypy.log(str(k))
-                comm.send(regions[k],k)
+            cherrypy.log('send data file path %s' % data_file)
+            comm.bcast(data_file, root=MPI.ROOT)
 
-            comm.Barrier()
+            cherrypy.log('set up communication objects')
+            working = [False for i in range(numcpus)]
+            requests = [None for i in range(numcpus)]
+            buffers = [array('c') for i in range(numcpus)]
+            sizes = [0 for i in range(numcpus)]
+            rindexes = [i for i in range(numcpus)]
 
+            currentRegionIndex = 0
+            finishCount = 0
             Status = MPI.Status()
-
-            if False:
-                for kk in range(8):
-                    comm.Probe(kk, 0, Status)
-                    cherrypy.log("data size from %d: %d" % (kk, Status.Get_count(MPI.CHAR)))
-                    buf = array('c','\0'*Status.Get_count(MPI.CHAR))
-                    comm.Recv(buf,kk)
-                comm.Barrier()
-                comm.Disconnect()
-
-                return;
-
-            #as processes finish, send websocket back to client browser
-            completeCount = 0
-            while completeCount < 8:
-                #probe for messages that haven't been started recieving
-                for ii in range(8):
-                    if ii not in started:
-                        if comm.Iprobe(ii,0,Status):
-                            started[ii] = True
-                            datasize = Status.Get_count(MPI.CHAR)
-                            results[ii] = array('c', '\0'*datasize)
-                            cherrypy.log("Process %d sending" % ii)
-                            requests[ii] = comm.Irecv(results[ii],ii)
-                            self.send(TextMessage("Process %d sending" % ii))
-
-                #test recieving messages for copmleteness
-                for jj in range(8):
-                    if jj in started and jj not in completed:
-                        if(requests[jj].Test()):
-                            newregions[jj] = pickle.loads(results[jj].tostring())
-                            completed[jj] = True
-                            completeCount+=1
-
-                #let other threads have a chance to do something
+            running = True
+            while finishCount < len(latBounds):
+                #for each idle process, send data
+                for i in range(numcpus):
+                    if not working[i] and currentRegionIndex < len(latBounds):
+                        comm.send((latIndexs[currentRegionIndex],lonIndexs[currentRegionIndex]),i)
+                        working[i] = True
+                        rindexes[i] = currentRegionIndex
+                        currentRegionIndex += 1
+
+
+                #probe each process, unless it's already sending result
+                for i in range(numcpus):
+                    if working[i] and requests[i] is None and comm.Iprobe(i,0,Status):
+                        sizes[i] = Status.Get_count(MPI.CHAR)
+                        if len(buffers[i]) < sizes[i]:
+                            buffers[i].extend('\0'*(sizes[i]-len(buffers[i])))
+                        requests[i] = comm.Irecv(buffers[i],i)
+
+                #test async recieve operation, send data to browser if done
+                for i in range(numcpus):
+                    if working[i] and requests[i] is not None and requests[i].Test():
+                        self.send(TextMessage('%d,%s'%(rindexes[i],buffers[i][:sizes[i]].tostring())))
+                        finishCount += 1
+                        working[i] = False
+                        requests[i] = None
+
+                #let other threads do some work
                 time.sleep(0.1)
 
             cherrypy.log("All messages completed")
 
-            comm.Barrier()
+            #send quit message to spawned processes
+            for i in range(numcpus):
+                comm.send(0,i)
+
             comm.Disconnect()
             self.comm = None
             self.processing = False
@@ -141,7 +151,6 @@ class StreamWebSocketHandler(WebSocket):
             cherrypy.log(tb)
             cherrypy.log(str(err))
             try:
-                self.comm.Barrier()
                 self.comm.Disconnect()
             except:
                 pass
diff --git a/ws/workers/cdmsProcess.py b/ws/workers/cdmsProcess.py
new file mode 100644
index 0000000..765f8b5
--- /dev/null
+++ b/ws/workers/cdmsProcess.py
@@ -0,0 +1,67 @@
+from array import array
+from mpi4py import MPI
+try:
+    import cPickle as pickle
+except:
+    import pickle
+import cdms2
+import vcs
+from uuid import uuid4
+import base64
+import cdutil, os
+
+try:
+    comm = MPI.Comm.Get_parent()
+    rank = comm.Get_rank()
+    size = comm.Get_size()
+
+    data_file = comm.bcast(None, root=0)
+    print 'Child rank: %d     file: %s' % (rank,data_file)
+
+    f = cdms2.open(data_file, 'r')
+    clt = f['clt']
+
+    #canvas = vcs.init()
+
+    while True:
+        region = comm.recv()
+
+        if type(region) == int and region == 0:
+            print "Child %d quitting" % rank
+            break
+
+        print 'Child %d recieved region %s' % (rank,str(region))
+        latBounds = region[0]
+        lonBounds = region[1]
+
+        #perform time average on data
+        regiondata = clt[:,latBounds[0]:latBounds[1],lonBounds[0]:lonBounds[1]]
+        cdutil.setTimeBoundsMonthly(regiondata)
+        avg = cdutil.averager(regiondata,axis='t')
+
+        #plot image
+        #canvas.plot(avg)
+
+        #save to temp file
+        temp_image_file = os.path.join(os.path.dirname(data_file),'%s.jpg'%str(uuid4()))
+        #canvas.gs(temp_image_file, 'jpeg', 'l', '50x50')
+
+        #convert image data to base64
+        #with open(temp_image_file, "rb") as temp_image:
+        #    base64jpg = base64.b64encode(temp_image.read())
+
+        #send base64 image string back to master to be sent to browser
+        #data = array('c',base64jpg)
+        data = array('c',temp_image_file)
+        comm.Send(data)
+
+    comm.Disconnect()
+
+except Exception, err:
+    import traceback
+    tb = traceback.format_exc()
+    print tb
+    try:
+        comm.Disconnect()
+    except:
+        pass
diff --git a/cpi.py b/ws/workers/cpi.py
similarity index 100%
rename from cpi.py
rename to ws/workers/cpi.py

-----------------------------------------------------------------------

Summary of changes:
 CMakeLists.txt                           |   19 +-
 cdmsProcess.py                           |   35 ---
 ogs.py                                   |    6 +-
 server.conf.in                           |    6 +-
 web/index.html                           |   37 ++-
 web/lib/app.js                           |   49 +++-
 web/lib/geo/{layer.js => feature.js}     |   71 +++---
 web/lib/{vgl/object.js => geo/latlng.js} |   40 ++-
 web/lib/geo/layer.js                     |  136 +++++++++-
 web/lib/geo/map.js                       |  171 +++++++-----
 web/lib/init.js                          |   35 ++--
 web/lib/vgl/actor.js                     |  289 ++++++++++++---------
 web/lib/vgl/blend.js                     |   97 +++++++
 web/lib/vgl/boundingObject.js            |   85 +++---
 web/lib/vgl/camera.js                    |  428 ++++++++++++++++++------------
 web/lib/vgl/geomData.js                  |  311 +++++++++++++---------
 web/lib/vgl/groupNode.js                 |  170 ++++++------
 web/lib/vgl/mapper.js                    |  259 ++++++++++---------
 web/lib/vgl/material.js                  |  177 +++++++------
 web/lib/vgl/materialAttribute.js         |   72 +++---
 web/lib/vgl/modelViewMatrixStack.js      |   12 +-
 web/lib/vgl/node.js                      |  226 +++++++++-------
 web/lib/vgl/object.js                    |   35 ++-
 web/lib/vgl/planeSource.js               |   23 ++-
 web/lib/vgl/renderer.js                  |  301 ++++++++++++---------
 web/lib/vgl/shader.js                    |  121 +++++-----
 web/lib/vgl/shaderProgram.js             |  316 +++++++++++-----------
 web/lib/vgl/source.js                    |    5 +-
 web/lib/vgl/texture.js                   |  310 +++++++++++-----------
 web/lib/vgl/uniform.js                   |  219 +++++++++-------
 web/lib/vgl/utils.js                     |  209 +++++++++++----
 web/lib/vgl/vertexAttribute.js           |   55 ++--
 ws/__init__.py                           |    2 +
 websocket_chat.py => ws/chat.py          |    0
 websocket_pi.py => ws/pi.py              |    6 +-
 websocket_stream.py => ws/stream.py      |  173 +++++++------
 ws/workers/cdmsProcess.py                |   67 +++++
 cpi.py => ws/workers/cpi.py              |    0
 38 files changed, 2678 insertions(+), 1895 deletions(-)
 delete mode 100644 cdmsProcess.py
 copy web/lib/geo/{layer.js => feature.js} (50%)
 copy web/lib/{vgl/object.js => geo/latlng.js} (63%)
 create mode 100644 web/lib/vgl/blend.js
 create mode 100644 ws/__init__.py
 rename websocket_chat.py => ws/chat.py (100%)
 rename websocket_pi.py => ws/pi.py (96%)
 rename websocket_stream.py => ws/stream.py (63%)
 create mode 100644 ws/workers/cdmsProcess.py
 rename cpi.py => ws/workers/cpi.py (100%)


hooks/post-receive
-- 
OpenGeoscience


