# ThreadTaskManager and the hooks it deploys are responsible for letting brain know which tasks/threads get opened
# and closed.
# We track this in brain so we can tell when we can share data between spans and when we can't.
#
# Some words about the request context model: the context model is very different between threads and async tasks.
# With threads, it's common to have a worker pool (e.g. Flask) where a thread handles a request and, after it finishes
# processing it, picks up another request from a queue.
# With async tasks, it's much more common for the task that received the request to spawn other async tasks to do
# parts of the processing for that request.
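#
# A minimal illustration of the async shape described above (hypothetical handler names, illustrative only):
#
#     import asyncio
#
#     async def handle_request(request):
#         # the request's task fans out into child tasks that should share the request's context
#         await asyncio.gather(
#             asyncio.create_task(load_user(request)),
#             asyncio.create_task(load_permissions(request)),
#         )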
#
# The context model is an issue because, on the one hand, we want to share data between related spans so that important
# information like "tenant" can be deduced, but on the other hand we must not mistakenly share data between unrelated
# spans.
#
# In ThreadTaskManager's case, it's responsible for notifying brain about new tasks, their relationship to their parent
# task, and when tasks complete, and also for letting the scope manager know what the current task is so it can add a
# reference to it in a new span when it gets opened.
#
# There's no standard or simple method to identify a "task" since it can be a thread, an async task or whatnot, and for
# our purposes we don't really need to know an external (real) identifier like the OS's thread ID - what we need is to
# make sure we have a unique identifier for each task. To achieve that, we either use existing Python objects that
# represent tasks, like in asyncio's case, or create synthetic objects when there are no task objects (threads).
# We then use the id() function to get the memory address of an object as a "task ID" - in asyncio's case it's the task
# object, and in a thread model we instantiate a synthetic object using thread-local storage.
#
# This way we have unified and unique task identifiers even though the tasks can come from different worlds.
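#
# A rough sketch of the idea (illustrative only, not executed as part of this module):
#
#     import asyncio
#     import threading
#
#     # asyncio (from inside a running task): the task object itself carries the identity
#     task_id = id(asyncio.current_task())
#
#     # threads: a synthetic object kept in thread-local storage plays the same role
#     _holder = threading.local()
#     if getattr(_holder, "marker", None) is None:
#         _holder.marker = object()
#     task_id = id(_holder.marker)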
#
# Note that we don't make sure that two "tasks" can't share the same ID - we rely on the fact that two objects won't
# have the same address (i.e. id() result) while both "tasks" are alive, though the interpreter may reuse the address
# after one of them is freed. It's worth noting that in the thread-local solution, since the cleanup
# happens in a finalizer, which gets called during gc and not immediately when the thread exits, it actually reduces
# the risk that two "task ID objects" will share the same memory address.
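#
# For example (CPython behavior, illustrative only):
#
#     a = object()
#     old_id = id(a)
#     del a
#     b = object()
#     # id(b) may or may not equal old_id - id() is only guaranteed to be unique among objects that are alive at the
#     # same time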
#
# TODO: we now have a hierarchy of tasks that we can use to link spans, and once we let brain know the type of each task
# it can decide how to link them:
#
# 1. Thread.
#    This is the first task. New threads will not be linked to their parent thread.
# 2. Async task.
#    Linked to a thread and can spawn other async tasks, which may get linked to it or to a framework task if one
#    exists.
# 3. Framework task.
#    This is a request whose processing can span time, async tasks, and other requests.
#
# TODO if asgiref is available, maybe we can use its implementation for "thread local"?
#      See here: https://github.com/django/asgiref/blob/main/asgiref/local.py
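#
#      A rough sketch of that option (assumption: asgiref.local.Local can serve as a drop-in for threading.local that
#      also follows async context switches):
#
#          try:
#              from asgiref.local import Local
#              task_holder = Local()
#          except ImportError:
#              task_holder = threading.local()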
try:
    from typing import Tuple
except ImportError:
    pass

import threading

from supertenant import consts
from supertenant.supermeter import _get_brain


# The ID and task_holder objects are used to assign a synthetic task ID to each thread.
class ID(object):
    def __init__(self, task_manager):
        # type: ("ThreadTaskManager") -> None
        self.task_manager = task_manager

    def __del__(self):
        # type: () -> None
        self.task_manager.on_thread_task_done(self.get_id())

    def get_id(self):
        # type: () -> int
        return id(self)


task_holder = threading.local()
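
# Lifecycle sketch (illustrative only; "manager" is a ThreadTaskManager instance). This is what
# get_thread_current_task_id below effectively does:
#
#     if getattr(task_holder, "id", None) is None:
#         task_holder.id = ID(manager)       # first time this thread is seen
#     current = task_holder.id.get_id()      # stable for the lifetime of the thread
#
# When the thread's locals are eventually collected, ID.__del__ fires and reports the task as done.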


class ThreadTaskManager(object):
    # With threads, we don't have a good way to know when new threads get created. We could hook the Thread class and
    # _thread.start_new_thread, but unlike asyncio we can't 100% guarantee we catch all new threads, so we settle for
    # detecting the first time we see a thread so we can then notify brain.
    # This is why the function returns two results - our synthetic "thread ID" and a boolean marking whether this is
    # the first time we see this thread.
    def get_thread_current_task_id(self):
        # type: () -> Tuple[int, bool]
        if getattr(task_holder, "id", None) is None:
            task_holder.id = ID(self)
            return task_holder.id.get_id(), True
        return task_holder.id.get_id(), False
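    # For example (illustrative only; "manager" is a ThreadTaskManager instance), calling this twice from the same
    # thread yields the same synthetic ID, with the boolean flipping to False on the second call:
    #
    #     tid1, is_new = manager.get_thread_current_task_id()    # is_new is True on the first call in this thread
    #     tid2, is_new = manager.get_thread_current_task_id()    # tid2 == tid1, is_new is False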

    def get_current_task_id(self, create_missing=True):
        # type: (bool) -> int
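        # Note: create_missing is accepted but not used by this implementation - a thread's ID object is always
        # created on first access, so there is nothing to skip. Presumably it is kept for interface parity with an
        # async task manager (assumption).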
        thread_task_id, is_new = self.get_thread_current_task_id()
        if is_new:
            self.on_thread_task_create(thread_task_id)
        return thread_task_id

    def on_thread_task_create(self, id):
        # type: (int) -> None
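        # Assumption: passing 0 as the parent task means "no parent", matching the note above that new threads are
        # not linked to the thread that created them.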
        brain = _get_brain()
        if id != 0 and brain is not None:
            brain.create_task(0, id, consts.TASK_THREAD)

    def on_thread_task_done(self, id):
        # type: (int) -> None
        # This is really a corner case, but apparently Python can be in the process of shutting down, and during
        # shutdown our thread-local object can get finalized _after_ module globals like the _get_brain function have
        # already been torn down (set to None).
        if _get_brain is not None:
            brain = _get_brain()
            if id != 0 and brain is not None:
                brain.task_done(id)
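

# Usage sketch (illustrative only; scope-manager and span wiring omitted):
#
#     manager = ThreadTaskManager()
#     task_id = manager.get_current_task_id()    # registers this thread with brain on the first call
#     # ... handle work on this thread ...
#     # once the thread's locals are collected, ID.__del__ calls on_thread_task_done, which calls brain.task_done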