Here's a simple wxPython multiprocessing example. The actual calculation is trivial - the hard bit is making sure all queues are emptied and processes terminated properly upon exit.
For an easier introduction, there's a much-simplified version in the wxPyWiki Cookbook.
There are a few key aspects to the design:
-
On Windows, it is important to protect the "entry point" of the program by using
if __name__ == '__main__':
as recommended in the official documentation. -
The worker processes must be independent of any object instance (ie. defined as a class method or similar) and started prior to wx.App instantiation (ie. in the main).
-
The task queue should be given tasks only as required to prevent any residual tasks upon the cessation of processing.
#!/usr/bin/env python
"""
Simple wxPython Multiprocessing Example
---------------------------------------
**Command Line Usage**
wxsimple_mp [options]
**Options**
-h, --help Print this help and exit.
-c, --cmd Run from the command-line (no gui).
-n <numproc> Use n (supplementary) processes. Zero means only a single
(main) process is used for task calculation.
-t <numtasks> Process t tasks.
This simple example uses a wx.App to control and monitor a pool of workers
instructed to carry out a list of tasks.
The program creates the GUI plus a list of tasks, then starts a pool of workers
(processes) implemented with a classmethod. Within the GUI, the user can start
and stop processing the tasks at any time.
Copyright (c) 2010, Roger Stuckey. All rights reserved.
"""
import getopt, math, random, sys, time, types, wx
from multiprocessing import Process, Queue, cpu_count, current_process, freeze_support
class TaskProcessor:
"""
The TaskProcessor class provides the functions necessary to process each task.
"""
def __init__(self, numcalcs):
"""
Initialise the TaskProcessor.
"""
self.numcalcs = numcalcs
def calculate(self, angle_deg):
"""
Calculate the result of a task.
"""
result = 0
for i in range(self.numcalcs):
angle_rad = math.radians(angle_deg)
result += math.tanh(angle_rad)/math.cosh(angle_rad)/self.numcalcs
return ( angle_deg, result )
class Dispatcher:
"""
The Dispatcher class manages the task and result queues.
"""
def __init__(self):
"""
Initialise the Dispatcher.
"""
self.taskQueue = Queue()
self.resultQueue = Queue()
def putTask(self, task):
"""
Put a task on the task queue.
"""
self.taskQueue.put(task)
def getTask(self):
"""
Get a task from the task queue.
"""
return self.taskQueue.get()
def putResult(self, output):
"""
Put a result on the result queue.
"""
self.resultQueue.put(output)
def getResult(self):
"""
Get a result from the result queue.
"""
return self.resultQueue.get()
class TaskServerMP:
"""
The TaskServerMP class provides a target worker class method for queued processes.
"""
def __init__(self, numprocesses=1, tasks=[ ]):
"""
Initialise the TaskServerMP and create the dispatcher and processes.
"""
self.numprocesses = numprocesses
self.Tasks = tasks
self.numtasks = len(tasks)
# Create the dispatcher
self.dispatcher = Dispatcher()
self.Processes = [ ]
# The worker processes must be started here!
for n in range(numprocesses):
process = Process(target=TaskServerMP.worker, args=(self.dispatcher,))
process.start()
self.Processes.append(process)
self.timeStart = 0.0
self.timeElapsed = 0.0
self.timeRemain = 0.0
self.processTime = { }
# Set some program flags
self.keepgoing = True
self.i = 0
self.j = 0
def run(self):
"""
Run the TaskServerMP - start, stop & terminate processes.
"""
sys.stdout.write('Number of processes = %d' % self.numprocesses)
if (self.numprocesses == 0):
sys.stdout.write(' (no extra processes)')
sys.stdout.write('\nUnordered results...\n')
self.processTasks(self.update)
if (self.keepgoing):
sys.stdout.write('Time elapsed: %s\n' % time.strftime('%M:%S', time.gmtime(self.timeElapsed)))
if (self.numprocesses > 0):
sys.stdout.write("Waiting for processes to terminate...")
self.processTerm()
def processTasks(self, resfunc=None):
"""
Start the execution of tasks by the processes.
"""
self.keepgoing = True
self.timeStart = time.time()
# Set the initial process time for each
for n in range(self.numprocesses):
pid_str = '%d' % self.Processes[n].pid
self.processTime[pid_str] = 0.0
# Submit first set of tasks
if (self.numprocesses == 0):
numprocstart = 1
else:
numprocstart = min(self.numprocesses, self.numtasks)
for self.i in range(numprocstart):
self.dispatcher.putTask(self.Tasks[self.i])
self.j = -1
self.i = numprocstart - 1
while (self.j < self.i):
# Get and print results
output = self.getOutput()
# Execute some function (Yield to a wx.Button event)
if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
resfunc(output)
if ((self.keepgoing) and (self.i + 1 < self.numtasks)):
# Submit another task
self.i += 1
self.dispatcher.putTask(self.Tasks[self.i])
def processStop(self, resfunc=None):
"""
Stop the execution of tasks by the processes.
"""
self.keepgoing = False
while (self.j < self.i):
# Get and print any results remining in the done queue
output = self.getOutput()
if (isinstance(resfunc, (types.FunctionType, types.MethodType))):
resfunc(output)
def processTerm(self):
"""
Stop the execution of tasks by the processes.
"""
for n in range(self.numprocesses):
# Terminate any running processes
self.Processes[n].terminate()
# Wait for all processes to stop
while (self.anyAlive()):
time.sleep(0.5)
def anyAlive(self):
"""
Check if any processes are alive.
"""
isalive = False
for n in range(self.numprocesses):
isalive = (isalive or self.Processes[n].is_alive())
return isalive
def getOutput(self):
"""
Get the output from one completed task.
"""
self.j += 1
if (self.numprocesses == 0):
# Use the single-process method
self.worker_sp()
output = self.dispatcher.getResult()
# Calculate the time remaining
self.timeRemaining(self.j + 1, self.numtasks, output['process']['pid'])
return(output)
def timeRemaining(self, tasknum, numtasks, pid):
"""
Calculate the time remaining for the processes to complete N tasks.
"""
timeNow = time.time()
self.timeElapsed = timeNow - self.timeStart
pid_str = '%d' % pid
self.processTime[pid_str] = self.timeElapsed
# Calculate the average time elapsed for all of the processes
timeElapsedAvg = 0.0
numprocesses = self.numprocesses
if (numprocesses == 0): numprocesses = 1
for pid_str in self.processTime.keys():
timeElapsedAvg += self.processTime[pid_str]/numprocesses
self.timeRemain = timeElapsedAvg*(float(numtasks)/float(tasknum) - 1.0)
def update(self, output):
"""
Get and print the results from one completed task.
"""
sys.stdout.write('%s [%d] calculate(%d) = %.2f' % ( output['process']['name'], output['process']['pid'], output['result'][0], output['result'][1] ))
# sys.stdout.write(' [Complete: %2d / %2d Time Elapsed: %s Remaining: %s]' % (self.j + 1, self.numtasks, time.strftime('%M:%S', time.gmtime(self.timeElapsed)), time.strftime('%M:%S', time.gmtime(self.timeRemain))))
sys.stdout.write('\n')
def worker(cls, dispatcher):
"""
The worker creates a TaskProcessor object to calculate the result.
"""
while True:
args = dispatcher.getTask()
taskproc = TaskProcessor(args[0])
result = taskproc.calculate(args[1])
output = { 'process' : { 'name' : current_process().name, 'pid' : current_process().pid }, 'result' : result }
# Put the result on the output queue
dispatcher.putResult(output)
# The multiprocessing worker must not require any existing object for execution!
worker = classmethod(worker)
def worker_sp(self):
"""
A single-process version of the worker method.
"""
args = self.dispatcher.getTask()
taskproc = TaskProcessor(args[0])
result = taskproc.calculate(args[1])
output = { 'process' : { 'name' : 'Process-0', 'pid' : 0 }, 'result' : result }
# Put the result on the output queue
self.dispatcher.putResult(output)
class MyFrame(wx.Frame):
"""
A simple Frame class.
"""
def __init__(self, parent, id, title, taskserver):
"""
Initialise the Frame.
"""
self.taskserver = taskserver
wx.Frame.__init__(self, parent, id, title, wx.Point(700, 500), wx.Size(300, 200))
# Create the panel, sizer and controls
self.panel = wx.Panel(self, wx.ID_ANY)
self.sizer = wx.GridBagSizer(5, 5)
self.start_bt = wx.Button(self.panel, wx.ID_ANY, "Start")
self.Bind(wx.EVT_BUTTON, self.OnStart, self.start_bt)
self.start_bt.SetDefault()
self.start_bt.SetToolTipString('Start the execution of tasks')
self.start_bt.ToolTip.Enable(True)
self.stop_bt = wx.Button(self.panel, wx.ID_ANY, "Stop")
self.Bind(wx.EVT_BUTTON, self.OnStop, self.stop_bt)
self.stop_bt.SetToolTipString('Stop the execution of tasks')
self.stop_bt.ToolTip.Enable(True)
self.output_tc = wx.TextCtrl(self.panel, wx.ID_ANY, style=wx.TE_MULTILINE|wx.TE_READONLY)
self.prog_st = wx.StaticText(self.panel, wx.ID_ANY, 'Complete:')
self.prog_gg = wx.Gauge(self.panel, id=wx.ID_ANY, range=self.taskserver.numtasks, size=(-1, 15))
self.prog_gg.SetBezelFace(3)
self.prog_gg.SetShadowWidth(3)
# Add the controls to the sizer
self.sizer.Add(self.start_bt, (0, 0), flag=wx.ALIGN_CENTER|wx.LEFT|wx.TOP, border=5)
self.sizer.Add(self.stop_bt, (0, 1), flag=wx.ALIGN_CENTER|wx.TOP|wx.RIGHT, border=5)
self.sizer.Add(self.output_tc, (1, 0), (1, 2), flag=wx.EXPAND|wx.LEFT|wx.RIGHT, border=5)
self.sizer.Add(self.prog_st, (2, 0), (1, 2), flag=wx.LEFT|wx.RIGHT, border=5)
self.sizer.Add(self.prog_gg, (3, 0), (1, 2), flag=wx.EXPAND|wx.LEFT|wx.RIGHT|wx.BOTTOM, border=5)
self.sizer.AddGrowableCol(0)
self.sizer.AddGrowableCol(1)
self.sizer.AddGrowableRow(1)
self.panel.SetSizer(self.sizer)
self.Bind(wx.EVT_CLOSE, self.OnClose)
self.output_tc.AppendText('Number of processes = %d' % self.taskserver.numprocesses)
if (self.taskserver.numprocesses == 0):
self.output_tc.AppendText(' (no extra processes)')
self.output_tc.AppendText('\n')
def OnStart(self, event):
"""
Start the execution of tasks by the processes.
"""
self.start_bt.Enable(False)
self.stop_bt.Enable(True)
self.output_tc.AppendText('Unordered results...\n')
# Start processing tasks
self.taskserver.processTasks(self.update)
if (self.taskserver.keepgoing):
self.output_tc.AppendText('Time elapsed: %s\n' % time.strftime('%M:%S', time.gmtime(self.taskserver.timeElapsed)))
self.start_bt.Enable(True)
self.stop_bt.Enable(False)
def OnStop(self, event):
"""
Stop the execution of tasks by the processes.
"""
self.stop_bt.Enable(False)
if (self.taskserver.j < self.taskserver.i):
self.output_tc.AppendText('Completing queued tasks...\n')
# Stop processing tasks
self.taskserver.processStop(self.update)
self.start_bt.Enable(True)
def OnClose(self, event):
"""
Stop the task queue, terminate processes and close the window.
"""
self.OnStop(event)
self.start_bt.Enable(False)
if ((self.taskserver.numprocesses > 0) and self.taskserver.anyAlive()):
busy = wx.BusyInfo("Waiting for processes to terminate...")
# Terminate the processes
self.taskserver.processTerm()
self.Destroy()
def update(self, output):
"""
Get and print the results from one completed task.
"""
self.output_tc.AppendText('%s [%d] calculate(%d) = %.2f\n' % ( output['process']['name'], output['process']['pid'], output['result'][0], output['result'][1] ))
self.prog_st.SetLabel('Complete: %2d / %2d Time Elapsed: %s Remaining: %s' % (self.taskserver.j + 1, self.taskserver.numtasks, time.strftime('%M:%S', time.gmtime(self.taskserver.timeElapsed)), time.strftime('%M:%S', time.gmtime(self.taskserver.timeRemain))))
self.prog_gg.SetValue(self.taskserver.j + 1)
# Give the user an opportunity to interact
wx.YieldIfNeeded()
class MyApp(wx.App):
"""
A simple App class, modified to hold the processes and task queues.
"""
def __init__(self, redirect=True, filename=None, useBestVisual=False, clearSigInt=True, taskserver=None):
"""
Initialise the App.
"""
self.taskserver = taskserver
wx.App.__init__(self, redirect, filename, useBestVisual, clearSigInt)
def OnInit(self):
"""
Initialise the App with a Frame.
"""
self.frame = MyFrame(None, -1, 'wxSimple_MP', self.taskserver)
self.frame.Show(True)
return True
if __name__ == '__main__':
freeze_support()
try:
(optlist, args) = getopt.getopt(sys.argv[1:], 'hcn:t:', ['help', 'cmd', 'numproc=', 'numtasks='])
except getopt.GetoptError, msg:
sys.stderr.write("wxsimple_mp: Error: %s" % msg)
sys.stderr.write("See 'wxsimple_mp --help'.\n")
sys.exit(2)
else:
cmdline = False
numproc = None
numtasks = 20
for (opt, optarg) in optlist:
if opt in ('-h', '--help'):
sys.stdout.write(__doc__)
sys.exit(0)
elif opt in ('-c', '--cmd'):
cmdline = True
elif opt in ('-n', '--numproc'):
numproc = int(optarg)
elif opt in ('-t', '--numtasks'):
numtasks = int(optarg)
if (numproc is None):
# Determine the number of CPU's/cores
try:
numproc = min(cpu_count(), numtasks)
except NotImplementedError:
numproc = 1
# Create the task list
Tasks = [ (int(1e6), random.randint(0, 45)) for i in range(numtasks) ]
# The worker processes must be started here!
tsmp = TaskServerMP(numprocesses=numproc, tasks=Tasks)
if (cmdline):
tsmp.run()
else:
app = MyApp(redirect=True, filename='wxsimple_mp.stderr.log', taskserver=tsmp)
app.MainLoop()