[Opengeoscience-developers] OpenGeoscience branch, test-mpi-stream, created. 38ffba64e072f76e0ac712c1fa1455696c4e4d11
Ben Burnett
bburnett at poly.edu
Tue Mar 5 16:32:13 EST 2013
This is an automated email from the git hooks/post-receive script. It was
generated because a ref change was pushed to the repository containing
the project "OpenGeoscience".
The branch, test-mpi-stream has been created
at 38ffba64e072f76e0ac712c1fa1455696c4e4d11 (commit)
- Log -----------------------------------------------------------------
http://public.kitware.com/gitweb?p=OpenGeoscience/opengeoscience.git;a=commitdiff;h=38ffba64e072f76e0ac712c1fa1455696c4e4d11
commit 38ffba64e072f76e0ac712c1fa1455696c4e4d11
Author: Ben Burnett <bburnett at poly.edu>
AuthorDate: Tue Mar 5 16:28:11 2013 -0500
Commit: Ben Burnett <bburnett at poly.edu>
CommitDate: Tue Mar 5 16:28:11 2013 -0500
Initial commit testing data streaming
It now opens a cdms file, splits it into 100 subregions,
sends some of the regions to different processes, which then send
them back.
TODO:
-set up logic to process all regions asynchronously
-perform some operations on the data in each process
-send data to browser
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..90746ea
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+# Temporary Emacs backup files
+*~
\ No newline at end of file
diff --git a/CMakeLists.txt b/CMakeLists.txt
index abbf47d..6ab4a13 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -110,6 +110,8 @@ set(SOURCES
ogs.py
websocket_chat.py
websocket_pi.py
+ websocket_stream.py
+ cdmsProcess.py
cpi.py
web/index.html
web/lib/app.js
diff --git a/cdmsProcess.py b/cdmsProcess.py
new file mode 100644
index 0000000..fbeb0ac
--- /dev/null
+++ b/cdmsProcess.py
@@ -0,0 +1,35 @@
from array import array
from mpi4py import MPI
try:
    import cPickle as pickle
except ImportError:
    import pickle

# Worker spawned by websocket_stream.StreamWebSocketHandler. It receives one
# pickled cdms region from the parent (remote rank 0) and sends it back as a
# pickled char buffer; the parent sizes its receive buffer from
# Iprobe/Get_count(MPI.CHAR), so the payload must be sent as MPI chars.
comm = None
try:
    comm = MPI.Comm.Get_parent()
    if comm == MPI.COMM_NULL:
        raise RuntimeError('cdmsProcess.py must be started via MPI Spawn')
    rank = comm.Get_rank()
    size = comm.Get_size()

    print('Child rank: %d size: %d' % (rank, size))

    # Receive BEFORE the first barrier. The parent may use a blocking send
    # followed by a barrier; if we waited in the barrier first, a large
    # (rendezvous-protocol) send would never complete and both sides would
    # deadlock. Receiving first is safe whether the parent sends blocking or
    # non-blocking.
    region = comm.recv(source=0)
    comm.Barrier()

    print('Child %d recieved %s' % (rank, region.typecode()))

    data = array('c', pickle.dumps(region))
    # Explicit dest: mpi4py >= 2 no longer defaults it to 0.
    comm.Send(data, dest=0)

    print('sent data to master')

    comm.Barrier()
    comm.Disconnect()

except Exception:
    import traceback
    print(traceback.format_exc())
    # Best effort: let the parent's Disconnect complete instead of hanging.
    if comm is not None and comm != MPI.COMM_NULL:
        try:
            comm.Disconnect()
        except Exception:
            pass
diff --git a/experimental/mpi/child.py b/experimental/mpi/child.py
new file mode 100644
index 0000000..10efceb
--- /dev/null
+++ b/experimental/mpi/child.py
@@ -0,0 +1,23 @@
from mpi4py import MPI

# Experimental spawned worker: receives one integer from the parent
# (experimental/mpi/master.py), prints it, then synchronises and disconnects.
comm = None
try:
    comm = MPI.Comm.Get_parent()
    if comm == MPI.COMM_NULL:
        raise RuntimeError('child.py must be started via MPI Spawn (master.py)')
    rank = comm.Get_rank()
    size = comm.Get_size()

    print('Child rank: %d size: %d' % (rank, size))

    # Receive before the barrier; the parent sends first and only then enters
    # the barrier, so waiting in the barrier first could deadlock a blocking
    # send. (The old "rank > -1" test was always true -- ranks are >= 0 -- so
    # its else-branch was dead code.)
    number = comm.recv(source=0)
    print('Child %d recieved %d' % (rank, number))

    comm.Barrier()
    comm.Disconnect()

except Exception:
    import traceback
    print(traceback.format_exc())
    # Best effort: let the parent's Disconnect complete instead of hanging.
    if comm is not None and comm != MPI.COMM_NULL:
        try:
            comm.Disconnect()
        except Exception:
            pass
diff --git a/experimental/mpi/master.py b/experimental/mpi/master.py
new file mode 100644
index 0000000..4bacf2e
--- /dev/null
+++ b/experimental/mpi/master.py
@@ -0,0 +1,15 @@
import os
import sys

from mpi4py import MPI

# Number of child processes to spawn; child k receives the integer k.
NPROCS = 8

# Resolve child.py next to this script so the experiment works from any CWD.
CHILD_SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                            'child.py')

comm = MPI.COMM_SELF.Spawn(sys.executable,
                           args=[CHILD_SCRIPT],
                           maxprocs=NPROCS)

# On the spawn intercommunicator these report the LOCAL group (this process
# alone); the children form the remote group.
print('master rank %d' % comm.Get_rank())
print('master size %d' % comm.Get_size())

requests = []
for i in range(NPROCS):
    print('sending %d to process %d' % (i, i))
    # Non-blocking send so we can reach the barrier even if a child enters
    # the barrier before posting its receive.
    requests.append(comm.isend(i, dest=i))

comm.Barrier()
MPI.Request.Waitall(requests)
comm.Disconnect()
diff --git a/ogs.py b/ogs.py
index e434884..92e9c98 100644
--- a/ogs.py
+++ b/ogs.py
@@ -9,6 +9,7 @@ import os
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
from websocket_chat import ChatRoot
from websocket_pi import PiRoot
+from websocket_stream import StreamRoot
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -35,6 +36,9 @@ class Root(object):
# access at http://localhost:8080/pi
pi = PiRoot(host='127.0.0.1', port=8080, ssl=False)
+ # access at http://localhost:8080/stream
+ stream = StreamRoot(host='127.0.0.1', port=8080, ssl=False)
+
@cherrypy.expose
def index(self):
# Redirect user to the index
diff --git a/server.conf.in b/server.conf.in
index 1714d99..a2f2d11 100644
--- a/server.conf.in
+++ b/server.conf.in
@@ -18,4 +18,8 @@ tools.websocket.handler_cls: websocket_chat.ChatWebSocketHandler
[/pi/ws]
tools.websocket.on: True
-tools.websocket.handler_cls: websocket_pi.PiWebSocketHandler
\ No newline at end of file
+tools.websocket.handler_cls: websocket_pi.PiWebSocketHandler
+
+[/stream/ws]
+tools.websocket.on: True
+tools.websocket.handler_cls: websocket_stream.StreamWebSocketHandler
\ No newline at end of file
diff --git a/websocket_stream.py b/websocket_stream.py
new file mode 100644
index 0000000..dd838a8
--- /dev/null
+++ b/websocket_stream.py
@@ -0,0 +1,326 @@
+# -*- coding: utf-8 -*-
+import argparse
+import inspect
+import os
+import random
+import sys
+import time
+
+#third party libs
+import numpy, cherrypy, cdms2
+from array import array
+from mpi4py import MPI
+try:
+ import cPickle as pickle
+except:
+ import pickle
+
+from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
+from ws4py.websocket import WebSocket
+from ws4py.messaging import TextMessage
+
class StreamWebSocketHandler(WebSocket):
    """Stream sub-regions of a cdms variable through spawned MPI workers.

    Each message received on the socket (its content is ignored) starts one
    run:

    1. Open ``data/clt.nc`` under ``ogs.current_dir``.
    2. Split variable ``u`` into a grid of latitude/longitude sub-regions.
    3. Spawn ``cdmsProcess.py`` workers with ``MPI.COMM_SELF.Spawn`` and send
       each worker one region.
    4. As each worker starts sending its result back, report it to the client.

    Messages that arrive while a run is in progress are ignored.
    """

    # Class-level defaults so ``closed`` and the error path can read these
    # even if no message has ever been received on this socket.
    processing = False
    comm = None

    # Number of bins along latitude / longitude. Heuristics for choosing the
    # bin sizes are still TODO; 10x10 is a test split.
    lat_bins = 10
    lon_bins = 10

    # Worker count used when the CPU count cannot be determined.
    default_workers = 8

    def received_message(self, m):
        """Run one split/scatter/gather cycle; the message ``m`` is ignored."""
        if self.processing:
            return
        self.processing = True
        try:
            self._run()
        except Exception as err:
            import traceback
            cherrypy.log(traceback.format_exc())
            cherrypy.log(str(err))
            self._shutdown_workers()
        finally:
            # Always clear the flag: previously an exception left it set and
            # every later message on this socket was silently ignored.
            self.processing = False

    def _run(self):
        """Load and split the data, scatter it to workers, gather replies."""
        import ogs
        thisdir = ogs.current_dir
        exec_py = os.path.join(thisdir, 'cdmsProcess.py')
        data_file = os.path.join(thisdir, 'data/clt.nc')

        f = cdms2.open(data_file)
        try:
            regions = self._split_regions(f['u'])
            if not regions:
                cherrypy.log("No regions to process in %s" % data_file)
                return

            # One region per worker, and never more workers than regions:
            # a worker with nothing to receive would block forever in recv.
            nworkers = min(self._cpu_count(), len(regions))
            self.comm = MPI.COMM_SELF.Spawn(sys.executable,
                                            args=[exec_py],
                                            maxprocs=nworkers)

            self._scatter(regions[:nworkers])
            # TODO: operate on the returned regions / forward them to the
            # browser; for now they are only received and unpickled.
            self._gather(nworkers)
            cherrypy.log("All messages completed")

            self.comm.Barrier()
            self.comm.Disconnect()
            self.comm = None
        finally:
            f.close()

    def _cpu_count(self):
        """Return the CPU count, or ``default_workers`` if it is unknown."""
        try:
            return determineNumberOfCPUs()
        except Exception:
            return self.default_workers

    @staticmethod
    def _bounds(size, bins):
        """Split ``range(size)`` into inclusive ``(start, end)`` index pairs.

        Each chunk spans ``size // bins`` indices (at least 1, so sizes
        smaller than ``bins`` no longer loop forever); the last chunk is
        clipped to ``size - 1``.
        """
        step = max(1, size // bins)
        bounds = []
        start = 0
        while start < size:
            end = min(start + step - 1, size - 1)
            bounds.append((start, end))
            start = end + 1
        return bounds

    def _split_regions(self, var):
        """Cut ``var`` into lat/lon sub-regions via ``subRegion``.

        Regions are ordered latitude-major (all longitude chunks of the first
        latitude chunk come first).
        """
        latticks = var.getLatitude().getValue()
        lonticks = var.getLongitude().getValue()
        regions = []
        for i, i2 in self._bounds(len(latticks), self.lat_bins):
            for j, j2 in self._bounds(len(lonticks), self.lon_bins):
                regions.append(
                    var.subRegion(
                        latitude=(latticks[i], latticks[i2]),
                        longitude=(lonticks[j], lonticks[j2])))
        return regions

    def _scatter(self, regions):
        """Send ``regions[k]`` to worker ``k`` and pass the first barrier."""
        comm = self.comm
        requests = []
        for rank, region in enumerate(regions):
            cherrypy.log(str(rank))
            # Non-blocking send: a worker may enter the barrier before posting
            # its receive, and a blocking send of a large pickle would then
            # deadlock against it.
            requests.append(comm.isend(region, dest=rank))
        comm.Barrier()
        MPI.Request.Waitall(requests)

    def _gather(self, nworkers):
        """Poll workers until each one's pickled region has been received.

        Returns a dict mapping worker rank to the unpickled region.
        """
        comm = self.comm
        status = MPI.Status()
        buffers = {}
        requests = {}
        results = {}
        while len(results) < nworkers:
            # Post a receive for every worker whose message has arrived, sized
            # from the probed message length.
            for rank in range(nworkers):
                if rank not in requests and comm.Iprobe(rank, 0, status):
                    datasize = status.Get_count(MPI.CHAR)
                    buffers[rank] = array('c', '\0' * datasize)
                    cherrypy.log("Process %d sending" % rank)
                    requests[rank] = comm.Irecv(buffers[rank], rank)
                    self.send(TextMessage("Process %d sending" % rank))

            # Unpickle every receive that has completed. The data comes from
            # workers this process spawned itself, not from the client.
            for rank in requests:
                if rank not in results and requests[rank].Test():
                    results[rank] = pickle.loads(buffers[rank].tostring())

            # NOTE(review): if a worker dies before sending, this loop never
            # ends; a timeout may be needed.
            # Let other threads have a chance to do something.
            time.sleep(0.1)
        return results

    def _shutdown_workers(self):
        """Best-effort barrier + disconnect after a failed run."""
        if self.comm is None:
            return
        try:
            # NOTE(review): both calls are collective and block until every
            # worker reaches its matching call; a worker stuck elsewhere will
            # hang this thread.
            self.comm.Barrier()
            self.comm.Disconnect()
        except Exception as err:
            cherrypy.log("Failed to shut down workers: %s" % err)
        self.comm = None

    def closed(self, code, reason="A client left the room without a proper explanation."):
        """Abort the spawned workers if the client disconnects mid-run."""
        # NOTE(review): Abort on the spawn intercommunicator may also
        # terminate this server process, depending on the MPI implementation.
        if self.processing and self.comm:
            self.comm.Abort()
+
class StreamRoot(object):
    """CherryPy app serving a test page that talks to the /stream/ws socket."""

    def __init__(self, host, port, ssl=False):
        """Store the host/port the page's WebSocket connects to.

        ``ssl`` selects the ``wss`` scheme instead of ``ws``.
        """
        self.host = host
        self.port = port
        self.scheme = 'wss' if ssl else 'ws'

    @cherrypy.expose
    def index(self):
        """Return the HTML/JS test page.

        The page connects to ``<scheme>://<host>:<port>/stream/ws``. Each
        "Send" click sends the input's text over the socket, and the page
        appends server messages to the textarea.
        """
        return """<html>
    <head>
      <script type='application/javascript' src='https://ajax.googleapis.com/ajax/libs/jquery/1.8.3/jquery.min.js'></script>
      <script type='application/javascript'>
        $(document).ready(function() {
          var ws;
          var websocket = '%(scheme)s://%(host)s:%(port)s/stream/ws';
          if (window.WebSocket) {
            ws = new WebSocket(websocket);
          }
          else if (window.MozWebSocket) {
            ws = new MozWebSocket(websocket);
          }
          else {
            console.log('WebSocket Not Supported');
            return;
          }

          window.onbeforeunload = function(e) {
            $('#chat').val($('#chat').val() + 'Bye bye...\\n');
            ws.close(1000, '%(username)s left the room');

            if(!e) e = window.event;
            e.stopPropagation();
            e.preventDefault();
          };
          ws.onmessage = function (evt) {
            $('#chat').val($('#chat').val() + evt.data + '\\n');
          };
          ws.onopen = function() {
            ws.onmessage({data: 'Connected to server'});
            ws.onmessage({data: 'Send anything to start the process.'});
          };
          ws.onclose = function(evt) {
            $('#chat').val($('#chat').val() + 'Connection closed by server: ' + evt.code + ' \"' + evt.reason + '\"\\n');
          };

          $('#send').click(function() {
            var val = $('#message').val();
            ws.onmessage({data: 'Starting Process.'});
            ws.send(val);
            $('#message').val("");
            return false;
          });
        });
      </script>
    </head>
    <body>
      <form action='#' id='chatform' method='get'>
        <textarea id='chat' cols='35' rows='10'></textarea>
        <br />
        <label for='message'>Iterations: </label><input type='text' id='message' />
        <input id='send' type='submit' value='Send' />
      </form>
    </body>
    </html>
    """ % {'username': "User%d" % random.randint(0, 100), 'host': self.host, 'port': self.port, 'scheme': self.scheme}

    @cherrypy.expose
    def ws(self):
        """WebSocket endpoint; logs the ws4py handler attached to the request."""
        cherrypy.log("Handler created: %s" % repr(cherrypy.request.ws_handler))
+
+##
+# From http://stackoverflow.com/a/1006301/1114724
+import os,re,subprocess
def determineNumberOfCPUs():
    """Return the number of virtual or physical CPUs on this system.

    This is the count of "user/real" as reported by time(1) for an optimally
    scaling userspace-only program. Probes are tried in this order, and the
    first positive result wins:

    1. multiprocessing
    2. psutil
    3. POSIX sysconf
    4. the Windows NUMBER_OF_PROCESSORS variable
    5. the Jython runtime
    6. BSD sysctl
    7. Linux /proc/cpuinfo
    8. Solaris /devices/pseudo
    9. a dmesg heuristic for other UNIXes

    Raises:
        Exception: if no probe yields a positive count.
    """
    # Python 2.6+
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # http://code.google.com/p/psutil/ -- cpu_count() in psutil >= 2.0,
    # NUM_CPUS in older releases; cpu_count() may return None.
    try:
        import psutil
        try:
            res = psutil.cpu_count()
        except AttributeError:
            res = psutil.NUM_CPUS
        if res:
            return res
    except (ImportError, AttributeError):
        pass

    # POSIX
    try:
        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if res > 0:
            return res
    except (AttributeError, ValueError, OSError):
        pass

    # Windows
    try:
        res = int(os.environ['NUMBER_OF_PROCESSORS'])
        if res > 0:
            return res
    except (KeyError, ValueError):
        pass

    # Jython
    try:
        from java.lang import Runtime
        res = Runtime.getRuntime().availableProcessors()
        if res > 0:
            return res
    except ImportError:
        pass

    # BSD
    try:
        sysctl = subprocess.Popen(['sysctl', '-n', 'hw.ncpu'],
                                  stdout=subprocess.PIPE)
        res = int(sysctl.communicate()[0])
        if res > 0:
            return res
    except (OSError, ValueError):
        pass

    # Linux
    try:
        with open('/proc/cpuinfo') as cpuinfo:
            res = cpuinfo.read().count('processor\t:')
        if res > 0:
            return res
    except IOError:
        pass

    # Solaris
    try:
        expr = re.compile(r'^cpuid@[0-9]+$')
        res = sum(1 for pd in os.listdir('/devices/pseudo/')
                  if expr.match(pd) is not None)
        if res > 0:
            return res
    except OSError:
        pass

    # Other UNIXes (heuristic)
    try:
        try:
            with open('/var/run/dmesg.boot') as boot_log:
                dmesg = boot_log.read()
        except IOError:
            dmesgProcess = subprocess.Popen(['dmesg'], stdout=subprocess.PIPE)
            # Decode so the text search below also works where communicate()
            # returns bytes rather than str.
            dmesg = dmesgProcess.communicate()[0].decode('utf-8', 'replace')

        res = 0
        while '\ncpu' + str(res) + ':' in dmesg:
            res += 1
        if res > 0:
            return res
    except OSError:
        pass

    raise Exception('Can not determine number of CPUs on this system')
-----------------------------------------------------------------------
hooks/post-receive
--
OpenGeoscience
More information about the Opengeoscience-developers
mailing list