# (c) Copyright 2009-2010, 2015. CodeWeavers, Inc.
import threading
import cxlog
import cxdecorators
import cxobjc
#####
#
# Task definition
#
#####
TASK_QUEUED = 0
TASK_RUNNABLE = 1
TASK_RUNNING = 2
TASK_DONE = 3
STATUS_STR = ["queued", "runnable", "running", "done"]
_TASK_SCHEDULED = set((TASK_RUNNING, TASK_DONE))
class AIETask(object):
"""Represents a task that can be run by the AIEScheduler."""
# If an error occurs, set this attribute to the appropriate error message.
# This is also what tells the caller that the task failed. It can then try
# to take remedial mesures and / or call it again. If continuing the
# installation, then the done() method must still be called.
error = None
#####
#
# Task configuration
#
#####
def __init__(self, scheduler, label):
"""The label specifies how to call this task in the GUI."""
self.scheduler = scheduler
self.scheduler.add(self)
self.label = label
# Track dependencies between the AIETasks.
self.dependencies = set()
self.parents = set()
self.scheduler.add_runnable(self)
self._status = TASK_RUNNABLE
def add_dependency(self, dependency):
"""Makes this task depend on the completion of the specified task."""
self.scheduler.lock()
try:
dependency.parents.add(self)
if not self.dependencies:
self.scheduler.remove_runnable(self)
self.dependencies.add(dependency)
finally:
self.scheduler.unlock()
def remove_dependency(self, dependency):
"""Removes the specified dependency."""
self.scheduler.lock()
try:
dependency.parents.remove(self)
self.dependencies.discard(dependency)
if self._status not in _TASK_SCHEDULED and not self.dependencies:
self.scheduler.add_runnable(self)
finally:
self.scheduler.unlock()
def clear_dependencies(self):
"""Removes all the dependencies."""
self.scheduler.lock()
try:
for dependency in self.dependencies:
dependency.parents.discard(self)
self.dependencies.clear()
if self._status not in _TASK_SCHEDULED:
self.scheduler.add_runnable(self)
finally:
self.scheduler.unlock()
def createsBottle(self):
"""Returns True if this task creates a bottle; False otherwise.
Intended to be overridden by subclasses."""
# pylint: disable=R0201
return False
#####
#
# Task scheduling
#
#####
def is_runnable(self):
"""Returns True is the object can be run, and false otherwise.
Any task that has no unfulfilled dependencies is put on the
scheduler's runnable list, and normally is_runnable() returns True in
that case. But one can override this function to prevent an otherwise
runnable task from being run by checking for some other criteria. See
also schedule().
"""
return self._status not in _TASK_SCHEDULED and not self.dependencies
def can_run(self):
"""Returns True if the task can be run now, False otherwise.
Together with is_runnable() this can be used to ensure that only one
instance of a given type of task is being run at a given time.
"""
return self.is_runnable()
def _getniceness(self):
"""Returns a value to help decide which of two runnable tasks to run
first.
Specifically, the task with the lowest niceness value should be run
first. This is so that a regular sort returns them in the recommended
run order.
"""
# pylint: disable=R0201
return 0
niceness = property(_getniceness)
def schedule(self):
"""Tells the task that it will be run now."""
self._status = TASK_RUNNING
def can_cancel(self):
"""Returns True if the task can be canceled."""
# pylint: disable=R0201
return False
# Wrapper for Obj-C
# An alias doesn't work for subclass overrides
def canCancel(self):
return self.can_cancel()
def cancel(self):
"""Tells the task to stop whatever main() is doing as soon as it can.
Tasks that cannot be interrupted need not do anything.
This raises RuntimeError if the task was not started yet.
"""
if self._status not in _TASK_SCHEDULED:
raise RuntimeError
def done(self):
"""Marks this task as done and update the tasks that depend on it.
Note that the main() method does not mark a task as done. Only
this function does. It's perfectly valid to call done() without
actually running the task if the caller thinks it can replace it (for
downloads for instance), or decides it should not be run after all.
"""
self.scheduler.lock()
try:
if self._status not in _TASK_SCHEDULED:
self.schedule()
self._status = TASK_DONE
self.scheduler.done(self)
#cxlog.log_("aie", "Done " + cxlog.to_str(self))
unblocked = [self]
for task in unblocked:
for parent in task.parents:
parent.dependencies.discard(self)
if parent.dependencies:
continue
if parent.status == TASK_DONE:
# The parent was already marked as done, so its
# parents may need to be made runnable
unblocked.append(parent)
else:
self.scheduler.add_runnable(parent)
task.parents.clear()
finally:
self.scheduler.unlock()
def _getstatus(self):
if self._status == TASK_QUEUED and not self.dependencies:
return TASK_RUNNABLE
return self._status
status = property(_getstatus)
#####
#
# Task execution
#
#####
@cxdecorators.abstractmethod
def main(self):
"""Performs the task and returns True for success and False for
failure.
Note that this method does not call done(). Doing so is the
responsibility of the caller. This makes it possible to call this
method again in case of failure.
"""
raise NotImplementedError()
def tasks_cmp(task1, task2):
"""Compares the niceness of a task pair.
This function can be used to sort the tasks so the one to run first comes
first.
"""
delta = task1.niceness - task2.niceness
if delta == 0:
return 0
if delta < 0:
return -1
return 1
def tasks_str(iterable):
"""Converts a sequence of tasks into a human-readable string."""
if not iterable:
return "None"
tasks = []
for task in iterable:
tasks.append(str(task))
tasks_desc = ", ".join(tasks)
if isinstance(iterable, set):
return "set([" + tasks_desc + "])"
if isinstance(iterable, list):
return "[" + tasks_desc + "]"
return "(" + tasks_desc + ")"
#####
#
# A Task that does nothing
#
#####
class AIENop(AIETask):
"""This task does nothing.
Its use is as a focal point for dependencies. For instance if you have two
or maybe three tasks that need to be done before a bunch of others, you
can create an AIENop instance, make all the other tasks depend on it, and
then make the AIENop instance depend on the tasks that must be done first
as you determine what those are.
"""
def __init__(self, scheduler, logname=""):
AIETask.__init__(self, scheduler, "")
self.logname = logname
def __unicode__(self):
return "%s(%s)" % (self.__class__.__name__, self.logname)
def main(self):
"""Does nothing."""
return True
#####
#
# AIETaskGroup
#
#####
class AIETaskGroup(AIENop):
"""Makes it possible to manipulate a group of tasks as if they were a
single task.
To add a task to the group, make it depend on group.first, and make
group.last depend on it. Only the first task in a chain need to depend on
group.first, and similarly only the last one needs to be depended upon by
group.last.
"""
def __init__(self, scheduler, logname="", first=None, last=None):
AIENop.__init__(self, scheduler, logname + " (Group)")
if not first:
first = AIENop(scheduler, logname + " (First)")
self.first = first
if not last:
last = AIENop(scheduler, logname + " (Last)")
self.last = last
last.add_dependency(first)
AIENop.add_dependency(self, self.last)
def add_dependency(self, dependency):
"""Makes the first task of the group depend on the specified task."""
self.first.add_dependency(dependency)
def remove_dependency(self, dependency):
"""Removes the specified dependency."""
self.first.add_dependency(dependency)
def insert(self, task):
"""Inserts a task between the group's first and last tasks."""
task.add_dependency(self.first)
self.last.add_dependency(task)
#####
#
# The task scheduler
#
#####
class AIEScheduler(cxobjc.Proxy):
"""Schedules AIETasks for execution in an order that respects their
dependencies.
Note that this class does not actually run the tasks; this is the
responsibility of the caller. Also it provides a userdata mapping attribute
that third parties can use to store or cache data related to this
scheduler.
"""
def __init__(self):
cxobjc.Proxy.__init__(self)
self._tasks = set()
self._runnable = set()
self._lock = threading.Lock()
self.userdata = {}
#####
#
# API for the tasks
#
#####
def lock(self):
"""Locks the scheduler to make some operations thread-safe."""
self._lock.acquire()
def unlock(self):
"""Unlocks the scheduler."""
self._lock.release()
def add(self, task):
"""Registers a new task to be run."""
self._tasks.add(task)
def done(self, task):
"""Tasks use this function to tell the scheduler they are done."""
self._runnable.discard(task)
self._tasks.discard(task)
def add_runnable(self, task):
"""Moves the task to the set of runnable tasks.
A task calls this function when all its dependencies have been
fulfilled. Note that this does not mean that it can actually be run
right away as it can impose other constraints through its can_run()
method.
"""
self._runnable.add(task)
def remove_runnable(self, task):
"""Removes a task from the set of runnable tasks.
This can either be called by a task when a dependency is added to it,
or by the scheduler itself when a runnable task is scheduled.
"""
self._runnable.discard(task)
#####
#
# Functions to schedule the tasks
#
#####
def sortedRunnable(self):
"""A convenience method for objc. It returns the current
Runnable set sorted by niceness.
This is doubly helpful in objc because it returns a list
rather than a set (which is what runnable is, ordinarily.)
Sets are opaque in objc.
"""
# pylint: disable=C0103
return sorted(self.runnable(), tasks_cmp)
def runnable(self):
"""Iterates over the runnable tasks.
Note this iterator may raise StopIteration even though there are still
tasks to be run. In particular this happens if none of the remaining
tasks are runnable because they are waiting for other tasks to
complete.
"""
seen = set()
while True:
try:
runnable = None
for task in self._runnable:
if task not in seen and task.can_run():
seen.add(task)
runnable = task
break
if runnable:
yield runnable
else:
cxlog.log_("aie", "No more runnable tasks")
raise StopIteration()
except RuntimeError:
# The _runnable set changed, probably because task has just
# become runnable or the caller scheduled the last task we
# returned.
pass
def schedule(self, task):
"""This function MUST be called before starting to run a task. It
tells the task that it is about to be run, removes it from the set of
runnable tasks and returns True.
If it turns out the task cannot be run, then returns False.
"""
self.lock()
try:
if task not in self._runnable or not task.can_run():
cxlog.log_("aie", "%d: %s cannot run" % (task.niceness, cxlog.to_str(task)))
return False
self.remove_runnable(task)
task.schedule()
cxlog.log_("aie", "Scheduled %d: %s" % (task.niceness, cxlog.to_str(task)))
return True
finally:
self.unlock()
schedule_ = schedule
def all_done(self):
"""Returns True if there is no more task to run."""
if self._tasks:
return False
cxlog.log_("aie", "All Done ")
return True
# Wrapper for Obj-C
# An alias doesn't work for subclass overrides
def allDone(self):
return self.all_done()
#####
#
# Some debugging functions
#
#####
def _sort_tasks(self, task, tasks, seen):
# The task graph may contain loops that will be resolved at run time.
# So mark the tasks we've seen to avoid infinite loops.
if task not in seen:
seen.add(task)
for child in sorted(task.dependencies, cmp=tasks_cmp):
self._sort_tasks(child, tasks, seen)
tasks.append(task)
def get_sorted_tasks(self):
"""Returns a list of the tasks roughly sorted in their execution order.
"""
tasks = []
seen = set()
self.lock()
try:
for task in self._tasks:
while True:
if task in seen:
break
for parent in task.parents:
# Try to start with a task which is at the 'top' of
# the graph. Of course there are usually multiple such
# tasks!
task = parent
break
else:
# Note that any task we have not seen yet either
# depends on one we have seen, or is in a completely
# separate graph. In either case it's ok to append it
# after those we have seen.
self._sort_tasks(task, tasks, seen)
return tasks
finally:
self.unlock()
sortedTasks = get_sorted_tasks
#####
#
# Some debugging functions
#
#####
def dump_task(self, prefix, task, dumped):
flags = []
if task.status == TASK_DONE:
flags.append("done")
if task.status == TASK_RUNNING:
flags.append("running")
if task in self._runnable:
flags.append("runnable")
if not task.can_run():
flags.append("cannot-run")
if task in dumped:
flags.append("dumped already")
if flags:
flags_desc = " (" + ", ".join(flags) + ")"
else:
flags_desc = ""
cxlog.log_("aie", prefix + cxlog.to_str(task) + flags_desc)
if task not in dumped:
dumped.add(task)
for dep in task.dependencies:
self.dump_task(prefix + "| ", dep, dumped)
def dump_tasks(self):
cxlog.log_("aie", "+-- Dumping %d tasks" % len(self._tasks))
seen = set()
dumped = set()
for task in self._tasks:
while True:
if task in seen:
break
seen.add(task)
for parent in task.parents:
task = parent
break
else:
self.dump_task("| ", task, dumped)
break
if len(dumped) != len(self._tasks):
cxlog.log_("aie", "| There are some phantom tasks")
cxlog.log_("aie", "+-- %d tasks dumped" % len(dumped))
def check(self, label, dump=False):
buggy = False
for task in self._tasks:
if (not task.dependencies and \
task.status not in _TASK_SCHEDULED and \
task not in self._runnable) or \
((task.dependencies or task.status in _TASK_SCHEDULED) and \
task in self._runnable):
if not buggy:
cxlog.log_("aie", cxlog.to_str(label) + ": Found AIEScheduler bugs")
if task in self._runnable:
runnable = "in "
else:
runnable = ""
cxlog.err("%s (status=%s, deps=%d) is %srunnable" % (cxlog.to_str(task), STATUS_STR[task.status], len(task.dependencies), runnable))
buggy = True
if buggy or dump:
self.dump_tasks()