Here's a simple distributed computing example, which uses Pyro.
The client is listed directly below, with the dispatcher and worker scripts following.
#!/usr/bin/env python
"""
Simple wxPython Distributed Computing Example
---------------------------------------------
**Command Line Usage**
wxsimple_pyro [options]
**Options**
-h, --help Print this help and exit.
-c, --cmd Run from the command-line (no gui).
-t <numtasks> Process t tasks.
This simple example uses a wx.App to control and monitor a pool of workers
instructed to carry out a list of tasks.
The program creates the GUI plus a list of tasks, then connects to a a pool of
workers through a dispatcher. Within the GUI, the user can start and stop
processing the tasks at any time.
Copyright (c) 2010, Roger Stuckey. All rights reserved.
"""
import getopt, math, random, sys, time, types, wx
import Pyro.core
from wxsimple_mp import TaskProcessor, TaskServerMP, MyFrame, MyApp
class EmptyProcess:
"""
This class provides a process-like object to hold each worker's PID.
"""
def __init__(self, pid):
"""
Initialise the EmptyProcess.
"""
self.pid = pid
def is_alive(self):
"""
Mimic a negative response from the process.
"""
return False
class TaskServerPyro(TaskServerMP):
"""
The TaskServerMP class provides a target worker class method for queued processes.
"""
def __init__(self, tasks=[ ]):
"""
Initialise the TaskServerMP and create the dispatcher.
"""
TaskServerMP.__init__(self, numprocesses=0, tasks=tasks)
# Create the dispatcher
self.dispatcher = Pyro.core.getProxyForURI("PYRONAME://:wxSimple.dispatcher")
self.numprocesses = self.dispatcher.getNumWorkers()
self.PIDs = self.dispatcher.getWorkerPIDs()
# The empty processes are instantiated here
for n in range(self.numprocesses):
process = EmptyProcess(self.PIDs[n])
self.Processes.append(process)
if __name__ == '__main__':
try:
(optlist, args) = getopt.getopt(sys.argv[1:], 'hct:', ['help', 'cmd', 'numtasks='])
except getopt.GetoptError, msg:
sys.stderr.write("wxsimple_pyro: Error: %s" % msg)
sys.stderr.write("See 'wxsimple_pyro --help'.\n")
sys.exit(2)
else:
cmdline = False
numproc = None
numtasks = 20
for (opt, optarg) in optlist:
if opt in ('-h', '--help'):
sys.stdout.write(__doc__)
sys.exit(0)
elif opt in ('-c', '--cmd'):
cmdline = True
elif opt in ('-t', '--numtasks'):
numtasks = int(optarg)
# Create the task list
Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]
# The pyro client is started here
tsmp = TaskServerPyro(tasks=Tasks)
if (cmdline):
tsmp.run()
else:
app = MyApp(redirect=True, filename='wxsimple_pyro.stderr.log', taskserver=tsmp)
app.MainLoop()
This is the dispatcher script:
#!/usr/bin/env python
#
# dispatcher
import Pyro.core
import Pyro.naming
from Pyro.errors import NamingError
from Queue import Queue
class DispatcherQueue(Pyro.core.ObjBase):
"""
The Dispatcher class manages the task and result queues.
"""
def __init__(self):
"""
Initialise the Dispatcher.
"""
Pyro.core.ObjBase.__init__(self)
self.taskQueue = Queue()
self.resultQueue = Queue()
self.WorkerPIDs = [ ]
self.numWorkers = 0
def putTask(self, task):
"""
Put a task on the task queue.
"""
self.taskQueue.put(task)
def getTask(self, timeout=5):
"""
Get a task from the task queue.
"""
return self.taskQueue.get(block=True, timeout=timeout)
def putResult(self, output):
"""
Put a result on the result queue.
"""
self.resultQueue.put(output)
def getResult(self, timeout=5):
"""
Get a result from the result queue.
"""
return self.resultQueue.get(block=True, timeout=timeout)
def taskQueueSize(self):
"""
Return the task queue size.
"""
return self.taskQueue.qsize()
def resultQueueSize(self):
"""
Return the result queue size.
"""
return self.resultQueue.qsize()
def addWorker(self, pid):
"""
Add a worker's PID to the list.
"""
self.WorkerPIDs.append(pid)
self.numWorkers += 1
def getWorkerPIDs(self):
"""
Return the list of worker PID's.
"""
return self.WorkerPIDs
def getNumWorkers(self):
"""
Return the number of workers.
"""
return self.numWorkers
# Main program
Pyro.core.initServer()
ns = Pyro.naming.NameServerLocator().getNS()
daemon = Pyro.core.Daemon()
daemon.useNameServer(ns)
try:
ns.createGroup(":wxSimple")
except NamingError:
pass
try:
ns.unregister(":wxSimple.dispatcher")
except NamingError:
pass
uri = daemon.connect(DispatcherQueue(),":wxSimple.dispatcher")
print "Dispatcher is ready."
daemon.requestLoop()
And this is the worker script:
#!/usr/bin/env python
#
# worker
import Pyro.core
#import Pyro.errors
import Queue
import math, os, socket
from wxsimple_pyro import TaskProcessor
HOSTNAME = socket.gethostname()
HOSTPID = os.getpid()
Pyro.core.initClient()
def main():
dispatcher = Pyro.core.getProxyForURI("PYRONAME://:wxSimple.dispatcher")
dispatcher.addWorker(HOSTPID)
print "This is worker %d on %s (PID = %d)" % (dispatcher.getNumWorkers(), HOSTNAME, HOSTPID)
while True:
try:
args = dispatcher.getTask()
except Queue.Empty:
print "no work available yet."
else:
taskproc = TaskProcessor(args[0])
result = taskproc.calculate(args[1])
output = { 'process' : { 'name' : HOSTNAME, 'pid' : HOSTPID }, 'result' : result }
# Put the result on the output queue
dispatcher.putResult(output)
if __name__=="__main__":
main()