# flockwave-server / server / model / devices.py
"""Device and channel-related model classes."""

from __future__ import annotations

from blinker import Signal
from builtins import str
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from itertools import islice
from typing import (
    cast,
    overload,
    Any,
    Counter,
    Generic,
    Iterable,
    Optional,
    Type,
    TypeVar,
    TYPE_CHECKING,
    Union,
)

from flockwave.spec.schema import get_complex_object_schema

from .client import Client
from .errors import ClientNotSubscribedError, NoSuchPathError
from .metamagic import ModelMeta
from .object import ModelObject

if TYPE_CHECKING:
    from flockwave.server.registries.clients import ClientRegistry
    from flockwave.server.message_hub import MessageHub

# Names exported by ``from <this module> import *``. Note that ``RootNode``,
# ``DeviceTreeMutator`` and ``DeviceTreeNodeBase`` are defined in this module
# but are not listed here.
__all__ = (
    "ChannelNode",
    "ChannelOperation",
    "ChannelType",
    "DeviceClass",
    "DeviceTree",
    "DeviceTreePath",
    "DeviceNode",
    "DeviceTreeNodeType",
    "ObjectNode",
    "DeviceTreeSubscriptionManager",
)


class ChannelOperation(Enum):
    """Enum listing the operations that may be allowed on a channel node."""

    READ = "read"
    WRITE = "write"


# Lookup table used by ``ChannelType.from_object()``: maps plain Python types
# to the *value* of the ChannelType member that represents them. The lookup
# is by exact type, so ``bool`` maps to "boolean" even though it is a
# subclass of ``int``.
_channel_type_mapping: dict[Type, str] = {
    int: "number",
    float: "number",
    str: "string",
    bool: "boolean",
    object: "object",
}


class ChannelType(Enum):
    """Enum listing the value types that a channel may hold."""

    AUDIO = "audio"
    BOOLEAN = "boolean"
    BYTES = "bytes"
    CARTESIAN = "cartesian"
    COLOR = "color"
    DURATION = "duration"
    GPS = "gps"
    NUMBER = "number"
    OBJECT = "object"
    STRING = "string"
    TIME = "time"
    VIDEO = "video"

    @classmethod
    def from_object(cls, obj: Union["ChannelType", Type]):
        """Returns the channel type that corresponds to the given object.

        Parameters:
            obj: either a ChannelType, which is handed back unchanged, or
                a Python type object that has a registered channel type

        Returns:
            ChannelType: the channel type that belongs to the input

        Throws:
            TypeError: if the object is neither a ChannelType nor a Python
                type with a registered channel type
        """
        # Channel types need no conversion at all
        if isinstance(obj, ChannelType):
            return obj

        try:
            value = _channel_type_mapping[obj]
        except KeyError:
            raise TypeError(
                f"{obj!r} cannot be converted to a ChannelType"
            ) from None
        else:
            return cls(value)


class DeviceClass(Enum):
    """Enum listing the classes that a device node in a device tree may be
    tagged with.
    """

    ACCELEROMETER = "accelerometer"
    ACTUATOR = "actuator"
    ALTIMETER = "altimeter"
    BATTERY = "battery"
    CAMERA = "camera"
    CPU = "cpu"
    CPU_CORE = "cpuCore"
    GPS = "gps"
    GROUP = "group"
    GYROSCOPE = "gyroscope"
    LED = "led"
    MAGNETOMETER = "magnetometer"
    MICROPHONE = "microphone"
    MISC = "misc"
    PYRO = "pyro"
    RADIO = "radio"
    RANGEFINDER = "rangefinder"
    RC = "rc"
    ROTOR = "rotor"
    SENSOR = "sensor"
    SPEAKER = "speaker"
    SPRAYER = "sprayer"


class DeviceTreeNodeType(Enum):
    """Enum listing the kinds of nodes that may appear in a device tree.
    Each node class in this module stores one of these in its ``type``
    attribute.
    """

    ROOT = "root"
    OBJECT = "object"
    DEVICE = "device"
    CHANNEL = "channel"


# Type variable bound to device tree nodes; lets ``_add_child()`` and
# ``_remove_child()`` return the same node type that they were given.
C = TypeVar("C", bound="DeviceTreeNodeBase")


class DeviceTreeNodeBase(metaclass=ModelMeta):
    """Class representing a single node in a Flockwave device tree.

    A node keeps track of its parent, of its child nodes (keyed by their
    IDs) and of the clients that are subscribed to it.
    """

    class __meta__:
        schema = get_complex_object_schema("deviceTreeNode")

    # Created lazily by ``_add_child()``, so the attribute may be missing on
    # nodes that never had a child. Use ``iterchildren()`` for read access
    # that must work on such nodes as well.
    children: dict[str, "DeviceTreeNodeBase"]

    _subscribers: Optional[Counter[Client]]
    """Mapping that maps clients to the number of times they are subscribed to
    this node in the tree. Created lazily; ``None`` means that the mapping was
    not created yet.
    """

    _parent: Optional["DeviceTreeNodeBase"]
    """The parent of this node; ``None`` if this node is the root of the device
    tree.
    """

    _path: Optional[str]
    """The path string of this node, listing the names of all the nodes leading
    to this node from the root. Calculated lazily; ``None`` means that the
    path string was not calculated yet.
    """

    def __init__(self):
        """Constructor."""
        self._subscribers = None
        self._parent = None
        self._path = None

    def collect_channel_values(self) -> dict[str, Any]:
        """Creates a Python dictionary that maps the IDs of the children of
        this node as follows:

          - channel nodes (i.e. instances of ChannelNode_) will be mapped
            to their current values

          - every other node ``node`` will be mapped to the result of
            ``node.collect_channel_values()``, recursively

        Nodes that have no children yield an empty dictionary.

        Returns:
            a Python dictionary constructed as described above
        """
        # Go through iterchildren() instead of self.children because the
        # children dictionary is created lazily and may not exist yet
        return {
            child_id: child.collect_channel_values()
            for child_id, child in self.iterchildren()
        }

    def count_subscriptions_of(self, client: Client) -> int:
        """Count how many times the given client is subscribed to changes
        in channel values of this node or any of its sub-nodes.

        Parameters:
            client: the client to test

        Returns:
            the number of times the given client is subscribed to this node
        """
        return self._subscribers[client] if self._subscribers else 0

    @property
    def has_subscribers(self) -> bool:
        """Returns whether this node has at least one subscriber."""
        return bool(self._subscribers)

    def iterchildren(self) -> Iterable[tuple[str, "DeviceTreeNodeBase"]]:
        """Iterates over the children of this node.

        Yields:
            the ID of the child node and the child node itself, for all children
        """
        if hasattr(self, "children"):
            return self.children.items()
        else:
            return iter(())

    def iterparents(self, include_self: bool = False) -> Iterable["DeviceTreeNodeBase"]:
        """Iterates over the parents of this node, in increasing distance
        from the node itself.

        Parameters:
            include_self: whether to yield the node itself in the result

        Yields:
            the parents of this node, in increasing distance from the node itself
        """
        node = self if include_self else self._parent
        while node is not None:
            yield node
            node = node._parent

    def itersubscribers(self) -> Iterable[Client]:
        """Iterates over the subscribers registered at this node,
        reporting each subscriber only once even if it is registered
        multiple times.
        """
        if self._subscribers is not None:
            return self._subscribers.keys()
        else:
            return iter(())

    @property
    def parent(self) -> Optional["DeviceTreeNodeBase"]:
        """Returns the parent node of this node."""
        return self._parent

    @property
    def path(self) -> str:
        """Returns the path that leads to this node from the root node.

        This property is cached and invalidated automatically if the
        parent of this node or any of its parents change.

        Returns:
            a string representation of the path leading to this node from the
            root node. Resolving this path from the tree root should result in
            exactly this node.
        """
        if self._path is None:
            self._path = self._validate_path()
        return self._path

    def _validate_path(self) -> str:
        """Calculates the path string of this node by walking up to the root
        and looking up the ID of each visited node in its parent.

        Throws:
            ValueError: if a node on the way up is not registered among the
                children of its own parent
        """
        parts = []
        node = self
        parent = node._parent
        while parent is not None:
            for child_id, child in parent.iterchildren():
                if child is node:
                    parts.append(child_id)
                    break
            else:
                raise ValueError(
                    "inconsistent tree: node not found "
                    "among the children of its parent"
                )
            node, parent = parent, parent._parent

        # The empty leading component stands for the topmost node and
        # provides the leading slash of the path
        parts.append("")
        parts.reverse()
        return "/".join(parts)

    def _invalidate_cached_paths(self) -> None:
        """Drops the cached path string of this node and of every node in
        its subtree. Must be called whenever the position of this node in
        the tree changes because the paths of all its descendants start
        with the path of this node.
        """
        for _, node in self.traverse_dfs():
            node._path = None

    @overload
    def traverse_dfs(
        self, own_id: None = None
    ) -> Iterable[tuple[Optional[str], "DeviceTreeNodeBase"]]: ...

    @overload
    def traverse_dfs(
        self, own_id: str
    ) -> Iterable[tuple[str, "DeviceTreeNodeBase"]]: ...

    def traverse_dfs(
        self, own_id: Optional[str] = None
    ) -> Iterable[tuple[Optional[str], "DeviceTreeNodeBase"]]:
        """Returns a generator that yields all the nodes in the subtree of
        this node, including the node itself, in depth-first order.

        Siblings are visited in the reverse of the order in which
        ``iterchildren()`` reports them because pending nodes are kept on a
        stack.

        Parameters:
            own_id: the ID of this node in its parent, if known. This will be
                yielded in the traversal results for the node itself.

        Yields:
            each node in the subtree of this node, including the node itself,
            and its associated ID in its parent, in depth-first order. The ID
            will be the value of the ``own_id`` parameter for this node.
        """
        stack = [(own_id, self)]
        while stack:
            node_id, node = stack.pop()
            yield node_id, node
            stack.extend(node.iterchildren())

    @property
    def tree(self) -> Optional[DeviceTree]:
        """Returns the tree that this node is a part of, or ``None`` if the
        node is not attached to a tree.
        """
        # Walk upwards until we find the root node; it is the only node that
        # knows which tree it belongs to
        node = self
        while node is not None and not isinstance(node, RootNode):
            node = node._parent
        return None if node is None else node.tree

    def _add_child(self, id: str, node: C) -> C:
        """Adds the given node as a child node to this node.

        If the node already has a parent, it is detached from its previous
        parent first. Nothing is modified if the addition fails.

        Parameters:
            id: the ID of the node
            node: the node to add

        Returns:
            the node that was added

        Throws:
            ValueError: if another node with the same ID already exists for
                this node
        """
        if not hasattr(self, "children"):
            self.children = {}

        # Check for ID clashes _before_ detaching the node from its previous
        # parent; otherwise a failed addition would leave the node orphaned
        if id in self.children and self.children[id] is not node:
            raise ValueError(f"another child node already exists with ID={id!r}")

        if node._parent is not None:
            node._parent._remove_child(node)

        self.children[id] = node
        node._parent = self

        # Cached paths of the node and all its descendants are stale now
        node._invalidate_cached_paths()

        return node

    def _dispose(self) -> None:
        """Marks the node as not being used any more and detaches it from
        its tree. The children of the node are disposed recursively.
        """
        self._parent = None
        self._path = None

        if hasattr(self, "children"):
            for child in self.children.values():
                child._dispose()
            self.children = {}

    def _remove_child(self, node: C) -> C:
        """Removes the given child node from this node.

        Parameters:
            node: the node to remove

        Returns:
            the node that was removed

        Throws:
            ValueError: if the node is not a child of this node
        """
        for child_id, child_node in self.iterchildren():
            if child_node is node:
                return cast(C, self._remove_child_by_id(child_id))
        raise ValueError("the given node is not a child of this node")

    def _remove_child_by_id(self, id: str) -> "DeviceTreeNodeBase":
        """Removes the child node with the given ID from this node.

        Parameters:
            id: the ID of the node to remove

        Returns:
            the node that was removed

        Throws:
            ValueError: if there is no such child with the given ID
        """
        try:
            node = self.children.pop(id)
        except (KeyError, AttributeError):
            # AttributeError means that the children dictionary was never
            # created, i.e. this node has no children at all
            raise ValueError(f"no child exists with the given ID: {id!r}") from None

        node._parent = None

        # Cached paths of the node and all its descendants are stale now
        node._invalidate_cached_paths()

        return node

    def _subscribe(self, client: Client) -> None:
        """Subscribes the given client object to this node and its subtree.
        The client will get notified whenever one of the channels in the
        subtree of this node (or in the node itself if the node is a channel
        node) receives a new value.

        A client may be subscribed to this node multiple times; the node
        will track how many times the client has subscribed to the node and
        the client must unsubscribe exactly the same number of times to stop
        receiving notifications.

        Parameters:
            client (Client): the client to notify when a channel value
                changes in the subtree of this node.
        """
        if self._subscribers is None:
            # Create the subscriber counter lazily because most nodes
            # will not have any subscribers
            self._subscribers = Counter()
        self._subscribers[client] += 1

    def _unsubscribe(self, client: Client, force: bool = False) -> None:
        """Unsubscribes the given client object from this node and its
        subtree.

        Parameters:
            client (Client): the client to unsubscribe
            force (bool): whether to force an unsubscription of the client
                even if it is subscribed multiple times. Setting this
                argument to ``True`` also suppresses the KeyError that is
                raised when the client is not subscribed.

        Throws:
            KeyError: if the client is not subscribed to this node and
                ``force`` is ``False``
        """
        if self.count_subscriptions_of(client) > 0:
            assert self._subscribers is not None

            # Drop the entry completely when forced or when this was the
            # last subscription so the counter does not retain zero entries
            if force or self._subscribers[client] <= 1:
                del self._subscribers[client]
            else:
                self._subscribers[client] -= 1

        elif not force:
            raise KeyError(client)


# Type variable for the type of the value stored in a ``ChannelNode``.
T = TypeVar("T")


class ChannelNode(DeviceTreeNodeBase, Generic[T]):
    """Class representing a channel node in a Flockwave device tree.

    A channel node holds a single value together with the type of the
    channel, the operations allowed on it and an optional unit.
    """

    value: T
    """The current value of the channel. Assigning to this attribute changes
    the value _without_ notifying any interested parties. Use the context
    manager returned by the ``create_mutator()`` method of the device tree
    if the modification should be reported to them.
    """

    def __init__(
        self,
        channel_type: ChannelType,
        initial_value: T,
        *,
        operations: Optional[Iterable[ChannelOperation]] = None,
        unit: Optional[str] = None,
    ):
        """Constructor.

        Parameters:
            channel_type: the type of the channel
            initial_value: the value that the channel holds right after
                construction
            operations: the allowed operations of the channel; ``None``
                means ``[ChannelOperation.READ]``. The operations are always
                copied into a new list.
            unit: the unit in which the value of the channel is expressed.
        """
        super().__init__()

        allowed_operations = (
            [ChannelOperation.READ] if operations is None else list(operations)
        )

        self.type = DeviceTreeNodeType.CHANNEL
        self.subtype = channel_type
        self.operations = allowed_operations
        self.unit = unit
        self.value = initial_value

    def collect_channel_values(self) -> T:
        """Returns the current value of this channel; channel nodes are the
        leaves where the recursive collection of channel values stops.
        """
        return self.value

    @property
    def subtype(self):
        """Snake-case alias of the ``subType`` attribute."""
        return self.subType

    @subtype.setter
    def subtype(self, value):
        self.subType = value


class DeviceNode(DeviceTreeNodeBase):
    """Class representing a device node in a Flockwave device tree.

    Device nodes may contain channels and further sub-devices.
    """

    def __init__(self, device_class: DeviceClass = DeviceClass.MISC):
        """Constructor.

        Parameters:
            device_class: the class of the device
        """
        super().__init__()
        self.type = DeviceTreeNodeType.DEVICE
        self.device_class = device_class

    def add_channel(
        self,
        id: str,
        type: Union[Type, ChannelType],
        *,
        initial_value: Any = None,
        unit: Optional[str] = None,
    ) -> ChannelNode:
        """Adds a new channel with the given identifier to this device
        node.

        Parameters:
            id: the identifier of the channel being added.
            type: the type of the channel, either as a ChannelType or as a
                Python type that can be converted to a ChannelType
            initial_value: the initial value of the channel. When omitted
                and ``type`` is a Python type other than ``object``, the
                initial value is obtained by calling the type without
                arguments (e.g. ``0`` for ``int``).
            unit: the unit in which the values of the channel are expressed

        Returns:
            the channel node that was added.

        Throws:
            TypeError: if the type cannot be converted to a ChannelType
            ValueError: if another child with the same ID already exists
        """
        # The ``type`` parameter shadows the builtin of the same name, so we
        # need to reach the builtin explicitly. ``__builtins__`` must not be
        # used for this: it is an implementation detail that may be a module
        # or a dictionary depending on how this module was loaded.
        import builtins

        # Validate the type first so unsupported types are rejected before
        # we attempt to instantiate them
        channel_type = ChannelType.from_object(type)

        if (
            initial_value is None
            and isinstance(type, builtins.type)
            and type is not object
        ):
            initial_value = type()  # type: ignore

        node = ChannelNode(channel_type, initial_value, unit=unit)
        return self._add_child(id, node)

    def add_device(
        self, id: str, device_class: DeviceClass = DeviceClass.MISC
    ) -> DeviceNode:
        """Adds a new device with the given identifier as a sub-device
        to this device node.

        Parameters:
            id: the identifier of the device being added.
            device_class: the class of the device being added

        Returns:
            the device tree node that was added.

        Throws:
            ValueError: if another child with the same ID already exists
        """
        return self._add_child(id, DeviceNode(device_class))

    @property
    def device_class(self):
        """Alias to ``deviceClass``."""
        return self.deviceClass

    @device_class.setter
    def device_class(self, value):
        self.deviceClass = value


class RootNode(DeviceTreeNodeBase):
    """Class representing the root node in a Flockwave device tree.

    The root node is the only node that stores a direct reference to the
    tree that it belongs to.
    """

    def __init__(self, tree: DeviceTree):
        """Constructor.

        Parameters:
            tree: the tree that this node will be placed in
        """
        super().__init__()
        self._tree = tree
        self.type = DeviceTreeNodeType.ROOT

    def add_child(self, id: str, node: ObjectNode) -> ObjectNode:
        """Attaches the given object node to the root node under the given
        ID.

        Parameters:
            id: the ID of the node to add
            node: the node to add; only object nodes are allowed directly
                below the root node.

        Returns:
            the node that was added

        Throws:
            ValueError: if another node with the same ID already exists for
                the root node
        """
        added = self._add_child(id, node)
        return added

    def remove_child(self, node: ObjectNode) -> ObjectNode:
        """Detaches the given child node from the root node.

        Parameters:
            node: the node to remove

        Returns:
            the node that was removed
        """
        removed = self._remove_child(node)
        return removed

    def remove_child_by_id(self, id: str) -> ObjectNode:
        """Detaches the child node with the given ID from the root node.

        Parameters:
            id: the ID of the child node to remove

        Returns:
            the node that was removed
        """
        removed = self._remove_child_by_id(id)
        return cast(ObjectNode, removed)

    @DeviceTreeNodeBase.tree.getter
    def tree(self):
        # The root node knows its tree directly, no need to walk upwards
        return self._tree

    def _dispose(self):
        # Dispose the subtree first, then let go of the tree itself
        super()._dispose()
        self._tree = None


class ObjectNode(DeviceTreeNodeBase):
    """Class representing an object node in a Flockwave device tree.

    Object nodes are the nodes that a root node accepts as children; they
    hold device nodes in turn.
    """

    def __init__(self):
        """Constructor."""
        super().__init__()
        self.type = DeviceTreeNodeType.OBJECT

    def add_device(
        self, id: str, device_class: DeviceClass = DeviceClass.MISC
    ) -> DeviceNode:
        """Creates a new device node and attaches it to this node under the
        given identifier.

        Parameters:
            id: the identifier of the device being added.
            device_class: the class of the device being added.

        Returns:
            the device tree node that was added.
        """
        device = DeviceNode(device_class)
        return self._add_child(id, device)


# The class declares no dataclass fields, so the __eq__() and __repr__()
# methods that @dataclass would generate are meaningless: every path would
# compare equal to every other path and would be printed as
# ``DeviceTreePath()``. Disable them and implement both explicitly below.
@dataclass(eq=False, repr=False)
class DeviceTreePath:
    """A path in a device tree from its root to one of its nodes. Leaf and
    branch nodes are both allowed.

    Device tree paths have a natural string representation that looks like
    standard filesystem paths: ``/node1/node2/node3/.../leaf``. This class
    allows you to construct a device tree path from a string. When a device
    tree path is printed as a string, it will also be formatted in this
    style.

    Two paths are equal if they consist of the same parts. Paths are mutable
    and therefore not hashable.
    """

    def __init__(self, path: Union[str, "DeviceTreePath"] = "/"):
        """Constructor.

        Parameters:
            path: the string representation of the path, or another path object
                to clone.

        Throws:
            ValueError: if the string representation is not a valid path
        """
        if isinstance(path, DeviceTreePath):
            self._parts = list(path._parts)
        else:
            self.path = path

    def iterparts(self) -> Iterable[str]:
        """Returns a generator that iterates over the parts of the path.

        Yields:
            the parts of the path
        """
        # The first part is the empty string that stands for the root
        return islice(self._parts, 1, None)

    @property
    def path(self) -> str:
        """The path, formatted as a string.

        A trailing slash is not preserved; in particular, the path of the
        root is formatted as an empty string.

        Returns:
            the path, formatted as a string
        """
        return "/".join(self._parts)

    @path.setter
    def path(self, value: str) -> None:
        parts = value.split("/")
        if parts[0] != "":
            raise ValueError("path must start with a slash")

        # A single trailing slash is allowed and ignored
        if parts[-1] == "":
            parts.pop()

        if "" in parts[1:]:
            raise ValueError("path must not contain an empty component")

        # "" and "/" both refer to the root; store them the same way so
        # they compare equal
        self._parts = parts or [""]

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, DeviceTreePath):
            return NotImplemented
        return self._parts == other._parts

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.path!r})"

    def __str__(self) -> str:
        return self.path


class DeviceTree:
    """A device tree of an object that lists the devices and channels that
    the object provides.
    """

    channel_nodes_updated = Signal()
    """Signal emitted by the tree when the _values_ of some channel nodes in
    the tree have changed. Only changes made in the context of a
    DeviceTreeMutator_ are detected.
    """

    structure_changed = Signal()
    """Signal emitted by the tree when the _structure_ of the tree has
    changed.

    Right now we assume that the device trees of UAVs do not change, so we
    dispatch this signal only when a new UAV is added to the tree or when a
    UAV is removed.
    """

    def __init__(self):
        """Constructor. Creates an empty device tree."""
        self._root = RootNode(self)
        self._object_registry = None

    def create_mutator(self) -> "DeviceTreeMutator":
        """Creates a mutator object that can be used to modify the values
        of the channels in the device tree such that subscribers get
        notified about the modifications.

        The mutator object returned by this function should be used as a
        context manager as follows::

            with tree.create_mutator() as mutator:
                mutator.update(some_channel, new_value)
        """
        return DeviceTreeMutator(self, self._on_channel_nodes_updated)

    def dispose(self) -> None:
        """Disposes of this tree when it is not needed any more. The tree
        must not be used after ``dispose()`` was called on it.
        """
        self.root._dispose()

    @property
    def json(self):
        """The JSON representation of the device tree, which is the JSON
        representation of its root node.
        """
        return self._root.json  # type: ignore

    @property
    def root(self):
        """The root node of the device tree."""
        return self._root

    def resolve(self, path: Union[str, DeviceTreePath]) -> DeviceTreeNodeBase:
        """Finds the node that belongs to the given path in the tree.

        Parameters:
            path: the path to resolve. Strings will be converted to a
                DeviceTreePath_ automatically.

        Returns:
            the node at the given path in the tree

        Throws:
            NoSuchPathError: if the given path cannot be resolved in the tree
        """
        tree_path = path if isinstance(path, DeviceTreePath) else DeviceTreePath(path)

        current = self.root
        for part in tree_path.iterparts():
            try:
                current = current.children[part]
            except (KeyError, AttributeError):
                # AttributeError: the node has no children dictionary at all
                raise NoSuchPathError(tree_path) from None

        return current

    def traverse_dfs(self) -> Iterable[tuple[Optional[str], DeviceTreeNodeBase]]:
        """Returns a generator that yields all the nodes in the tree in
        depth-first order, starting from the root node.

        Yields:
            (str, DeviceTreeNode): each node in the tree and its associated
                ID in its parent, in depth-first order. The ID will be
                ``None`` for the root node.
        """
        return self.root.traverse_dfs()

    @property
    def object_registry(self):
        """The object registry that the device tree watches. The device tree
        attaches the device tree node of an object when the object is added
        to the registry, and detaches the node with the ID of the object
        when the object is removed from the registry.
        """
        return self._object_registry

    @object_registry.setter
    def object_registry(self, value):
        previous = self._object_registry
        if previous == value:
            return

        # Stop listening to the old registry first...
        if previous is not None:
            previous.added.disconnect(self._on_object_added, sender=previous)
            previous.removed.disconnect(self._on_object_removed, sender=previous)

        self._object_registry = value

        # ...then start listening to the new one
        if value is not None:
            value.added.connect(self._on_object_added, sender=value)
            value.removed.connect(self._on_object_removed, sender=value)

    def _on_channel_nodes_updated(self, nodes):
        """Callback method that a DeviceTreeMutator_ will call when a
        mutation session has ended and some channel nodes were updated.

        Parameters:
            nodes (Iterable[ChannelNode]): the set of channel nodes that
                were updated. It might not be a Python set but it is
                guaranteed to contain each affected node at most once.
        """
        # Forward the nodes as they are in a channel_nodes_updated signal
        self.channel_nodes_updated.send(self, nodes=nodes)

    def _on_object_added(self, sender, object: ModelObject):
        """Handler called when a new object is registered in the server.

        Parameters:
            sender: the object registry
            object: the object that was added
        """
        node = object.device_tree_node
        if not node:
            # Nothing to attach, so the structure of the tree stays as is
            return

        self.root.add_child(object.id, node)
        self.structure_changed.send(self)

    def _on_object_removed(self, sender, object: ModelObject):
        """Handler called when an object is deregistered from the server.

        Parameters:
            sender: the object registry
            object: the object that was removed
        """
        # The root node may not have a children dictionary yet
        known_ids = getattr(self.root, "children", ())
        if object.id in known_ids:
            self.root.remove_child_by_id(object.id)
            self.structure_changed.send(self)


class DeviceTreeMutator:
    """Context manager for modifying the values of the channel nodes in a
    device tree. The mutator remembers which nodes were modified and reports
    them to a callback when the context is exited.

    Attributes:
        tree (DeviceTree): the device tree that the mutator will modify
        callback (callable): callable that the mutator will call with the
            set of updated nodes when the mutator goes out of context. The
            callback will not be called if there were no updated nodes.
    """

    def __init__(self, tree, callback):
        self.tree = tree
        self.callback = callback
        self._updated_nodes = set()

    def __enter__(self):
        # Every session starts with a brand new set so sets handed over to
        # the callback earlier are never modified afterwards
        self._updated_nodes = set()
        return self

    def __exit__(self, *args):
        updated = self._updated_nodes
        if updated:
            self.callback(updated)

    def update(self, node, new_value):
        """Assigns a new value to a channel node and remembers the node as
        updated.

        The new value is compared with the old value using the standard
        Python equality operator. If the two values are equal, the value of
        the channel will _not_ be modified and the node is not recorded.

        Parameters:
            node (Union[str, DeviceTreePath, ChannelNode]): the path
                of the channel node to modify (either as a string or as a
                DeviceTreePath_), or the channel node itself.
            new_value (object): the new value of the channel
        """
        # Anything that is not a node already is treated as a path
        channel = (
            node if isinstance(node, DeviceTreeNodeBase) else self.tree.resolve(node)
        )

        assert isinstance(channel, ChannelNode)

        if not (channel.value == new_value):
            channel.value = new_value
            self._updated_nodes.add(channel)


class DeviceTreeSubscriptionManager:
    """Object that is responsible for managing the subscriptions of clients
    to the nodes of a device tree and notifying clients when the values of
    the channel nodes change.
    """

    _tree: DeviceTree
    _client_registry: Optional["ClientRegistry"]
    _message_hub: "MessageHub"

    _pending_subscriptions: defaultdict[Client, list[DeviceTreePath]]
    """Dictionary mapping clients to device tree paths that they want to
    subscribe to but the paths do not exist yet.
    """

    def __init__(
        self,
        tree: DeviceTree,
        *,
        client_registry: Optional["ClientRegistry"],
        message_hub: "MessageHub",
    ):
        """Constructor.

        Parameters:
            tree: the tree whose subscriptions this object will manage
            client_registry: the client registry that enables the subscription
                manager to remove subscriptions of clients that have disconnected
        """
        self._tree = tree
        self._tree.channel_nodes_updated.connect(
            self._on_channel_nodes_updated, sender=self._tree
        )
        self._tree.structure_changed.connect(
            self._on_device_tree_structure_changed, sender=self._tree
        )
        self._client_registry = None
        self._message_hub = message_hub
        self._pending_subscriptions = defaultdict(list)

        self.client_registry = client_registry

    @property
    def client_registry(self) -> Optional["ClientRegistry"]:
        """The client registry that the device tree watches. The device tree
        will remove the subscriptions of clients from the tree when a
        client is removed from this registry.
        """
        return self._client_registry

    @client_registry.setter
    def client_registry(self, value: Optional["ClientRegistry"]) -> None:
        if self._client_registry == value:
            return

        if self._client_registry is not None:
            self._client_registry.removed.disconnect(
                self._on_client_removed, sender=self._client_registry
            )

        self._client_registry = value

        if self._client_registry is not None:
            self._client_registry.removed.connect(
                self._on_client_removed, sender=self._client_registry
            )

    @property
    def message_hub(self) -> "MessageHub":
        """The message hub that the subscription manager can use to inform
        subscribers about changes in the values of channel nodes
        """
        return self._message_hub

    def _collect_subscriptions(
        self,
        client: Client,
        path: DeviceTreePath,
        node: DeviceTreeNodeBase,
        result: Counter,
    ) -> None:
        """Finds all the subscriptions of the given client in the subtree
        of the given tree node (including the node itself) and adds tem to
        the given result object.

        Parameters:
            client: the client whose subscriptions we want to collect
            path: the path that leads to the root node. It will be mutated so
                make sure that you clone the original path of the node before
                passing it here.
            node (DeviceTreeNode): the root node that the search starts
                from
            result: the counter object that counts the subscriptions
        """
        count = node.count_subscriptions_of(client)
        if count > 0:
            result[str(path)] += count
        for child_id, child in node.iterchildren():
            path._parts.append(child_id)
            self._collect_subscriptions(client, path, child, result)
            path._parts.pop()

    def _find_device_tree_node_by_path(
        self, path: Union[str, DeviceTreePath], response=None
    ) -> Optional[DeviceTreeNodeBase]:
        """Finds a node in the global device tree based on a device tree
        path or registers a failure in the given response object if there
        is no such entry in the registry.

        Parameters:
            path: the path to find
            response (Optional[FlockwaveResponse]): the response in which
                the failure can be registered

        Returns:
            the device tree node at the given path or ``None`` if there is no
            such path
        """
        try:
            return self._tree.resolve(path)
        except NoSuchPathError:
            if hasattr(response, "add_error"):
                response.add_error(path, "No such device tree path")  # type: ignore
            return None

    def _notify_subscriber(self, subscriber: Client, channel_values: Any) -> None:
        """Notifies a single subscriber about the change in channel values
        in the subtree of a node that the subscriber is subscribed to.

        Parameters:
            subscriber: the client that has to be notified
            channel_values: object containing the channel values
                of all the channels in the subtree of the node, organized
                in exactly the same way as the tree itself is organized
                under the node
        """
        body = {"values": channel_values, "type": "DEV-INF"}
        message = self._message_hub.create_notification(body)

        self._message_hub.enqueue_message(message, to=subscriber)

    def _on_client_removed(self, sender: "ClientRegistry", client: Client) -> None:
        """Handler called when a client disconnected from the server."""
        for _, node in self._tree.traverse_dfs():
            node._unsubscribe(client, force=True)
        self._pending_subscriptions.pop(client, None)

    def _on_channel_nodes_updated(self, sender: DeviceTree, nodes):
        """Handler called when some channel nodes were updated in the
        associated device tree.
        """
        # For each node that was updated during this session, we have to
        # walk up the parent chain and collect all the parents.
        visited_nodes = set()
        for node in nodes:
            visited_nodes.update(node.iterparents(include_self=True))

        # Now, we need to construct the messages to be sent, for each
        # subscriber that was affected. Different subscribers may get
        # different messages so we need to build a message for each
        # subscriber.
        messages_by_subscribers = defaultdict(dict)
        for node in visited_nodes:
            if node.has_subscribers:
                path = node.path
                channel_values = node.collect_channel_values()
                for subscriber in node.itersubscribers():
                    messages_by_subscribers[subscriber][path] = channel_values

        # Now we can send the messages
        for subscriber, message in messages_by_subscribers.items():
            self._notify_subscriber(subscriber, message)

    def _on_device_tree_structure_changed(self, sender: DeviceTree):
        found: list[DeviceTreePath] = []

        for client, paths in self._pending_subscriptions.items():
            found.clear()
            for path in paths:
                try:
                    node = sender.resolve(path)
                except NoSuchPathError:
                    pass
                else:
                    found.append(path)
                    node._subscribe(client)

            if found:
                for path in found:
                    paths.remove(path)

    def create_DEV_INF_message_for(self, paths: Iterable[str], in_response_to=None):
        """Creates a DEV-INF message that contains information regarding
        the current values of the channels in the subtrees of the device
        tree matched by the given device tree paths.

        Parameters:
            paths: list of device tree paths
            in_response_to (Optional[FlockwaveMessage]): the message that the
                constructed message will respond to. ``None`` means that the
                constructed message will be a notification.

        Returns:
            FlockwaveMessage: the DEV-INF message with the current values of
                the channels in the subtrees matched by the given device
                tree paths
        """
        values = {}

        body = {"values": values, "type": "DEV-INF"}
        response = self._message_hub.create_response_or_notification(
            body=body, in_response_to=in_response_to
        )

        for path in paths:
            node = self._find_device_tree_node_by_path(path, response)
            if node:
                values[path] = node.collect_channel_values()

        return response

    def list_subscriptions(self, client, path_filter):
        """Lists all the device tree paths that a client is subscribed
        to.

        Parameters:
            client (Client): the client whose subscriptions we want to
                retrieve
            path_filter (Optional[iterable]): iterable that yields strings
                or DeviceTreePath_ objects. The result will include only
                those subscriptions that are contained in at least one of
                the subtrees matched by the path filters.

        Returns:
            Counter: a counter object mapping device tree paths to the
                number of times the client has subscribed to them,
                multiplied by the number of times they were matched by
                the path filter.
        """
        if path_filter is None:
            path_filter = ("/",)

        result = Counter()
        for path in path_filter:
            node = self._tree.resolve(path)
            path_clone = DeviceTreePath(path)
            self._collect_subscriptions(client, path_clone, node, result)

        return result

    def subscribe(
        self, client: Client, path: Union[str, DeviceTreePath], lazy: bool = False
    ) -> None:
        """Subscribes the given client to the given device tree path.

        The same client may be subscribed to the same node multiple times;
        the same amount of unsubscription requests must follow to ensure
        that the client stops receiving notifications.

        Parameters:
            path: the path to resolve. Strings will be converted to a
                DeviceTreePath_ automatically.
            client: the client to subscribe
            lazy: whether the client is allowed to subscribe to paths that do
                not exist yet.

        Throws:
            NoSuchPathError: if the given path cannot be resolved in the tree
        """
        try:
            self._tree.resolve(path)._subscribe(client)
        except NoSuchPathError:
            if lazy:
                self._pending_subscriptions[client].append(DeviceTreePath(path))
            else:
                raise

    def unsubscribe(
        self, client: Client, path: Union[str, DeviceTreePath], force: bool = False
    ) -> None:
        """Unsubscribes the given client from the given device tree path.

        The same client may be subscribed to the same node multiple times;
        the same amount of unsubscription requests must follow to ensure
        that the client stops receiving notifications. Alternatively, you
        may set the ``force`` argument to ``True`` to force the removal
        of the client from the node no matter how many times it has
        subscribed before.

        Parameters:
            path: the path to resolve. Strings will be converted to a
                DeviceTreePath_ automatically.
            client: the client to unsubscribe
            force: whether to force an unsubscription of the client
                even if it is subscribed multiple times. Setting this
                argument to ``True`` will suppress ClientNotSubscribedError_
                exceptions if the client is not subscribed.

        Throws:
            NoSuchPathError: if the given path cannot be resolved in the tree
            ClientNotSubscribedError: if the given client is not subscribed
                to the node and ``force`` is ``False``
        """
        try:
            self._tree.resolve(path)._unsubscribe(client, force)
        except NoSuchPathError:
            try:
                self._pending_subscriptions[client].remove(DeviceTreePath(path))
            except ValueError:
                raise ClientNotSubscribedError(client, path) from None
        except KeyError:
            raise ClientNotSubscribedError(client, path) from None