# Repository URL to install this package:
# Version: 2.0.0rc1
from typing import List, Optional, Set, Tuple

from ray.autoscaler._private import constants
class NodeTracker:
    """Track nodes by id so that failed ones can be reported.

    Each tracked node id maps to an ``(ip, node_type)`` tuple. We need to be
    a little careful here: at any given point in time, node_id and ip can be
    used interchangeably, but the node_id -> ip relation is not bijective
    *across time*, since IP addresses can be reused. Therefore, node_id is
    treated as the only unique identifier.

    The number of tracked nodes is bounded. When the limit is reached, the
    oldest tracked node is evicted first (FIFO: tracking an already tracked
    node does not refresh its position).
    """

    def __init__(self, max_nodes_tracked: Optional[int] = None):
        """
        Args:
            max_nodes_tracked: Maximum number of nodes to remember. If None,
                ``constants.AUTOSCALER_MAX_NODES_TRACKED`` is read each time
                a new node is added.
        """
        # Mapping from node_id -> (ip, node_type).
        self.node_mapping = {}
        # Tracked node ids, oldest first. Always holds exactly the keys of
        # node_mapping. A quick, inefficient (O(n) eviction) FIFO order.
        self.lru_order = []
        self._max_nodes_tracked = max_nodes_tracked

    def _add_node_mapping(self, node_id: str, value: Tuple[str, str]):
        """Start tracking ``node_id`` -> ``value``, evicting the oldest
        tracked nodes if the tracker is full. No-op if already tracked."""
        if node_id in self.node_mapping:
            return

        assert len(self.lru_order) == len(self.node_mapping)
        limit = self._max_nodes_tracked
        if limit is None:
            limit = constants.AUTOSCALER_MAX_NODES_TRACKED

        # FIFO eviction. The evicted id gets its own name: reusing `node_id`
        # here would store the new value under the evicted node's id.
        # The emptiness check avoids pop() on an empty list if limit <= 0.
        while self.lru_order and len(self.lru_order) >= limit:
            evicted_node_id = self.lru_order.pop(0)
            del self.node_mapping[evicted_node_id]

        self.node_mapping[node_id] = value
        self.lru_order.append(node_id)

    def track(self, node_id: str, ip: str, node_type: str):
        """Begin to track a new node. Does nothing if it is already tracked.

        Args:
            node_id: The node id.
            ip: The node ip address.
            node_type: The node type.
        """
        self._add_node_mapping(node_id, (ip, node_type))

    def untrack(self, node_id: str):
        """Gracefully stop tracking a node.

        If a node is intentionally removed from the cluster, we should stop
        tracking it so we don't mistakenly mark it as failed. Untracking an
        unknown node id is a no-op.

        Args:
            node_id: The id of the node to stop tracking.
        """
        if node_id in self.node_mapping:
            self.lru_order.remove(node_id)
            del self.node_mapping[node_id]

    def get_all_failed_node_info(
        self, non_failed_ids: Set[str]
    ) -> List[Tuple[str, str]]:
        """Get information about all failed nodes.

        A failed node is any tracked node whose id is not in
        ``non_failed_ids`` (e.g. nodes that are neither pending nor alive).

        Args:
            non_failed_ids: Nodes are failed unless they are in this set.

        Returns:
            List[Tuple[str, str]]: One ``(ip, node_type)`` tuple per failed
            node, ordered from oldest to newest tracked.
        """
        # Iterate in tracking order: ordering is important for display.
        return [
            self.node_mapping[node_id]
            for node_id in self.lru_order
            if node_id not in non_failed_ids
        ]