Date Tags Python

Here's a simple wxPython multiprocessing example. The actual calculation is trivial - the hard bit is making sure all queues are emptied and processes terminated properly upon exit.

For an easier introduction, there's a much-simplified version in the wxPyWiki Cookbook.

There are a few key aspects to the design:

  • On Windows, it is important to protect the "entry point" of the program by using if __name__ == '__main__': as recommended in the official documentation.

  • The worker processes must be independent of any object instance (ie. defined as a class method or similar) and started prior to wx.App instantiation (ie. in the main).

  • The task queue should be given tasks only as required to prevent any residual tasks upon the cessation of processing.

#!/usr/bin/env python

"""
Simple wxPython Multiprocessing Example
---------------------------------------

**Command Line Usage**

    wxsimple_mp [options]

**Options**

    -h, --help      Print this help and exit.
    -c, --cmd       Run from the command-line (no gui).
    -n <numproc>    Use n (supplementary) processes. Zero means only a single
                    (main) process is used for task calculation.
    -t <numtasks>   Process t tasks.

This simple example uses a wx.App to control and monitor a pool of workers
instructed to carry out a list of tasks.

The program creates the GUI plus a list of tasks, then starts a pool of workers
(processes) implemented with a classmethod. Within the GUI, the user can start
and stop processing the tasks at any time.

Copyright (c) 2010, Roger Stuckey. All rights reserved.
"""

import getopt, math, random, sys, time, types, wx

from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support


class TaskProcessor:
    """
    The TaskProcessor class provides the functions necessary to process each task.
    """
    def __init__(self, numcalcs):
        """
        Initialise the TaskProcessor.
        """
        self.numcalcs = numcalcs

    def calculate(self, angle_deg):
        """
        Calculate the result of a task.
        """
        result = 0
        for i in range(self.numcalcs):
            angle_rad = math.radians(angle_deg)
            result += math.tanh(angle_rad)/math.cosh(angle_rad)/self.numcalcs
        return ( angle_deg, result )


class Dispatcher:
    """
    The Dispatcher class manages the task and result queues.
    """
    def __init__(self):
        """
        Initialise the Dispatcher.
        """
        self.taskQueue = Queue()
        self.resultQueue = Queue()

    def putTask(self, task):
        """
        Put a task on the task queue.
        """
        self.taskQueue.put(task)

    def getTask(self):
        """
        Get a task from the task queue.
        """
        return self.taskQueue.get()

    def putResult(self, output):
        """
        Put a result on the result queue.
        """
        self.resultQueue.put(output)

    def getResult(self):
        """
        Get a result from the result queue.
        """
        return self.resultQueue.get()


class TaskServerMP:
    """
    The TaskServerMP class provides a target worker class method for queued processes.
    """
    def __init__(self, numprocesses=1, tasks=[ ]):
        """
        Initialise the TaskServerMP and create the dispatcher and processes.
        """
        self.numprocesses = numprocesses
        self.Tasks = tasks
        self.numtasks = len(tasks)

        # Create the dispatcher
        self.dispatcher = Dispatcher()

        self.Processes = [ ]

        # The worker processes must be started here!
        for n in range(numprocesses):
            process = Process(target=TaskServerMP.worker, args=(self.dispatcher,))
            process.start()
            self.Processes.append(process)

        self.timeStart = 0.0
        self.timeElapsed = 0.0
        self.timeRemain = 0.0
        self.processTime = { }

        # Set some program flags
        self.keepgoing = True
        self.i = 0
        self.j = 0

    def run(self):
        """
        Run the TaskServerMP - start, stop & terminate processes.
        """
        sys.stdout.write('Number of processes = %d' % self.numprocesses)
        if (self.numprocesses == 0):
            sys.stdout.write(' (no extra processes)')
        sys.stdout.write('\nUnordered results...\n')
        self.processTasks(self.update)
        if (self.keepgoing):
            sys.stdout.write('Time elapsed: %s\n' % time.strftime('%M:%S', time.gmtime(self.timeElapsed)))
        if (self.numprocesses > 0):
            sys.stdout.write("Waiting for processes to terminate...")
            self.processTerm()

    def processTasks(self, resfunc=None):
        """
        Start the execution of tasks by the processes.
        """
        self.keepgoing = True

        self.timeStart = time.time()
        # Set the initial process time for each
        for n in range(self.numprocesses):
            pid_str = '%d' % self.Processes[n].pid
            self.processTime[pid_str] = 0.0

        # Submit first set of tasks
        if (self.numprocesses == 0):
            numprocstart = 1
        else:
            numprocstart = min(self.numprocesses, self.numtasks)
        for self.i in range(numprocstart):
            self.dispatcher.putTask(self.Tasks[self.i])

        self.j = -1
        self.i = numprocstart - 1
        while (self.j < self.i):
            # Get and print results
            output = self.getOutput()
            # Execute some function (Yield to a wx.Button event)
            if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
                resfunc(output)
            if ((self.keepgoing) and (self.i + 1 < self.numtasks)):
                # Submit another task
                self.i += 1
                self.dispatcher.putTask(self.Tasks[self.i])

    def processStop(self, resfunc=None):
        """
        Stop the execution of tasks by the processes.
        """
        self.keepgoing = False

        while (self.j < self.i):
            # Get and print any results remining in the done queue
            output = self.getOutput()
            if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
                resfunc(output)

    def processTerm(self):
        """
        Stop the execution of tasks by the processes.
        """
        for n in range(self.numprocesses):
            # Terminate any running processes
            self.Processes[n].terminate()

        # Wait for all processes to stop
        while (self.anyAlive()):
            time.sleep(0.5)

    def anyAlive(self):
        """
        Check if any processes are alive.
        """
        isalive = False
        for n in range(self.numprocesses):
            isalive = (isalive or self.Processes[n].is_alive())
        return isalive

    def getOutput(self):
        """
        Get the output from one completed task.
        """
        self.j += 1

        if (self.numprocesses == 0):
            # Use the single-process method
            self.worker_sp()

        output = self.dispatcher.getResult()
        # Calculate the time remaining
        self.timeRemaining(self.j + 1, self.numtasks, output['process']['pid'])

        return(output)

    def timeRemaining(self, tasknum, numtasks, pid):
        """
        Calculate the time remaining for the processes to complete N tasks.
        """
        timeNow = time.time()
        self.timeElapsed = timeNow - self.timeStart

        pid_str = '%d' % pid
        self.processTime[pid_str] = self.timeElapsed

        # Calculate the average time elapsed for all of the processes
        timeElapsedAvg = 0.0
        numprocesses = self.numprocesses
        if (numprocesses == 0): numprocesses = 1
        for pid_str in self.processTime.keys():
            timeElapsedAvg += self.processTime[pid_str]/numprocesses
        self.timeRemain = timeElapsedAvg*(float(numtasks)/float(tasknum) - 1.0)

    def update(self, output):
        """
        Get and print the results from one completed task.
        """
        sys.stdout.write('%s [%d] calculate(%d) = %.2f' % ( output['process']['name'], output['process']['pid'], output['result'][0], output['result'][1] ))
#        sys.stdout.write('  [Complete: %2d / %2d  Time Elapsed: %s  Remaining: %s]' % (self.j + 1, self.numtasks, time.strftime('%M:%S', time.gmtime(self.timeElapsed)), time.strftime('%M:%S', time.gmtime(self.timeRemain))))
        sys.stdout.write('\n')

    def worker(cls, dispatcher):
        """
        The worker creates a TaskProcessor object to calculate the result.
        """
        while True:
            args = dispatcher.getTask()
            taskproc = TaskProcessor(args[0])
            result = taskproc.calculate(args[1])
            output = { 'process' : { 'name' : current_process().name, 'pid' : current_process().pid }, 'result' : result }
            # Put the result on the output queue
            dispatcher.putResult(output)

    # The multiprocessing worker must not require any existing object for execution!
    worker = classmethod(worker)

    def worker_sp(self):
        """
        A single-process version of the worker method.
        """
        args = self.dispatcher.getTask()
        taskproc = TaskProcessor(args[0])
        result = taskproc.calculate(args[1])
        output = { 'process' : { 'name' : 'Process-0', 'pid' : 0 }, 'result' : result }
        # Put the result on the output queue
        self.dispatcher.putResult(output)


class MyFrame(wx.Frame):
    """
    A simple Frame class.
    """
    def __init__(self, parent, id, title, taskserver):
        """
        Initialise the Frame.
        """
        self.taskserver = taskserver

        wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))

        # Create the panel, sizer and controls
        self.panel = wx.Panel(self, wx.ID_ANY)
        self.sizer = wx.GridBagSizer(5, 5)

        self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
        self.Bind(wx.EVT_BUTTON, self.OnStart, self.start_bt)
        self.start_bt.SetDefault()
        self.start_bt.SetToolTipString('Start the execution of tasks')
        self.start_bt.ToolTip.Enable(True)

        self.stop_bt = wx.Button(self.panel, wx.ID_ANY, "Stop")
        self.Bind(wx.EVT_BUTTON, self.OnStop, self.stop_bt)
        self.stop_bt.SetToolTipString('Stop the execution of tasks')
        self.stop_bt.ToolTip.Enable(True)

        self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)

        self.prog_st = wx.StaticText(self.panel, wx.ID_ANY, 'Complete:')

        self.prog_gg = wx.Gauge(self.panel, id=wx.ID_ANY, range=self.taskserver.numtasks, size=(-1, 15))
        self.prog_gg.SetBezelFace(3)
        self.prog_gg.SetShadowWidth(3)

        # Add the controls to the sizer
        self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP, border=5)
        self.sizer.Add(self.stop_bt, (0, 1), flag=wx.ALIGN_CENTER|wx.TOP|wx.RIGHT, border=5)
        self.sizer.Add(self.output_tc, (1, 0), (1, 2), flag=wx.EXPAND|wx.LEFT|wx.RIGHT, border=5)
        self.sizer.Add(self.prog_st, (2, 0), (1, 2), flag=wx.LEFT|wx.RIGHT, border=5)
        self.sizer.Add(self.prog_gg, (3, 0), (1, 2), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
        self.sizer.AddGrowableCol(0)
        self.sizer.AddGrowableCol(1)
        self.sizer.AddGrowableRow(1)

        self.panel.SetSizer(self.sizer)

        self.Bind(wx.EVT_CLOSE, self.OnClose)

        self.output_tc.AppendText('Number of processes = %d' % self.taskserver.numprocesses)
        if (self.taskserver.numprocesses == 0):
            self.output_tc.AppendText(' (no extra processes)')
        self.output_tc.AppendText('\n')

    def OnStart(self, event):
        """
        Start the execution of tasks by the processes.
        """
        self.start_bt.Enable(False)
        self.stop_bt.Enable(True)
        self.output_tc.AppendText('Unordered results...\n')
        # Start processing tasks
        self.taskserver.processTasks(self.update)
        if (self.taskserver.keepgoing):
            self.output_tc.AppendText('Time elapsed: %s\n' % time.strftime('%M:%S', time.gmtime(self.taskserver.timeElapsed)))
            self.start_bt.Enable(True)
            self.stop_bt.Enable(False)

    def OnStop(self, event):
        """
        Stop the execution of tasks by the processes.
        """
        self.stop_bt.Enable(False)
        if (self.taskserver.j < self.taskserver.i):
            self.output_tc.AppendText('Completing queued tasks...\n')
        # Stop processing tasks
        self.taskserver.processStop(self.update)
        self.start_bt.Enable(True)

    def OnClose(self, event):
        """
        Stop the task queue, terminate processes and close the window.
        """
        self.OnStop(event)
        self.start_bt.Enable(False)
        if ((self.taskserver.numprocesses > 0) and self.taskserver.anyAlive()):
            busy = wx.BusyInfo("Waiting for processes to terminate...")
            # Terminate the processes
            self.taskserver.processTerm()
        self.Destroy()

    def update(self, output):
        """
        Get and print the results from one completed task.
        """
        self.output_tc.AppendText('%s [%d] calculate(%d) = %.2f\n' % ( output['process']['name'], output['process']['pid'], output['result'][0], output['result'][1] ))
        self.prog_st.SetLabel('Complete: %2d / %2d  Time Elapsed: %s  Remaining: %s' % (self.taskserver.j + 1, self.taskserver.numtasks, time.strftime('%M:%S', time.gmtime(self.taskserver.timeElapsed)), time.strftime('%M:%S', time.gmtime(self.taskserver.timeRemain))))
        self.prog_gg.SetValue(self.taskserver.j + 1)
        # Give the user an opportunity to interact
        wx.YieldIfNeeded()


class MyApp(wx.App):
    """
    A simple App class, modified to hold the processes and task queues.
    """
    def __init__(self, redirect=True, filename=None, useBestVisual=False, clearSigInt=True, taskserver=None):
        """
        Initialise the App.
        """
        self.taskserver = taskserver
        wx.App.__init__(self, redirect, filename, useBestVisual, clearSigInt)

    def OnInit(self):
        """
        Initialise the App with a Frame.
        """
        self.frame = MyFrame(None, -1, 'wxSimple_MP', self.taskserver)
        self.frame.Show(True)
        return True


if __name__ == '__main__':

    freeze_support()

    try:
        (optlist, args) = getopt.getopt(sys.argv[1:], 'hcn:t:', ['help', 'cmd', 'numproc=', 'numtasks='])
    except getopt.GetoptError, msg:
        sys.stderr.write("wxsimple_mp: Error: %s" % msg)
        sys.stderr.write("See 'wxsimple_mp --help'.\n")
        sys.exit(2)
    else:
        cmdline = False
        numproc = None
        numtasks = 20
        for (opt, optarg) in optlist:
            if opt in ('-h', '--help'):
                sys.stdout.write(__doc__)
                sys.exit(0)
            elif opt in ('-c', '--cmd'):
                cmdline = True
            elif opt in ('-n', '--numproc'):
                numproc = int(optarg)
            elif opt in ('-t', '--numtasks'):
                numtasks = int(optarg)

        if (numproc is None):
            # Determine the number of CPU's/cores
            try:
                numproc = min(cpu_count(), numtasks)
            except NotImplementedError:
                numproc = 1

        # Create the task list
        Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]

        # The worker processes must be started here!
        tsmp = TaskServerMP(numprocesses=numproc, tasks=Tasks)

        if (cmdline):
            tsmp.run()
        else:
            app = MyApp(redirect=True, filename='wxsimple_mp.stderr.log', taskserver=tsmp)
            app.MainLoop()