Repository URL to install this package:
|
Version:
2.0.0rc1 ▾
|
"""This file implements a threaded stream controller to return logs back from
the ray clientserver.
"""
import sys
import logging
import queue
import threading
import time
import grpc
from typing import TYPE_CHECKING
import ray.core.generated.ray_client_pb2 as ray_client_pb2
import ray.core.generated.ray_client_pb2_grpc as ray_client_pb2_grpc
from ray.util.debug import log_once
if TYPE_CHECKING:
from ray.util.client.worker import Worker
logger = logging.getLogger(__name__)
# TODO(barakmich): Running a logger in a logger causes loopback.
# The client logger need its own root -- possibly this one.
# For the moment, let's just not propogate beyond this point.
logger.propagate = False
class LogstreamClient:
def __init__(self, client_worker: "Worker", metadata: list):
"""Initializes a thread-safe log stream over a Ray Client gRPC channel.
Args:
client_worker: The Ray Client worker that manages this client
metadata: metadata to pass to gRPC requests
"""
self.client_worker = client_worker
self._metadata = metadata
self.request_queue = queue.Queue()
self.log_thread = self._start_logthread()
self.log_thread.start()
self.last_req = None
def _start_logthread(self) -> threading.Thread:
return threading.Thread(target=self._log_main, args=(), daemon=True)
def _log_main(self) -> None:
reconnecting = False
while not self.client_worker._in_shutdown:
if reconnecting:
# Refresh queue and retry last request
self.request_queue = queue.Queue()
if self.last_req:
self.request_queue.put(self.last_req)
stub = ray_client_pb2_grpc.RayletLogStreamerStub(self.client_worker.channel)
try:
log_stream = stub.Logstream(
iter(self.request_queue.get, None), metadata=self._metadata
)
except ValueError:
# Trying to use the stub on a cancelled channel will raise
# ValueError. This should only happen when the data client
# is attempting to reset the connection -- sleep and try
# again.
time.sleep(0.5)
continue
try:
for record in log_stream:
if record.level < 0:
self.stdstream(level=record.level, msg=record.msg)
self.log(level=record.level, msg=record.msg)
return
except grpc.RpcError as e:
reconnecting = self._process_rpc_error(e)
if not reconnecting:
return
def _process_rpc_error(self, e: grpc.RpcError) -> bool:
"""
Processes RPC errors that occur while reading from data stream.
Returns True if the error can be recovered from, False otherwise.
"""
if self.client_worker._can_reconnect(e):
if log_once("lost_reconnect_logs"):
logger.warning(
"Log channel is reconnecting. Logs produced while "
"the connection was down can be found on the head "
"node of the cluster in "
"`ray_client_server_[port].out`"
)
logger.debug("Log channel dropped, retrying.")
time.sleep(0.5)
return True
logger.debug("Shutting down log channel.")
if not self.client_worker._in_shutdown:
logger.exception("Unexpected exception:")
return False
def log(self, level: int, msg: str):
"""Log the message from the log stream.
By default, calls logger.log but this can be overridden.
Args:
level: The loglevel of the received log message
msg: The content of the message
"""
logger.log(level=level, msg=msg)
def stdstream(self, level: int, msg: str):
"""Log the stdout/stderr entry from the log stream.
By default, calls print but this can be overridden.
Args:
level: The loglevel of the received log message
msg: The content of the message
"""
print_file = sys.stderr if level == -2 else sys.stdout
print(msg, file=print_file, end="")
def set_logstream_level(self, level: int):
logger.setLevel(level)
req = ray_client_pb2.LogSettingsRequest()
req.enabled = True
req.loglevel = level
self.request_queue.put(req)
self.last_req = req
def close(self) -> None:
self.request_queue.put(None)
if self.log_thread is not None:
self.log_thread.join()
def disable_logs(self) -> None:
req = ray_client_pb2.LogSettingsRequest()
req.enabled = False
self.request_queue.put(req)
self.last_req = req