Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

beebox / crossover   deb

Repository URL to install this package:

Version: 18.5.0-1 

/ opt / cxoffice / lib / python / cxaiebase.py

# (c) Copyright 2009-2010, 2015. CodeWeavers, Inc.

import threading

import cxlog
import cxdecorators

import cxobjc



#####
#
# Task definition
#
#####

TASK_QUEUED = 0
TASK_RUNNABLE = 1
TASK_RUNNING = 2
TASK_DONE = 3

STATUS_STR = ["queued", "runnable", "running", "done"]
_TASK_SCHEDULED = set((TASK_RUNNING, TASK_DONE))


class AIETask(object):
    """Represents a task that can be run by the AIEScheduler."""

    # If an error occurs, set this attribute to the appropriate error message.
    # This is also what tells the caller that the task failed. It can then try
    # to take remedial mesures and / or call it again. If continuing the
    # installation, then the done() method must still be called.
    error = None


    #####
    #
    # Task configuration
    #
    #####

    def __init__(self, scheduler, label):
        """The label specifies how to call this task in the GUI."""
        self.scheduler = scheduler
        self.scheduler.add(self)
        self.label = label

        # Track dependencies between the AIETasks.
        self.dependencies = set()
        self.parents = set()

        self.scheduler.add_runnable(self)
        self._status = TASK_RUNNABLE


    def add_dependency(self, dependency):
        """Makes this task depend on the completion of the specified task."""
        self.scheduler.lock()
        try:
            dependency.parents.add(self)
            if not self.dependencies:
                self.scheduler.remove_runnable(self)
            self.dependencies.add(dependency)
        finally:
            self.scheduler.unlock()


    def remove_dependency(self, dependency):
        """Removes the specified dependency."""
        self.scheduler.lock()
        try:
            dependency.parents.remove(self)
            self.dependencies.discard(dependency)
            if self._status not in _TASK_SCHEDULED and not self.dependencies:
                self.scheduler.add_runnable(self)
        finally:
            self.scheduler.unlock()


    def clear_dependencies(self):
        """Removes all the dependencies."""
        self.scheduler.lock()
        try:
            for dependency in self.dependencies:
                dependency.parents.discard(self)
            self.dependencies.clear()
            if self._status not in _TASK_SCHEDULED:
                self.scheduler.add_runnable(self)
        finally:
            self.scheduler.unlock()


    def createsBottle(self):
        """Returns True if this task creates a bottle; False otherwise.

        Intended to be overridden by subclasses."""
        # pylint: disable=R0201
        return False


    #####
    #
    # Task scheduling
    #
    #####

    def is_runnable(self):
        """Returns True is the object can be run, and false otherwise.

        Any task that has no unfulfilled dependencies is put on the
        scheduler's runnable list, and normally is_runnable() returns True in
        that case. But one can override this function to prevent an otherwise
        runnable task from being run by checking for some other criteria. See
        also schedule().
        """
        return self._status not in _TASK_SCHEDULED and not self.dependencies


    def can_run(self):
        """Returns True if the task can be run now, False otherwise.

        Together with is_runnable() this can be used to ensure that only one
        instance of a given type of task is being run at a given time.
        """
        return self.is_runnable()


    def _getniceness(self):
        """Returns a value to help decide which of two runnable tasks to run
        first.

        Specifically, the task with the lowest niceness value should be run
        first. This is so that a regular sort returns them in the recommended
        run order.
        """
        # pylint: disable=R0201
        return 0

    niceness = property(_getniceness)


    def schedule(self):
        """Tells the task that it will be run now."""
        self._status = TASK_RUNNING


    def can_cancel(self):
        """Returns True if the task can be canceled."""
        # pylint: disable=R0201
        return False

    # Wrapper for Obj-C
    # An alias doesn't work for subclass overrides
    def canCancel(self):
        return self.can_cancel()


    def cancel(self):
        """Tells the task to stop whatever main() is doing as soon as it can.

        Tasks that cannot be interrupted need not do anything.
        This raises RuntimeError if the task was not started yet.
        """
        if self._status not in _TASK_SCHEDULED:
            raise RuntimeError


    def done(self):
        """Marks this task as done and update the tasks that depend on it.

        Note that the main() method does not mark a task as done. Only
        this function does. It's perfectly valid to call done() without
        actually running the task if the caller thinks it can replace it (for
        downloads for instance), or decides it should not be run after all.
        """
        self.scheduler.lock()
        try:
            if self._status not in _TASK_SCHEDULED:
                self.schedule()
            self._status = TASK_DONE
            self.scheduler.done(self)
            #cxlog.log_("aie", "Done " + cxlog.to_str(self))
            unblocked = [self]
            for task in unblocked:
                for parent in task.parents:
                    parent.dependencies.discard(self)
                    if parent.dependencies:
                        continue
                    if parent.status == TASK_DONE:
                        # The parent was already marked as done, so its
                        # parents may need to be made runnable
                        unblocked.append(parent)
                    else:
                        self.scheduler.add_runnable(parent)
                task.parents.clear()
        finally:
            self.scheduler.unlock()


    def _getstatus(self):
        if self._status == TASK_QUEUED and not self.dependencies:
            return TASK_RUNNABLE
        return self._status

    status = property(_getstatus)



    #####
    #
    # Task execution
    #
    #####

    @cxdecorators.abstractmethod
    def main(self):
        """Performs the task and returns True for success and False for
        failure.

        Note that this method does not call done(). Doing so is the
        responsibility of the caller. This makes it possible to call this
        method again in case of failure.
        """
        raise NotImplementedError()


def tasks_cmp(task1, task2):
    """Compares the niceness of a task pair.

    This function can be used to sort the tasks so the one to run first comes
    first.
    """
    delta = task1.niceness - task2.niceness
    if delta == 0:
        return 0
    if delta < 0:
        return -1
    return 1


def tasks_str(iterable):
    """Converts a sequence of tasks into a human-readable string."""
    if not iterable:
        return "None"
    tasks = []
    for task in iterable:
        tasks.append(str(task))
    tasks_desc = ", ".join(tasks)
    if isinstance(iterable, set):
        return "set([" + tasks_desc + "])"
    if isinstance(iterable, list):
        return "[" + tasks_desc + "]"
    return "(" + tasks_desc + ")"



#####
#
# A Task that does nothing
#
#####

class AIENop(AIETask):
    """This task does nothing.

    Its use is as a focal point for dependencies. For instance if you have two
    or maybe three tasks that need to be done before a bunch of others, you
    can create an AIENop instance, make all the other tasks depend on it, and
    then make the AIENop instance depend on the tasks that must be done first
    as you determine what those are.
    """

    def __init__(self, scheduler, logname=""):
        AIETask.__init__(self, scheduler, "")
        self.logname = logname

    def __unicode__(self):
        return "%s(%s)" % (self.__class__.__name__, self.logname)

    def main(self):
        """Does nothing."""
        return True



#####
#
# AIETaskGroup
#
#####

class AIETaskGroup(AIENop):
    """Makes it possible to manipulate a group of tasks as if they were a
    single task.

    To add a task to the group, make it depend on group.first, and make
    group.last depend on it. Only the first task in a chain need to depend on
    group.first, and similarly only the last one needs to be depended upon by
    group.last.
    """

    def __init__(self, scheduler, logname="", first=None, last=None):
        AIENop.__init__(self, scheduler, logname + " (Group)")
        if not first:
            first = AIENop(scheduler, logname + " (First)")
        self.first = first
        if not last:
            last = AIENop(scheduler, logname + " (Last)")
        self.last = last
        last.add_dependency(first)
        AIENop.add_dependency(self, self.last)

    def add_dependency(self, dependency):
        """Makes the first task of the group depend on the specified task."""
        self.first.add_dependency(dependency)

    def remove_dependency(self, dependency):
        """Removes the specified dependency."""
        self.first.add_dependency(dependency)

    def insert(self, task):
        """Inserts a task between the group's first and last tasks."""
        task.add_dependency(self.first)
        self.last.add_dependency(task)



#####
#
# The task scheduler
#
#####

class AIEScheduler(cxobjc.Proxy):
    """Schedules AIETasks for execution in an order that respects their
    dependencies.

    Note that this class does not actually run the tasks; this is the
    responsibility of the caller. Also it provides a userdata mapping attribute
    that third parties can use to store or cache data related to this
    scheduler.
    """

    def __init__(self):
        cxobjc.Proxy.__init__(self)
        self._tasks = set()
        self._runnable = set()
        self._lock = threading.Lock()
        self.userdata = {}


    #####
    #
    # API for the tasks
    #
    #####

    def lock(self):
        """Locks the scheduler to make some operations thread-safe."""
        self._lock.acquire()


    def unlock(self):
        """Unlocks the scheduler."""
        self._lock.release()


    def add(self, task):
        """Registers a new task to be run."""
        self._tasks.add(task)


    def done(self, task):
        """Tasks use this function to tell the scheduler they are done."""
        self._runnable.discard(task)
        self._tasks.discard(task)


    def add_runnable(self, task):
        """Moves the task to the set of runnable tasks.

        A task calls this function when all its dependencies have been
        fulfilled. Note that this does not mean that it can actually be run
        right away as it can impose other constraints through its can_run()
        method.
        """
        self._runnable.add(task)


    def remove_runnable(self, task):
        """Removes a task from the set of runnable tasks.

        This can either be called by a task when a dependency is added to it,
        or by the scheduler itself when a runnable task is scheduled.
        """
        self._runnable.discard(task)


    #####
    #
    # Functions to schedule the tasks
    #
    #####

    def sortedRunnable(self):
        """A convenience method for objc. It returns the current
           Runnable set sorted by niceness.

           This is doubly helpful in objc because it returns a list
           rather than a set (which is what runnable is, ordinarily.)
           Sets are opaque in objc.
        """
        # pylint: disable=C0103
        return sorted(self.runnable(), tasks_cmp)

    def runnable(self):
        """Iterates over the runnable tasks.

        Note this iterator may raise StopIteration even though there are still
        tasks to be run. In particular this happens if none of the remaining
        tasks are runnable because they are waiting for other tasks to
        complete.
        """
        seen = set()
        while True:
            try:
                runnable = None
                for task in self._runnable:
                    if task not in seen and task.can_run():
                        seen.add(task)
                        runnable = task
                        break
                if runnable:
                    yield runnable
                else:
                    cxlog.log_("aie", "No more runnable tasks")
                    raise StopIteration()
            except RuntimeError:
                # The _runnable set changed, probably because task has just
                # become runnable or the caller scheduled the last task we
                # returned.
                pass


    def schedule(self, task):
        """This function MUST be called before starting to run a task. It
        tells the task that it is about to be run, removes it from the set of
        runnable tasks and returns True.

        If it turns out the task cannot be run, then returns False.
        """
        self.lock()
        try:
            if task not in self._runnable or not task.can_run():
                cxlog.log_("aie", "%d: %s cannot run" % (task.niceness, cxlog.to_str(task)))
                return False
            self.remove_runnable(task)
            task.schedule()
            cxlog.log_("aie", "Scheduled %d: %s" % (task.niceness, cxlog.to_str(task)))
            return True
        finally:
            self.unlock()

    schedule_ = schedule


    def all_done(self):
        """Returns True if there is no more task to run."""
        if self._tasks:
            return False
        cxlog.log_("aie", "All Done ")
        return True

    # Wrapper for Obj-C
    # An alias doesn't work for subclass overrides
    def allDone(self):
        return self.all_done()


    #####
    #
    # Some debugging functions
    #
    #####

    def _sort_tasks(self, task, tasks, seen):
        # The task graph may contain loops that will be resolved at run time.
        # So mark the tasks we've seen to avoid infinite loops.
        if task not in seen:
            seen.add(task)
            for child in sorted(task.dependencies, cmp=tasks_cmp):
                self._sort_tasks(child, tasks, seen)
            tasks.append(task)


    def get_sorted_tasks(self):
        """Returns a list of the tasks roughly sorted in their execution order.
        """
        tasks = []
        seen = set()
        self.lock()
        try:
            for task in self._tasks:
                while True:
                    if task in seen:
                        break
                    for parent in task.parents:
                        # Try to start with a task which is at the 'top' of
                        # the graph. Of course there are usually multiple such
                        # tasks!
                        task = parent
                        break
                    else:
                        # Note that any task we have not seen yet either
                        # depends on one we have seen, or is in a completely
                        # separate graph. In either case it's ok to append it
                        # after those we have seen.
                        self._sort_tasks(task, tasks, seen)
            return tasks
        finally:
            self.unlock()


    sortedTasks = get_sorted_tasks


    #####
    #
    # Some debugging functions
    #
    #####

    def dump_task(self, prefix, task, dumped):
        flags = []
        if task.status == TASK_DONE:
            flags.append("done")
        if task.status == TASK_RUNNING:
            flags.append("running")
        if task in self._runnable:
            flags.append("runnable")
            if not task.can_run():
                flags.append("cannot-run")
        if task in dumped:
            flags.append("dumped already")
        if flags:
            flags_desc = " (" + ", ".join(flags) + ")"
        else:
            flags_desc = ""
        cxlog.log_("aie", prefix + cxlog.to_str(task) + flags_desc)
        if task not in dumped:
            dumped.add(task)
            for dep in task.dependencies:
                self.dump_task(prefix + "| ", dep, dumped)


    def dump_tasks(self):
        cxlog.log_("aie", "+-- Dumping %d tasks" % len(self._tasks))
        seen = set()
        dumped = set()
        for task in self._tasks:
            while True:
                if task in seen:
                    break
                seen.add(task)
                for parent in task.parents:
                    task = parent
                    break
                else:
                    self.dump_task("| ", task, dumped)
                    break
        if len(dumped) != len(self._tasks):
            cxlog.log_("aie", "| There are some phantom tasks")
        cxlog.log_("aie", "+-- %d tasks dumped" % len(dumped))


    def check(self, label, dump=False):
        buggy = False
        for task in self._tasks:
            if (not task.dependencies and \
                task.status not in _TASK_SCHEDULED and \
                task not in self._runnable) or \
               ((task.dependencies or task.status in _TASK_SCHEDULED) and \
                task in self._runnable):
                if not buggy:
                    cxlog.log_("aie", cxlog.to_str(label) + ": Found AIEScheduler bugs")
                if task in self._runnable:
                    runnable = "in "
                else:
                    runnable = ""
                cxlog.err("%s (status=%s, deps=%d) is %srunnable" % (cxlog.to_str(task), STATUS_STR[task.status], len(task.dependencies), runnable))
                buggy = True
        if buggy or dump:
            self.dump_tasks()