Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

beebox / crossover   deb

Repository URL to install this package:

Version: 18.5.0-1 

/ opt / cxoffice / lib / python / pyop.py

# (c) Copyright 2009-2010. CodeWeavers, Inc.

import os
import threading
import traceback

import gobject

import cxdecorators
import cxlog


# FIXME: Use the Queue module instead? The PriorityQueue class would be
# especially interesting. If we avoid it for a good reason, then document that
# reason here.


# FIXME: pyop / PythonOperation is a bad name:
# - This is 100% Python code so 'Python' means nothing
# - It does not mention the notion of 'queue' which is central to this module
# - The notion of operation is relevant to the calling code: i.e. we use this
#   module to perform operations but it could also be used to perform
#   computations in the background or other things. The concept this module
#   manipulates is that of tasks.
#
# cxtaskqueue, CXTask and CXTaskQueue would probably be better names
# The cx prefix being to avoid potential name collisions


#####
#
# The task definition
#
#####

class PythonOperation(object):

    canceled = False
    isRunning = False
    priority = 0

    def __init__(self):
        pass

    def __unicode__(self):
        return self.__class__.__name__

    def enqueued(self):
        """This is run as soon as the operation is queued for execution.

        It is run in whatever thread calls enqueue().
        Any initialization task that should be done if, and only if, the
        operation is going to be run should be performed here rather than in
        __init__(). This way, if one creates an instance of the operation but
        then never queues it for execution there's no harm done. Same this if
        one creates a single instance and then runs it multiple times.
        """
        cxlog.log("queued "+cxlog.to_str(self))

    @cxdecorators.abstractmethod
    def main(self):
        """This function must be overridden and will be run in a background
        thread.

        Do not perform any GUI interaction here!
        """
        raise NotImplementedError()

    def finish(self):
        """This will be run in the main thread after the operation
        completes.

        Because this function runs in the main thread it can perform GUI
        operations.
        """
        cxlog.log("finish "+cxlog.to_str(self))
        self.isRunning = False

    def cancel(self):
        self.canceled = True


#####
#
# The task queue implementation
#
#####

class PythonOperationQueue(object):

    def __init__(self, maxThreads=0):
        if maxThreads == 0:
            try:
                self.maxThreads = os.sysconf('SC_NPROCESSORS_ONLN')
            except:
                self.maxThreads = 2
        else:
            self.maxThreads = maxThreads
        self.threadPool = threading.Semaphore(self.maxThreads)
        self.lock = threading.Lock()
        self.operationList = []

    # FIXME: Seems to be for internal use only -> name should start with '_'
    def spawnOpThread(self):
        """Spawns a new thread to run operations, if necessary"""
        if self.operationList and self.threadPool.acquire(False):
            worker = threading.Thread(target=self.runOperations)
            # Mark it as a daemon thread so it does not prevent the process
            # from exiting
            worker.setDaemon(True)
            worker.start()

    # FIXME: The class is a queue already, i.e. pretty much a list, so append()
    # would be a better name
    def enqueue(self, operation):
        # We must call enqueued() before adding the operation to the queue,
        # or else a thread could run main() first
        operation.enqueued()
        self.lock.acquire()
        try:
            self.operationList.append(operation)
        finally:
            self.lock.release()
        self.spawnOpThread()

    def _popOperation(self):
        self.lock.acquire()
        try:
            index = 0
            priority = self.operationList[0].priority # may raise IndexError
            for i in range(1, len(self.operationList)):
                new_priority = self.operationList[i].priority
                if new_priority > priority:
                    priority = new_priority
                    index = i
            return self.operationList.pop(index)
        finally:
            self.lock.release()

    # FIXME: Seems to be for internal use only -> name should start with '_'
    def runOperations(self):
        tid = threading.currentThread().getName()
        while True:
            try:
                operation = self._popOperation()
            except IndexError:
                # no more operations left to run
                self.threadPool.release()
                # Just in case something was enqueued after the pop() but
                # tried to spawn a thread before release()
                self.spawnOpThread()
                return
            try:
                cxlog.log("%s: running %s" % (tid, cxlog.to_str(operation)))
                operation.isRunning = True
                operation.main()
            except:
                cxlog.err("an operation raised an exception:\n%s" % traceback.format_exc().rstrip('\n'))
            # Use the GTK+ idle loop to run the finish() function in the main
            # thread so it can perform GUI operations.
            gobject.idle_add(operation.finish)

sharedOperationQueue = PythonOperationQueue()


#####
#
# Some test code
#
#####

def main():
    debug_threads = []

    class TestOperation(PythonOperation):
        def main(self):
            import time
            import random
            debug_threads.append(None)
            print "start", len(debug_threads)
            time.sleep(random.SystemRandom().random()*5)
            print "end", len(debug_threads)
            debug_threads.pop()

    opqueue = PythonOperationQueue(maxThreads=5)

    for _unused in range(30):
        opqueue.enqueue(TestOperation())

if __name__ == '__main__':
    main()