[Opengeoscience-developers] OpenGeoscience branch, test-mpi-stream, created. 38ffba64e072f76e0ac712c1fa1455696c4e4d11

Ben Burnett bburnett at poly.edu
Tue Mar 5 16:32:13 EST 2013


This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "OpenGeoscience".

The branch, test-mpi-stream, has been created
        at  38ffba64e072f76e0ac712c1fa1455696c4e4d11 (commit)

- Log -----------------------------------------------------------------
http://public.kitware.com/gitweb?p=OpenGeoscience/opengeoscience.git;a=commitdiff;h=38ffba64e072f76e0ac712c1fa1455696c4e4d11
commit 38ffba64e072f76e0ac712c1fa1455696c4e4d11
Author:     Ben Burnett <bburnett at poly.edu>
AuthorDate: Tue Mar 5 16:28:11 2013 -0500
Commit:     Ben Burnett <bburnett at poly.edu>
CommitDate: Tue Mar 5 16:28:11 2013 -0500

    Initial commit testing data streaming
    
    It now opens a cdms file, splits it into 100 subregions,
    sends some regions to different processes, which then send
    them back.
    
    TODO:
    -set up logic to process all regions asynchronously
    -perform some operations on the data in each process
    -send data to browser

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..90746ea
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+#temporary emacs files
+*~
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index abbf47d..6ab4a13 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -110,6 +110,8 @@ set(SOURCES
     ogs.py
     websocket_chat.py
     websocket_pi.py
+    websocket_stream.py
+    cdmsProcess.py
     cpi.py
     web/index.html
     web/lib/app.js
diff --git a/cdmsProcess.py b/cdmsProcess.py
new file mode 100644
index 0000000..fbeb0ac
--- /dev/null
+++ b/cdmsProcess.py
@@ -0,0 +1,35 @@
+from array import array
+from mpi4py import MPI
+try:
+    import cPickle as pickle
+except:
+    import pickle
+
+try:
+    comm = MPI.Comm.Get_parent()
+    rank = comm.Get_rank()
+    size = comm.Get_size()
+
+    print 'Child rank: %d     size: %d' % (rank,size)
+
+    comm.Barrier()
+    region = comm.recv()
+
+    print 'Child %d recieved %s' % (rank, region.typecode())
+
+    data = array('c',pickle.dumps(region))
+    comm.Send(data)
+
+    print 'sent data to master'
+
+    comm.Barrier()
+    comm.Disconnect()
+
+except Exception, err:
+    import traceback
+    tb = traceback.format_exc()
+    print tb
+    try:
+        comm.Disconnect()
+    except:
+        pass
diff --git a/experimental/mpi/child.py b/experimental/mpi/child.py
new file mode 100644
index 0000000..10efceb
--- /dev/null
+++ b/experimental/mpi/child.py
@@ -0,0 +1,23 @@
+from mpi4py import MPI
+
+try:
+    comm = MPI.Comm.Get_parent()
+    rank = comm.Get_rank()
+    size = comm.Get_size()
+
+    print 'Child rank: %d     size: %d' % (rank,size)
+
+    if rank >-1:
+        comm.Barrier()
+        number = comm.recv()
+
+        print 'Child %d recieved %d' % (rank, number)
+    else:
+        comm.Barrier()
+
+    comm.Disconnect()
+
+except Exception, err:
+    import traceback
+    tb = traceback.format_exc()
+    print tb
diff --git a/experimental/mpi/master.py b/experimental/mpi/master.py
new file mode 100644
index 0000000..4bacf2e
--- /dev/null
+++ b/experimental/mpi/master.py
@@ -0,0 +1,15 @@
+from mpi4py import MPI
+import sys
+
+comm = MPI.COMM_SELF.Spawn(sys.executable,
+                           args=['child.py'],
+                           maxprocs=8)
+
+print 'master rank %d' % comm.Get_rank()
+print 'master size %d' % comm.Get_size()
+for i in range(8):
+    print 'sending %d to process %d' % (i,i)
+    comm.send(i,i)
+
+comm.Barrier()
+comm.Disconnect()
diff --git a/ogs.py b/ogs.py
index e434884..92e9c98 100644
--- a/ogs.py
+++ b/ogs.py
@@ -9,6 +9,7 @@ import os
 from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
 from websocket_chat import ChatRoot
 from websocket_pi import PiRoot
+from websocket_stream import StreamRoot
 
 current_dir = os.path.dirname(os.path.abspath(__file__))
 
@@ -35,6 +36,9 @@ class Root(object):
     # access at http://localhost:8080/pi
     pi = PiRoot(host='127.0.0.1', port=8080, ssl=False)
 
+    # access at http://localhost:8080/stream
+    stream = StreamRoot(host='127.0.0.1', port=8080, ssl=False)
+
     @cherrypy.expose
     def index(self):
         # Redirect user to the index
diff --git a/server.conf.in b/server.conf.in
index 1714d99..a2f2d11 100644
--- a/server.conf.in
+++ b/server.conf.in
@@ -18,4 +18,8 @@ tools.websocket.handler_cls: websocket_chat.ChatWebSocketHandler
 
 [/pi/ws]
 tools.websocket.on: True
-tools.websocket.handler_cls: websocket_pi.PiWebSocketHandler
\ No newline at end of file
+tools.websocket.handler_cls: websocket_pi.PiWebSocketHandler
+
+[/stream/ws]
+tools.websocket.on: True
+tools.websocket.handler_cls: websocket_stream.StreamWebSocketHandler
\ No newline at end of file
diff --git a/websocket_stream.py b/websocket_stream.py
new file mode 100644
index 0000000..dd838a8
--- /dev/null
+++ b/websocket_stream.py
@@ -0,0 +1,326 @@
+# -*- coding: utf-8 -*-
+import argparse
+import inspect
+import os
+import random
+import sys
+import time
+
+#third party libs
+import numpy, cherrypy, cdms2
+from array import array
+from mpi4py import MPI
+try:
+    import cPickle as pickle
+except:
+    import pickle
+
+from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
+from ws4py.websocket import WebSocket
+from ws4py.messaging import TextMessage
+
+class StreamWebSocketHandler(WebSocket):
+
+    def received_message(self, m):
+        try:
+            if hasattr(self, 'processing') and self.processing:
+                return
+
+            self.processing = True
+            import ogs
+            thisdir = ogs.current_dir
+            exec_py = os.path.join(thisdir, 'cdmsProcess.py')
+            data_file = os.path.join(thisdir, 'data/clt.nc')
+
+            #read data file
+            f = cdms2.open(data_file)
+            u = f['u']
+
+            #split data into regions
+            #need some heuristics to determine size of bins
+            #just split to 10x10 for test
+            latticks = u.getLatitude().getValue()
+            latsize = len(latticks)
+            latstep = int(latsize / 10)
+
+            lonticks = u.getLongitude().getValue()
+            lonsize = len(lonticks)
+            lonstep = int(lonsize / 10)
+
+            #do the actual splitting using cdmsVar.subRegion
+            i=0
+            regions = []
+            while i < latsize:
+                i2 = i + latstep-1
+                if i2 >= latsize:
+                    i2 = latsize-1
+                j=0
+                while j < lonsize:
+                    j2 = j + lonstep-1
+                    if j2 >= lonsize:
+                        j2 = lonsize-1
+                    regions.append(
+                        u.subRegion(
+                            latitude=(latticks[i],latticks[i2]),
+                            longitude=(lonticks[j],lonticks[j2])))
+                    j = j2+1
+                i = i2 + 1
+
+            try:
+                numcpus = determineNumberOfCPUs()
+            except:
+                numcpus = 8
+
+            #spawn processes
+            comm = MPI.COMM_SELF.Spawn(sys.executable,
+                                       args=[exec_py],
+                                       maxprocs=numcpus)
+            self.comm = comm
+
+            #set up communication objects
+            results = {}
+            requests = {}
+            sizes = {}
+            completed = {}
+            started = {}
+            newregions = {}
+            for k in range(8):
+                cherrypy.log(str(k))
+                comm.send(regions[k],k)
+
+            comm.Barrier()
+
+            Status = MPI.Status()
+
+            if False:
+                for kk in range(8):
+                    comm.Probe(kk, 0, Status)
+                    cherrypy.log("data size from %d: %d" % (kk, Status.Get_count(MPI.CHAR)))
+                    buf = array('c','\0'*Status.Get_count(MPI.CHAR))
+                    comm.Recv(buf,kk)
+                comm.Barrier()
+                comm.Disconnect()
+
+                return;
+
+            #as processes finish, send websocket back to client browser
+            completeCount = 0
+            while completeCount < 8:
+                #probe for messages whose receive has not been started yet
+                for ii in range(8):
+                    if ii not in started:
+                        if comm.Iprobe(ii,0,Status):
+                            started[ii] = True
+                            datasize = Status.Get_count(MPI.CHAR)
+                            results[ii] = array('c', '\0'*datasize)
+                            cherrypy.log("Process %d sending" % ii)
+                            requests[ii] = comm.Irecv(results[ii],ii)
+                            self.send(TextMessage("Process %d sending" % ii))
+
+                #test receiving messages for completeness
+                for jj in range(8):
+                    if jj in started and jj not in completed:
+                        if(requests[jj].Test()):
+                            newregions[jj] = pickle.loads(results[jj].tostring())
+                            completed[jj] = True
+                            completeCount+=1
+
+                #let other threads have a chance to do something
+                time.sleep(0.1)
+
+            cherrypy.log("All messages completed")
+
+            comm.Barrier()
+            comm.Disconnect()
+            self.comm = None
+            self.processing = False
+
+        except Exception, err:
+            import traceback
+            tb = traceback.format_exc()
+            cherrypy.log(tb)
+            cherrypy.log(str(err))
+            try:
+                self.comm.Barrier()
+                self.comm.Disconnect()
+            except:
+                pass
+
+    def closed(self, code, reason="A client left the room without a proper explanation."):
+        if self.processing and self.comm:
+            self.comm.Abort()
+
+class StreamRoot(object):
+    def __init__(self, host, port, ssl=False):
+        self.host = host
+        self.port = port
+        self.scheme = 'wss' if ssl else 'ws'
+
+    @cherrypy.expose
+    def index(self):
+        return """<html>
+    <head>
+      <script type='application/javascript' src='https://ajax.googleapis.com/ajax/libs/jquery/1.8.3/jquery.min.js'></script>
+      <script type='application/javascript'>
+        $(document).ready(function() {
+
+          websocket = '%(scheme)s://%(host)s:%(port)s/stream/ws';
+          if (window.WebSocket) {
+            ws = new WebSocket(websocket);
+          }
+          else if (window.MozWebSocket) {
+            ws = MozWebSocket(websocket);
+          }
+          else {
+            console.log('WebSocket Not Supported');
+            return;
+          }
+
+          window.onbeforeunload = function(e) {
+            $('#chat').val($('#chat').val() + 'Bye bye...\\n');
+            ws.close(1000, '%(username)s left the room');
+
+            if(!e) e = window.event;
+            e.stopPropagation();
+            e.preventDefault();
+          };
+          ws.onmessage = function (evt) {
+             $('#chat').val($('#chat').val() + evt.data + '\\n');
+          };
+          ws.onopen = function() {
+             ws.onmessage({data: 'Connected to server'})
+             ws.onmessage({data: 'Send anything to start the process.'})
+          };
+          ws.onclose = function(evt) {
+             $('#chat').val($('#chat').val() + 'Connection closed by server: ' + evt.code + ' \"' + evt.reason + '\"\\n');
+          };
+
+          $('#send').click(function() {
+             var val = $('#message').val();
+             ws.onmessage({data: 'Starting Process.'});
+             ws.send(val);
+             $('#message').val("");
+             return false;
+          });
+        });
+      </script>
+    </head>
+    <body>
+    <form action='#' id='chatform' method='get'>
+      <textarea id='chat' cols='35' rows='10'></textarea>
+      <br />
+      <label for='message'>Iterations: </label><input type='text' id='message' />
+      <input id='send' type='submit' value='Send' />
+      </form>
+    </body>
+    </html>
+    """ % {'username': "User%d" % random.randint(0, 100), 'host': self.host, 'port': self.port, 'scheme': self.scheme}
+
+    @cherrypy.expose
+    def ws(self):
+        cherrypy.log("Handler created: %s" % repr(cherrypy.request.ws_handler))
+
+##
+# From http://stackoverflow.com/a/1006301/1114724
+import os,re,subprocess
+def  determineNumberOfCPUs():
+    """ Number of virtual or physical CPUs on this system, i.e.
+    user/real as output by time(1) when called with an optimally scaling
+    userspace-only program"""
+
+    # Python 2.6+
+    try:
+        import multiprocessing
+        return multiprocessing.cpu_count()
+    except (ImportError,NotImplementedError):
+        pass
+
+    # http://code.google.com/p/psutil/
+    try:
+        import psutil
+        return psutil.NUM_CPUS
+    except (ImportError, AttributeError):
+        pass
+
+    # POSIX
+    try:
+        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
+
+        if res > 0:
+            return res
+    except (AttributeError,ValueError):
+        pass
+
+    # Windows
+    try:
+        res = int(os.environ['NUMBER_OF_PROCESSORS'])
+
+        if res > 0:
+            return res
+    except (KeyError, ValueError):
+        pass
+
+    # jython
+    try:
+        from java.lang import Runtime
+        runtime = Runtime.getRuntime()
+        res = runtime.availableProcessors()
+        if res > 0:
+            return res
+    except ImportError:
+        pass
+
+    # BSD
+    try:
+        sysctl = subprocess.Popen(['sysctl', '-n', 'hw.ncpu'],
+                                      stdout=subprocess.PIPE)
+        scStdout = sysctl.communicate()[0]
+        res = int(scStdout)
+
+        if res > 0:
+            return res
+    except (OSError, ValueError):
+        pass
+
+    # Linux
+    try:
+        res = open('/proc/cpuinfo').read().count('processor\t:')
+
+        if res > 0:
+            return res
+    except IOError:
+        pass
+
+    # Solaris
+    try:
+        pseudoDevices = os.listdir('/devices/pseudo/')
+        expr = re.compile('^cpuid@[0-9]+$')
+
+        res = 0
+        for pd in pseudoDevices:
+            if expr.match(pd) != None:
+                res += 1
+
+        if res > 0:
+            return res
+    except OSError:
+        pass
+
+    # Other UNIXes (heuristic)
+    try:
+        try:
+            dmesg = open('/var/run/dmesg.boot').read()
+        except IOError:
+            dmesgProcess = subprocess.Popen(['dmesg'], stdout=subprocess.PIPE)
+            dmesg = dmesgProcess.communicate()[0]
+
+        res = 0
+        while '\ncpu' + str(res) + ':' in dmesg:
+            res += 1
+
+        if res > 0:
+            return res
+    except OSError:
+        pass
+
+    raise Exception('Can not determine number of CPUs on this system')

-----------------------------------------------------------------------


hooks/post-receive
-- 
OpenGeoscience



More information about the Opengeoscience-developers mailing list