#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import logging
import os
import time
from concurrent.futures import Future, ThreadPoolExecutor
from threading import Event
from typing import Dict, List, TextIO
__all__ = ["tail_logfile", "TailLog"]
log = logging.getLogger(__name__)
def tail_logfile(
header: str, file: str, dst: TextIO, finished: Event, interval_sec: float
) -> None:
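    """
    Tails ``file`` and writes every line to ``dst``, prefixing each line
    with ``header``.

    Waits (polling every ``interval_sec`` seconds) for ``file`` to be created,
    then streams its lines until ``finished`` is set AND the end of the file
    has been reached. If ``finished`` is set before the file is ever created,
    returns without writing anything.
    """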
while not os.path.exists(file):
if finished.is_set():
return
time.sleep(interval_sec)
with open(file, "r") as fp:
while True:
line = fp.readline()
if line:
dst.write(f"{header}{line}")
else: # reached EOF
if finished.is_set():
# log line producer is finished
break
else:
# log line producer is still going
# wait for a bit before looping again
time.sleep(interval_sec)
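

# A minimal standalone sketch of driving ``tail_logfile`` directly on its own
# thread (the names ``done``, ``t`` and the path ``/tmp/worker0.log`` are
# illustrative, not part of this module). ``TailLog`` below does the same
# thing for multiple files via a thread pool.
#
#     import sys
#     import threading
#
#     done = threading.Event()
#     t = threading.Thread(
#         target=tail_logfile,
#         kwargs={
#             "header": "[worker0]:",
#             "file": "/tmp/worker0.log",
#             "dst": sys.stdout,
#             "finished": done,
#             "interval_sec": 0.1,
#         },
#     )
#     t.start()
#     # ... run the producer that writes /tmp/worker0.log ...
#     done.set()  # tell the tailer the producer is finished
#     t.join()    # the tailer drains any remaining lines and returns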


class TailLog:
"""
Tails the given log files. The log files do not have to exist when the
``start()`` method is called. The tail-er will gracefully wait until the
log files are created by the producer and will tail the contents of the
log files until the ``stop()`` method is called.
.. warning:: ``TailLog`` will wait indefinitely for the log file to be created!
Each log file's line will be suffixed with a header of the form: ``[{name}{idx}]:``,
where the ``name`` is user-provided and ``idx`` is the index of the log file
in the ``log_files`` mapping.
    Usage:

    ::

        log_files = {0: "/tmp/0_stdout.log", 1: "/tmp/1_stdout.log"}

        tailer = TailLog("trainer", log_files, sys.stdout).start()
        # actually run the trainers to produce 0_stdout.log and 1_stdout.log
        run_trainers()
        tailer.stop()

        # once run_trainers() starts writing the ##_stdout.log files
        # the tailer will print to sys.stdout:
        # >>> [trainer0]:log_line1
        # >>> [trainer1]:log_line1
        # >>> [trainer0]:log_line2
        # >>> [trainer0]:log_line3
        # >>> [trainer1]:log_line2

    .. note:: Due to buffering, log lines across files may not necessarily be
        printed out in order. You should configure your application's logger to
        suffix each log line with a proper timestamp.
"""

    def __init__(
self,
name: str,
log_files: Dict[int, str],
dst: TextIO,
interval_sec: float = 0.1,
):
n = len(log_files)
self._threadpool = None
if n > 0:
self._threadpool = ThreadPoolExecutor(
max_workers=n,
thread_name_prefix=f"{self.__class__.__qualname__}_{name}",
)
self._name = name
self._dst = dst
self._log_files = log_files
self._finished_events: Dict[int, Event] = {
local_rank: Event() for local_rank in log_files.keys()
}
self._futs: List[Future] = []
self._interval_sec = interval_sec
self._stopped = False

    def start(self) -> "TailLog":
if not self._threadpool:
return self
for local_rank, file in self._log_files.items():
self._futs.append(
self._threadpool.submit(
tail_logfile,
header=f"[{self._name}{local_rank}]:",
file=file,
dst=self._dst,
finished=self._finished_events[local_rank],
interval_sec=self._interval_sec,
)
)
return self

    def stop(self) -> None:
for finished in self._finished_events.values():
finished.set()
        # futures were submitted in the iteration order of ``self._log_files``,
        # so pair each future with its local rank (the mapping's key) rather
        # than with its positional index in ``self._futs``
        for local_rank, f in zip(self._log_files, self._futs):
            try:
                f.result()
            except Exception as e:
                log.error(
                    f"error in log tailer for {self._name}{local_rank}."
                    f" {e.__class__.__qualname__}: {e}",
                )
if self._threadpool:
self._threadpool.shutdown(wait=True)
self._stopped = True

    def stopped(self) -> bool:
return self._stopped
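

if __name__ == "__main__":
    # A small, self-contained demo of ``TailLog`` (illustrative only, not part
    # of the module's API): writer threads stand in for the trainers from the
    # docstring example and create the log files only after the tailer has
    # already started, exercising the "wait for the file to appear" path.
    import sys
    import tempfile
    import threading

    def _produce(path: str, num_lines: int) -> None:
        # stand-in for a trainer process writing its stdout log
        with open(path, "w") as f:
            for i in range(num_lines):
                f.write(f"log_line{i}\n")
                f.flush()
                time.sleep(0.05)

    with tempfile.TemporaryDirectory() as tmpdir:
        log_files = {i: os.path.join(tmpdir, f"{i}_stdout.log") for i in range(2)}
        tailer = TailLog("trainer", log_files, sys.stdout).start()

        writers = [
            threading.Thread(target=_produce, args=(file, 3))
            for file in log_files.values()
        ]
        for w in writers:
            w.start()
        for w in writers:
            w.join()

        tailer.stop()  # blocks until the tail threads have drained the files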