![]() |
Available news archives:
comp.lang.tcl
-
comp.lang.python
-
comp.security.firewalls
-
sci.crypt -
comp.lang.php -
comp.lang.javascript
|
|
comp.lang.python archive: Multithreaded class with queues
From: Antal Rutz <arutz@mimoza.pantel.net>
Date: Fri Jul 22 2005 - 12:12:41 CEST
Hi!
I wrote a little class to make multithreading easier. It's based on one
It spawns a number of CollectorThreads and one ProcessThread. The
it seems to work with test functions but when I use a network-intensive
Any help?
--
--arutz
#!/usr/local/bin/python
"""
Multithreaded class for the task: multiple collector - one dataprocessor
Usage:
collector = Collector(data, colfunc, prfunc, maxThreads)
collector.run()
Internals:
Collector spawns up the CollectorThreads and a ProcessThread and puts
the data onto the inputQueue. The CollectorThread reads the data from
inputQueue and processes it through 'colfunc()'. Then puts
the result onto the outputQueue. The ProcessThread only listens on the
outputQueue (blocks on it) and feeds the data to `prfunc()`.
Thread shutdown: collector threads: inputQueue.put(_Token(shutdown=True)), once per thread
process thread: outputQueue.put(_Token(shutdown=True)), after all collector threads have been joined
"""
import Queue
import threading
import traceback
#from operator import truth as _truth
#def _xor(a,b):
# return _truth(a) ^ _truth(b)
class _Token:
    """Envelope passed through the queues.

    A token carries either a payload in ``data`` or a shutdown marker in
    ``shutdown``.  Consumers treat any ``shutdown`` value other than ``None``
    as a request to stop.  No validation is performed: both fields may be
    set, or neither.
    """

    def __init__(self, data=None, shutdown=None):
        self.data, self.shutdown = data, shutdown
class _CollectorThread(threading.Thread):
    """Worker thread that turns inputQueue items into outputQueue results.

    Each token taken from ``inQueue`` is either a shutdown marker
    (``token.shutdown is not None``), which ends the thread, or a work item
    whose ``token.data`` is passed to ``func``.  The return value of ``func``
    is wrapped in a new ``_Token`` and put onto ``outQueue``.

    Parameters:
    - inQueue: queue this thread blocks on for work and shutdown tokens
    - outQueue: queue receiving one ``_Token(data=result)`` per successful item
    - func: callable taking ``token.data`` and returning the result
    """

    def __init__(self, inQueue, outQueue, func):
        threading.Thread.__init__(self)
        self.inQ = inQueue
        self.outQ = outQueue
        self.func = func

    def run(self):
        """Process tokens until a shutdown token arrives.

        An item whose ``func`` call raises is reported on stderr and skipped;
        nothing is put onto the output queue for it.
        """
        while True:
            token = self.inQ.get()
            if token.shutdown is not None:
                break
            try:
                result = self.func(token.data)
            except Exception:
                # One failing item must not end the worker: an exception
                # escaping run() would terminate this thread for good and
                # leave the remaining items to fewer (possibly zero) workers.
                traceback.print_exc()
                continue
            # Blocking put: with a bounded output queue the worker waits for
            # room instead of dying on Queue.Full.
            self.outQ.put(_Token(data=result))
class _ProcessThread(threading.Thread):
    """'Reader-only' thread that consumes the outputQueue.

    Every token taken from ``outQueue`` is either a shutdown marker
    (``token.shutdown is not None``), which ends the thread, or a result
    whose ``token.data`` is handed to ``func`` (e.g. to insert it into a
    database).

    Parameters:
    - outQueue: queue this thread blocks on
    - func: callable taking ``token.data``; its return value is ignored
    """

    def __init__(self, outQueue, func):
        threading.Thread.__init__(self)
        self.outQ = outQueue
        self.func = func

    def run(self):
        """Feed results to ``func`` until a shutdown token arrives.

        A result for which ``func`` raises is reported on stderr and skipped.
        """
        while True:
            token = self.outQ.get()
            if token.shutdown is not None:
                break
            try:
                self.func(token.data)
            except Exception:
                # This is the only consumer of the queue: if an exception
                # escaped run(), every later result would be left unprocessed.
                traceback.print_exc()
class Collector:
    """Spawns the thread pool (worker threads and one process thread),
    puts the data onto the inputQueue of the worker threads,
    then shuts the threads down.

    The threads are created and started in the constructor; ``run()`` feeds
    them and waits for them to finish.  ``run()`` is one-shot: once it
    returns, all threads have been joined, so a second call would only
    enqueue data that nothing consumes.
    """

    def __init__(self, data, colfunc, prfunc, maxThreads=5):
        """Parameters:
        - data: data for colfunc (an iterable; one queue item per element)
        - colfunc: function to process inputQueue into outputQueue
        - prfunc: function to process outputQueue
        - maxThreads: number of worker threads to start
        """
        self.data = data
        self.inputQueue = Queue.Queue()
        self.outputQueue = Queue.Queue()
        self.threadPool = []
        # Start the worker threads; they block on inputQueue right away.
        for _ in range(maxThreads):
            worker = _CollectorThread(self.inputQueue,
                                      self.outputQueue,
                                      colfunc)
            worker.start()
            self.threadPool.append(worker)
        # Start the single consumer (db) thread.
        self.processthread = _ProcessThread(self.outputQueue, prfunc)
        self.processthread.start()

    def run(self):
        """Queue the data and shut down the threads.

        The shutdown happens even if iterating ``data`` raises; otherwise
        the non-daemon threads started in ``__init__`` would stay blocked on
        their queues and keep the interpreter from exiting.  The exception
        is re-raised after the threads have been joined.
        """
        try:
            self._queueData()
        finally:
            self._shutdown()

    def _queueData(self):
        """Put data onto the inputQueue, one token per element."""
        for item in self.data:
            self.inputQueue.put(_Token(data=item))

    def _shutdown(self):
        """Stop the workers first, then the process thread."""
        # One shutdown token per worker: each worker consumes exactly one
        # and exits.
        for _ in self.threadPool:
            self.inputQueue.put(_Token(shutdown=True))
        for worker in self.threadPool:
            worker.join()
        # All workers are done, so every result is already on outputQueue
        # ahead of this shutdown token.
        self.outputQueue.put(_Token(shutdown=True))
        self.processthread.join()
if __name__ == '__main__':
    # Small self-test: every input string gets ': OK' appended by the
    # worker threads and is then printed by the process thread.
    def hashdata(a):
        return a + ': OK'

    def myprint(s):
        print(s)

    MAX_THREADS = 5
    data = ['1', '2', 'asd', 'qwe']
    collect = Collector(
        data=data,
        colfunc=hashdata,
        prfunc=myprint,
        maxThreads=MAX_THREADS,
    )
    collect.run()
[plaintext C.py]
Received on Thu Sep 29 17:05:16 2005
|