Sleigh

To this point we have more or less taken for granted that the computing components of the ensemble were just there. One could appeal to an external mechanism for launching and managing processes, but because this is so fundamental NWS includes its own, Sleigh. Sleigh also provides functionality to handle certain kinds of “embarrassingly parallel” master/worker-style problems directly, absolving the user from any direct engagement with NWS.

Preliminaries

  1. Have to have ssh access to worker nodes.
  2. Test:
     	for n in hippo newt python rhino; do ssh $n hostname; done
    
  3. Setup correct environment:
    #As usual:
    export PYTHONPATH=~/myInstalls/python/lib/python
    
    #Needed to find start up scripts:
    export PATH=~/myInstalls/python/bin:$PATH
    

Sleigh proper

Simplest scenario:
>>> from nws.sleigh import Sleigh, sshcmd
>>> s = Sleigh()
This creates three “generic” workers. Let's see where the processes are actually running:
>>> from socket import gethostname
>>> s.eachWorker(gethostname)
['newt', 'newt', 'newt']
eachWorker runs the specified function once on each worker process. We see, not surprisingly, that by default the three worker processes are started on the local machine (why three? (i) good test number; (ii) two CPUs with two cores are common on clusters and they or quad cores will soon be common on desktops).

Obviously, we would like to be able to use other machines too. So let's shut this down, and try something a little different.

>>> s.stop()
>>> s = Sleigh(nodeList=['hippo', 'newt', 'python', 'rhino'], launch=sshcmd)
>>> s.eachWorker(gethostname)
['rhino', 'hippo', 'newt', 'python']
To take a peek under the covers for more details, browse around with the web interface.

Note that eachWorker is not usually run to carry out a computation per se, so no effort is made to return the results in any particular order. E.g.:

>>> s.eachWorker(gethostname)
['python', 'hippo', 'rhino', 'newt']
We can use eachWorker to build up appropriate state and to start compute servers (a bit like a specialized Linda eval).

We can use a sleigh to carry out a master/worker-style computation by using eachElem. This method takes a function and a list, and returns a list of the results of applying the function to each element in the input list.

>>> r = s.eachElem(lambda x: x*x*x, range(100))
>>> len(r)
100
>>> r[2]
8
>>> r = s.eachElem(lambda x: x*x*x, range(100))
>>> len(r)
100
>>> r[2:5]
[8, 27, 64]
We see that the results are returned in order (and, for that matter, the tasks are evaluated in order too — write a little code that checks this). We won't go into the details here, but eachElem is capable of handling quite general functions with interleaved varying and fixed arguments.

This example illustrates how sleigh can be used to simplify running computations like that seen in the previous lecture:

# worker
def f(x): return x*x*x

ws = nws.client.NetWorkSpace('table test')
while 1: ws.store('r', f(ws.fetch('x')))

# master
ws = nws.client.NetWorkSpace('table test')
for x in range(10): ws.store('x', x)

for x in range(10): print 'f(%d) = %d'%(x, ws.fetch('r'))
Let's now look at an example with a more variable (synthetic) workload.
>>> from time import sleep
>>> from random import randint
>>> def nap(x):
...     t = randint(0, 5)
...     sleep(t)
...     return (x, t)
... 
>>> r = s.eachElem(nap, range(10))
>>> r
[SleighTaskException("Task invocation failed: global name 'randint' is not defined"), ... , SleighTaskException("Task invocation failed: global name 'randint' is not defined")]
That's not what we wanted.

The workers need to import added functionality just like the master (there are exception — we've already seen one: gethostname; since this was the function invoked, sleigh took care of the “bookkeeping” for us, but it doesn't do this recursively).

We need to have each worker import appropriate support functions:

>>> s.eachWorker('from time import sleep')
[None, None, None, None]
>>> s.eachWorker('from random import randint')
[None, None, None, None]
Two notes: Let's try it now:
>>> r = s.eachElem(nap, range(10))
>>> r
[(0, 2), (1, 5), (2, 0), (3, 0), (4, 0), (5, 3), (6, 3), (7, 0), (8, 4), (9, 4)]
That's better! But who wants to wait around?
>>> r = s.eachElem(nap, range(10), blocking=False)
>>> r.check()
7
>>> r.check()
6
>>> r.check()
4
>>> r.check()
1
>>> r.check()
0
>>> r = r.wait()
>>> r
[(0, 4), (1, 2), (2, 0), (3, 2), (4, 4), (5, 5), (6, 1), (7, 3), (8, 5), (9, 3)]
When run with blocking=False, the sleigh immediately returns a SleighPending object. Its check method returns the count of outstanding tasks. Its wait method returns the results (blocking if need be to wait for laggards).

Workers have available a unique identifier, their rank:

>>> s.eachWorker('SleighRank')
[1, 2, 3, 0]
>>> s.eachWorker('SleighRank*111')
[111, 333, 222, 0]
This can be used for a variety of purposes. It is a global variable and may be referenced in the code snippet or function of either a eachWorker or eachElem computation.

Use SleighRank with the example above to test out dynamic load balancing of eachElem computations.

Here's an example illustrating coordination between a non-blocking (or asynchronous) sleigh execution and the controlling python session that makes use of another global, SleighNws, the object encapsulating the workspace in which the sleigh runs:

>>> r = s.eachWorker('SleighNws.find("wake up!")', blocking=False)
>>> r.check()
4
>>> r.check()
4
>>> s.nws.store('wake up!', 123)
>>> r.check()
0
>>> r.wait()
[123, 123, 123, 123]

The combination of eachWorker or eachElem (with a one element vector) and blocking=False can be used to provide functionality very similar to Linda's eval.

Using eachWorker and SleighNws, write a new version of the cubing example from the previous lecture — one that makes use of the sleigh's workers to carry out the computation (do not simply copy the eachElem above: make explicit use of store and fetch, but use your sleigh's workspace and compute servers).