# (c) Copyright 2009-2010. CodeWeavers, Inc.
import os
import threading
import traceback
import gobject
import cxdecorators
import cxlog
# FIXME: Use the Queue module instead? The PriorityQueue class would be
# especially interesting. If we avoid it for a good reason, then document that
# reason here.
# FIXME: pyop / PythonOperation is a bad name:
# - This is 100% Python code so 'Python' means nothing
# - It does not mention the notion of 'queue' which is central to this module
# - The notion of operation is relevant to the calling code: i.e. we use this
# module to perform operations but it could also be used to perform
# computations in the background or other things. The concept this module
# manipulates is that of tasks.
#
# cxtaskqueue, CXTask and CXTaskQueue would probably be better names, the
# cx prefix being there to avoid potential name collisions.
#####
#
# The task definition
#
#####
class PythonOperation(object):
    """Base class for a task that can be handed to a PythonOperationQueue.

    Subclasses must override main(), which does the actual work, and may
    extend enqueued() and finish() to hook into the other stages of the
    task's life cycle.
    """

    # Class-level defaults, shadowed per instance on first assignment.
    # The queue runs the pending operation with the highest priority first.
    priority = 0
    # Set to True by the queue right before main() is called and reset by
    # finish().
    isRunning = False
    # Set by cancel(). Nothing in this class acts on it.
    canceled = False

    def __init__(self):
        pass

    def __unicode__(self):
        # The name of the concrete class is what identifies the operation
        return type(self).__name__

    def enqueued(self):
        """Called as soon as the operation is queued for execution, in
        whatever thread called enqueue().

        Initialization that must happen if, and only if, the operation is
        really going to be run belongs here rather than in __init__(). That
        way creating an instance and never queueing it does no harm. Same
        thing if a single instance is created and then run multiple times.
        """
        message = "queued " + cxlog.to_str(self)
        cxlog.log(message)

    @cxdecorators.abstractmethod
    def main(self):
        """The body of the operation. It must be overridden and will be run
        in a background thread.

        Do not perform any GUI interaction here!
        """
        raise NotImplementedError()

    def finish(self):
        """Called in the main thread once the operation has completed.

        Since it runs in the main thread, this method can perform GUI
        operations.
        """
        message = "finish " + cxlog.to_str(self)
        cxlog.log(message)
        self.isRunning = False

    def cancel(self):
        """Flags the operation as canceled."""
        self.canceled = True
#####
#
# The task queue implementation
#
#####
class PythonOperationQueue(object):
    """A priority queue of operations served by a bounded set of daemon
    worker threads.

    Operations are added with enqueue(). Worker threads are spawned on
    demand, at most maxThreads at a time; each one keeps running the pending
    operation with the highest priority until the queue is empty and then
    exits.
    """

    # Number of threads to use when the processor count cannot be determined
    _FALLBACK_THREADS = 2

    def __init__(self, maxThreads=0):
        """maxThreads is the maximum number of concurrent worker threads.

        If it is 0 (the default), one thread per online processor is used,
        or 2 if the number of processors cannot be determined.
        """
        if maxThreads == 0:
            maxThreads = self._detectProcessorCount()
        self.maxThreads = maxThreads
        # Every live worker thread holds one slot of this semaphore
        self.threadPool = threading.Semaphore(self.maxThreads)
        # Serializes the modifications of operationList
        self.lock = threading.Lock()
        self.operationList = []

    @staticmethod
    def _detectProcessorCount():
        """Returns the number of online processors, or _FALLBACK_THREADS if
        it is unavailable or not a usable thread count."""
        try:
            count = os.sysconf('SC_NPROCESSORS_ONLN')
        except (AttributeError, ValueError, OSError):
            # os.sysconf() does not exist on this platform, does not know
            # this name, or failed
            return PythonOperationQueue._FALLBACK_THREADS
        if count < 1:
            # sysconf() returns -1 for undefined values. A negative count
            # would make Semaphore() raise, and 0 would give a queue that
            # never runs anything.
            return PythonOperationQueue._FALLBACK_THREADS
        return count

    # FIXME: Seems to be for internal use only -> name should start with '_'
    def spawnOpThread(self):
        """Spawns a new thread to run operations, if necessary"""
        # The non-blocking acquire() fails when maxThreads workers are
        # already running, in which case one of them will pick up the work.
        if self.operationList and self.threadPool.acquire(False):
            worker = threading.Thread(target=self.runOperations)
            # Mark it as a daemon thread so it does not prevent the process
            # from exiting
            worker.setDaemon(True)
            worker.start()

    # FIXME: The class is a queue already, i.e. pretty much a list, so append()
    #        would be a better name
    def enqueue(self, operation):
        """Queues operation for execution and makes sure a worker thread is
        available to run it.

        operation.enqueued() is called synchronously in the calling thread.
        """
        # We must call enqueued() before adding the operation to the queue,
        # or else a thread could run main() first
        operation.enqueued()
        self.lock.acquire()
        try:
            self.operationList.append(operation)
        finally:
            self.lock.release()
        self.spawnOpThread()

    def _popOperation(self):
        """Removes and returns the pending operation with the highest
        priority. Operations with the same priority are returned in the order
        they were queued.

        Raises IndexError if the queue is empty.
        """
        self.lock.acquire()
        try:
            if not self.operationList:
                raise IndexError("pop from an empty operation queue")
            best_index = 0
            best_priority = self.operationList[0].priority
            for index, operation in enumerate(self.operationList):
                # Strict comparison so the oldest operation wins ties
                if operation.priority > best_priority:
                    best_priority = operation.priority
                    best_index = index
            return self.operationList.pop(best_index)
        finally:
            self.lock.release()

    # FIXME: Seems to be for internal use only -> name should start with '_'
    def runOperations(self):
        """The worker thread body: runs the pending operations one by one
        until the queue is empty, then gives its thread slot back."""
        tid = threading.currentThread().getName()
        while True:
            try:
                operation = self._popOperation()
            except IndexError:
                # no more operations left to run
                self.threadPool.release()
                # Just in case something was enqueued after the pop() but
                # tried to spawn a thread before release()
                self.spawnOpThread()
                return

            try:
                cxlog.log("%s: running %s" % (tid, cxlog.to_str(operation)))
                operation.isRunning = True
                operation.main()
            except:
                # Deliberately catch everything: a failing operation must not
                # kill this worker, and finish() must still be scheduled.
                cxlog.err("an operation raised an exception:\n%s" % traceback.format_exc().rstrip('\n'))
            # Use the GTK+ idle loop to run the finish() function in the main
            # thread so it can perform GUI operations.
            gobject.idle_add(operation.finish)
# Queue instance created at import time with the default thread count
# (maxThreads=0). Presumably meant to be shared by the users of this module;
# verify against the callers.
sharedOperationQueue = PythonOperationQueue()
#####
#
# Some test code
#
#####
def main():
debug_threads = []
class TestOperation(PythonOperation):
def main(self):
import time
import random
debug_threads.append(None)
print "start", len(debug_threads)
time.sleep(random.SystemRandom().random()*5)
print "end", len(debug_threads)
debug_threads.pop()
opqueue = PythonOperationQueue(maxThreads=5)
for _unused in range(30):
opqueue.enqueue(TestOperation())
# Run the test code only when this file is executed as a script
if __name__ == '__main__':
    main()