Date Tags Python

Here's a simple distributed computing example, which uses Pyro.

The client is listed directly below, with the dispatcher and worker scripts following.

#!/usr/bin/env python

"""
Simple wxPython Distributed Computing Example
---------------------------------------------

**Command Line Usage**

    wxsimple_pyro [options]

**Options**

    -h, --help      Print this help and exit.
    -c, --cmd       Run from the command-line (no gui).
    -t <numtasks>   Process t tasks.

This simple example uses a wx.App to control and monitor a pool of workers
instructed to carry out a list of tasks.

The program creates the GUI plus a list of tasks, then connects to a a pool of
workers through a dispatcher. Within the GUI, the user can start and stop
processing the tasks at any time.

Copyright (c) 2010, Roger Stuckey. All rights reserved.
"""

import getopt, math, random, sys, time, types, wx

import Pyro.core

from wxsimple_mp import TaskProcessor, TaskServerMP, MyFrame, MyApp


class EmptyProcess:
    """
    This class provides a process-like object to hold each worker's PID.
    """
    def __init__(self, pid):
        """
        Initialise the EmptyProcess.
        """
        self.pid = pid

    def is_alive(self):
        """
        Mimic a negative response from the process.
        """
        return False


class TaskServerPyro(TaskServerMP):
    """
    The TaskServerMP class provides a target worker class method for queued processes.
    """
    def __init__(self, tasks=[ ]):
        """
        Initialise the TaskServerMP and create the dispatcher.
        """
        TaskServerMP.__init__(self, numprocesses=0, tasks=tasks)

        # Create the dispatcher
        self.dispatcher = Pyro.core.getProxyForURI("PYRONAME://:wxSimple.dispatcher")

        self.numprocesses = self.dispatcher.getNumWorkers()
        self.PIDs = self.dispatcher.getWorkerPIDs()

        # The empty processes are instantiated here
        for n in range(self.numprocesses):
            process = EmptyProcess(self.PIDs[n])
            self.Processes.append(process)


if __name__ == '__main__':

    try:
        (optlist, args) = getopt.getopt(sys.argv[1:], 'hct:', ['help', 'cmd', 'numtasks='])
    except getopt.GetoptError, msg:
        sys.stderr.write("wxsimple_pyro: Error: %s" % msg)
        sys.stderr.write("See 'wxsimple_pyro --help'.\n")
        sys.exit(2)
    else:
        cmdline = False
        numproc = None
        numtasks = 20
        for (opt, optarg) in optlist:
            if opt in ('-h', '--help'):
                sys.stdout.write(__doc__)
                sys.exit(0)
            elif opt in ('-c', '--cmd'):
                cmdline = True
            elif opt in ('-t', '--numtasks'):
                numtasks = int(optarg)

        # Create the task list
        Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]

        # The pyro client is started here
        tsmp = TaskServerPyro(tasks=Tasks)

        if (cmdline):
            tsmp.run()
        else:
            app = MyApp(redirect=True, filename='wxsimple_pyro.stderr.log', taskserver=tsmp)
            app.MainLoop()

This is the dispatcher script:

#!/usr/bin/env python
#
# dispatcher

import Pyro.core
import Pyro.naming
from Pyro.errors import NamingError
from Queue import Queue


class DispatcherQueue(Pyro.core.ObjBase):
    """
    The Dispatcher class manages the task and result queues.
    """
    def __init__(self):
        """
        Initialise the Dispatcher.
        """
        Pyro.core.ObjBase.__init__(self)

        self.taskQueue = Queue()
        self.resultQueue = Queue()

        self.WorkerPIDs = [ ]
        self.numWorkers = 0

    def putTask(self, task):
        """
        Put a task on the task queue.
        """
        self.taskQueue.put(task)

    def getTask(self, timeout=5):
        """
        Get a task from the task queue.
        """
        return self.taskQueue.get(block=True, timeout=timeout)

    def putResult(self, output):
        """
        Put a result on the result queue.
        """
        self.resultQueue.put(output)

    def getResult(self, timeout=5):
        """
        Get a result from the result queue.
        """
        return self.resultQueue.get(block=True, timeout=timeout)

    def taskQueueSize(self):
        """
        Return the task queue size.
        """
        return self.taskQueue.qsize()

    def resultQueueSize(self):
        """
        Return the result queue size.
        """
        return self.resultQueue.qsize()

    def addWorker(self, pid):
        """
        Add a worker's PID to the list.
        """
        self.WorkerPIDs.append(pid)
        self.numWorkers += 1

    def getWorkerPIDs(self):
        """
        Return the list of worker PID's.
        """
        return self.WorkerPIDs

    def getNumWorkers(self):
        """
        Return the number of workers.
        """
        return self.numWorkers


# Main program

Pyro.core.initServer()
ns = Pyro.naming.NameServerLocator().getNS()
daemon = Pyro.core.Daemon()
daemon.useNameServer(ns)

try:
    ns.createGroup(":wxSimple")
except NamingError:
    pass
try:
    ns.unregister(":wxSimple.dispatcher")
except NamingError:
    pass
uri = daemon.connect(DispatcherQueue(),":wxSimple.dispatcher")

print "Dispatcher is ready."
daemon.requestLoop()

And this is the worker script:

#!/usr/bin/env python
#
# worker

import Pyro.core
#import Pyro.errors
import Queue
import math, os, socket
from wxsimple_pyro import TaskProcessor

HOSTNAME = socket.gethostname()
HOSTPID = os.getpid()

Pyro.core.initClient()


def main():

    dispatcher = Pyro.core.getProxyForURI("PYRONAME://:wxSimple.dispatcher")
    dispatcher.addWorker(HOSTPID)

    print "This is worker %d on %s (PID = %d)" % (dispatcher.getNumWorkers(), HOSTNAME, HOSTPID)

    while True:
        try:
            args = dispatcher.getTask()
        except Queue.Empty:
            print "no work available yet."
        else:
            taskproc = TaskProcessor(args[0])
            result = taskproc.calculate(args[1])
            output = { 'process' : { 'name' : HOSTNAME, 'pid' : HOSTPID }, 'result' : result }
            # Put the result on the output queue
            dispatcher.putResult(output)

if __name__=="__main__":
    main()