# ThreadTaskManager and the hooks it deploys are responsible for letting brain know which tasks/threads get opened
# and closed.
# We track this in brain so we can tell when we can share data between spans and when we can't.
#
# Some words about the request context model: the context model is very different between threads and async tasks.
# With threads, it's common to have a worker pool (e.g. Flask) where a thread handles a request and, after it finishes
# processing it, picks up another request from a queue.
# With async tasks, it's much more common to see that the task that received the request spawns other async tasks to do
# parts of the processing for that request.
#
# The context model is an issue because on one hand we want to share data between related spans so that important
# information like "tenant" can be deduced, but on the other hand we must not mistakenly share data between unrelated
# spans.
#
# In ThreadTaskManager's case, it's responsible for notifying brain about new tasks, their relationship to their parent
# task and when tasks complete, and also for letting the scope manager know what the current task is so it can add a
# reference to it in a new span when it gets opened.
#
# There's no standard or simple way to identify a "task" since it can be a thread, an async task or something else, and
# for our purposes we don't really need an external (real) identifier like the OS's thread ID - all we need is a unique
# identifier for each task. To achieve that, we either use existing Python objects that represent tasks, as in asyncio's
# case, or create synthetic objects when there are no task objects (threads).
# We then use the id() function to get the memory address of an object as a "task ID" - in asyncio's case it's the task
# object, and in a thread model we instantiate a synthetic object stored in thread-local storage (illustrative sketches
# appear at the end of this comment and after the task_holder definition below).
#
# This way we have unified and unique task identifiers even though the tasks can come from different worlds.
#
# Note that we don't make sure that two "tasks" can't share the same ID - we rely on the fact that two objects won't
# have the same address (i.e. id() result) while both "tasks" are alive, although the interpreter may reuse the address
# once one of the objects is freed. It's worth noting that in the thread-local solution, since the cleanup happens in a
# finalizer, which gets called during gc and not immediately when the thread exits, it actually reduces the risk that
# two "task ID objects" will share the same memory address.
#
# TODO: we now have a hierarchy of tasks that we can use to link spans, and once we let brain know the type of each task
# it can decide how to link them:
#
# 1. Thread.
#    This is the first task. New threads will not be linked to their parent thread.
# 2. Async task.
#    Linked to a thread, and can spawn other async tasks, which may get linked to it or to a framework task if one
#    exists.
# 3. Framework task.
#    This is a request whose processing can take time and span async tasks and other requests.
#
# TODO: if asgiref is available, maybe we can use its implementation for "thread local"?
# See here: https://github.com/django/asgiref/blob/main/asgiref/local.py
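#
# For the asyncio case described above (this module itself only implements the thread side), the same idea would look
# roughly like the commented, hypothetical snippet below. It is kept as a comment because asyncio.current_task() and
# "async def" require Python 3.7+, while this module keeps Python 2 compatible syntax:
#
#     import asyncio
#
#     async def _sketch_asyncio_task_id():
#         task = asyncio.current_task()  # the real Task object representing this unit of work
#         return id(task)                # its memory address serves as the unified "task ID"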
try:
    from typing import Tuple
except ImportError:
    pass
import threading
from supertenant import consts
from supertenant.supermeter import _get_brain


# The ID and task_holder objects are used to assign a synthetic number for each thread.
class ID(object):
    def __init__(self, task_manager):
        # type: ("ThreadTaskManager") -> None
        self.task_manager = task_manager

    def __del__(self):
        # type: () -> None
        self.task_manager.on_thread_task_done(self.get_id())

    def get_id(self):
        # type: () -> int
        return id(self)


task_holder = threading.local()
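

# Illustrative sketch (a hypothetical helper, not used by the package): it mirrors the task_holder/ID mechanism with
# plain object() sentinels to show that id() of a per-thread object gives each thread its own "task ID", and that two
# such IDs cannot collide while both objects are kept alive - which is the only uniqueness guarantee we rely on.
def _sketch_per_thread_ids():
    # type: () -> list
    local_storage = threading.local()  # stand-in for task_holder
    sentinels = []                     # keeps the per-thread objects alive so their addresses cannot be reused
    task_ids = []

    def record():
        local_storage.sentinel = object()            # stand-in for the ID object above
        sentinels.append(local_storage.sentinel)
        task_ids.append(id(local_storage.sentinel))  # the memory address is the synthetic "task ID"

    workers = [threading.Thread(target=record) for _ in range(2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return task_ids  # two distinct integers, one per worker thread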


class ThreadTaskManager(object):
    # With threads, we don't have a good way to know when new threads get created. We could hook the Thread class and
    # _thread.start_new_thread, but unlike asyncio we can't 100% guarantee we catch all new threads, so we settle for
    # detecting when we first see a thread so we can then notify brain.
    # This is why the function returns two results - our synthetic "thread ID" and a boolean marking whether this is
    # the first time we see this thread (a usage sketch at the bottom of the module illustrates this).
    def get_thread_current_task_id(self):
        # type: () -> Tuple[int, bool]
        if getattr(task_holder, "id", None) is None:
            task_holder.id = ID(self)
            return task_holder.id.get_id(), True
        return task_holder.id.get_id(), False

    def get_current_task_id(self, create_missing=True):
        # type: (bool) -> int
        # note: create_missing is currently unused on the thread side - a synthetic ID is always created on demand.
        thread_task_id, is_new = self.get_thread_current_task_id()
        if is_new:
            self.on_thread_task_create(thread_task_id)
        return thread_task_id

    def on_thread_task_create(self, id):
        # type: (int) -> None
        brain = _get_brain()
        if id != 0 and brain is not None:
            # threads are registered without a link to the thread that created them (see the hierarchy notes in the
            # header comment).
            brain.create_task(0, id, consts.TASK_THREAD)

    def on_thread_task_done(self, id):
        # type: (int) -> None
        # this is really a corner case, but apparently Python can be in the process of shutting down and during this
        # process our thread-local object gets deleted, but _after_ other objects like the function _get_brain.
        if _get_brain is not None:
            brain = _get_brain()
            if id != 0 and brain is not None:
                brain.task_done(id)
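

# Illustrative usage sketch (hypothetical, only runs when this file is executed directly). It shows the
# (task ID, is_new) semantics of get_thread_current_task_id(): the same thread keeps the same synthetic ID, and only
# the first call reports it as newly discovered. It assumes _get_brain() degrades gracefully (e.g. returns None) when
# no brain has been configured - an assumption about supertenant internals, not something verified here.
if __name__ == "__main__":
    _manager = ThreadTaskManager()
    _first_id, _first_is_new = _manager.get_thread_current_task_id()
    _second_id, _second_is_new = _manager.get_thread_current_task_id()
    assert _first_id == _second_id
    assert _first_is_new and not _second_is_new
    print("synthetic task ID for the main thread: %d" % _first_id)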