Repository URL to install this package:
Version:
2.4.1 ▾
|
# mypy: disallow-untyped-defs
import functools
import logging
import os
import re
import subprocess
import time
from threading import Lock
from typing import Any, List, Optional, Sequence
logger = logging.getLogger("strobelight_function_profiler")
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(name)s, line %(lineno)d, %(asctime)s, %(levelname)s: %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
logger.propagate = False
class StrobelightCLIProfilerError(Exception):
"""
Raised when an error happens during strobelight profiling
"""
def _pid_namespace_link(pid: Optional[int] = None) -> str:
"""Returns the link to the process's namespace, example: pid:[4026531836]"""
PID_NAMESPACE_PATH = "/proc/{}/ns/pid"
pid = pid or os.getpid()
return os.readlink(PID_NAMESPACE_PATH.format(pid))
def _pid_namespace(pid: Optional[int] = None) -> int:
"""Returns the process's namespace id"""
pid = pid or os.getpid()
link = _pid_namespace_link(pid)
return int(link[link.find("[") + 1 : -1])
def _command_to_string(command: Sequence[str]) -> str:
return " ".join(command)
class StrobelightCLIFunctionProfiler:
"""
Note: this is a meta only tool.
StrobelightCLIFunctionProfiler can be used to profile a python function and
generate a strobelight link with the results. It works on meta servers but
does not requries an fbcode target.
When stop_at_error is false(default), error during profiling does not prevent
the work function from running.
Check function_profiler_example.py for an example.
"""
# This lock is used to make sure only one thread is running the profiler at any point.
_lock = Lock()
def __init__(
self,
*,
stop_at_error: bool = False,
max_profile_duration_sec: int = 60 * 10,
sample_each: float = 1e7, # sample each sample_each cycles.
run_user_name: str = "pytorch-strobelight-ondemand",
timeout_wait_for_running_sec: int = 60,
timeout_wait_for_finished_sec: int = 60,
recorded_env_variables: Optional[List[str]] = None,
sample_tags: Optional[List[str]] = None,
stack_max_len: int = 127,
async_stack_max_len: int = 127,
):
self.stop_at_error = stop_at_error
self.max_profile_duration_sec = max_profile_duration_sec
self.sample_each = sample_each
self.run_user_name = run_user_name
self.timeout_wait_for_running_sec = timeout_wait_for_running_sec
self.timeout_wait_for_finished_sec = timeout_wait_for_finished_sec
# Results of the most recent run.
# Tracks the strobelight run id of the most recent run
self.current_run_id: Optional[int] = None
self.sample_tags = sample_tags
def _run_async(self) -> None:
processId = os.getpid()
namespace = _pid_namespace(processId)
command = [
"strobeclient",
"run",
"--profiler",
"pyperf",
"--event",
"cycles",
"--async",
"--sample-interval",
f"{int(self.sample_each)}",
"--duration-ms",
f"{int(self.max_profile_duration_sec * 1000)}",
"--pid",
f"{namespace}:{processId}",
]
if self.sample_tags:
command.append("--sample-tags")
command.append(",".join(self.sample_tags))
logger.debug("running command: %s", _command_to_string(command))
result = subprocess.run(command, capture_output=True)
output = result.stderr.decode("utf-8")
logger.debug("output:\n{%s}", output)
if result.returncode != 0:
raise StrobelightCLIProfilerError(
f"failed to start strobelight profiling, error in run_async:{output}"
)
if match := re.search(r"INFO Run Id: (-?\d+)", output):
self.current_run_id = int(match.group(1))
return
raise StrobelightCLIProfilerError(
f"failed to start strobelight profiling, unexpected result {output}"
)
def _wait_for_running(self, counter: int = 0) -> None:
if counter > 20:
raise StrobelightCLIProfilerError(
"wait_for_running called more than 20 times"
)
command = ["strobeclient", "getRunStatus", "--run-id", f"{self.current_run_id}"]
logger.debug("running command: %s", _command_to_string(command))
result = subprocess.run(command, capture_output=True)
output = result.stderr.decode("utf-8")
logger.debug("output:\n{%s}", output)
if result.returncode != 0:
raise StrobelightCLIProfilerError(
f"failed to start strobelight profiling, error in wait_for_running:{output}"
)
if match := re.search("Profile run status: (.*)", output):
current_status = match.group(1)
if current_status == "RUNNING":
return
elif current_status == "PREPARING":
time.sleep(10)
self._wait_for_running(counter + 1)
return
else:
raise StrobelightCLIProfilerError(f"unexpected {current_status} phase")
raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
def _stop_run(self) -> None:
command = ["strobeclient", "stopRun", "--run-id", str(self.current_run_id)]
logger.debug("running command: %s", _command_to_string(command))
result = subprocess.run(command, capture_output=True)
output = result.stderr.decode("utf-8")
logger.debug("output:\n{%s}", output)
if result.returncode != 0:
raise StrobelightCLIProfilerError(
f"failed to stop strobelight profiling, return code is not 0 :{output}"
)
if match := re.search("INFO ::1:(.*)", output):
current_status = match.group(1)
if current_status.__contains__("Success!"):
return
else:
raise StrobelightCLIProfilerError(
f"failed to stop strobelight profiling, got {current_status} result"
)
raise StrobelightCLIProfilerError(f"unexpected output\n: {output} ")
def _get_results(self) -> None:
command = ["strobeclient", "getRunStatus", "--run-id", str(self.current_run_id)]
logger.debug("running command: %s", _command_to_string(command))
result = subprocess.run(command, capture_output=True)
output = result.stderr.decode("utf-8")
logger.debug("output:\n{%s}", output)
if result.returncode != 0:
raise StrobelightCLIProfilerError(
f"failed to extract profiling results, return code is not 0 : {output}"
)
if match := re.search("INFO ::1:(.*)", output):
current_status = match.group(1)
if current_status.__contains__("Profile run status: PROCESSING"):
time.sleep(10)
self._get_results()
return
elif not current_status.__contains__("Profile run finished with SUCCESS"):
raise StrobelightCLIProfilerError(
f"failed to extract profiling results, unexpected response {output}"
)
for item in re.findall(
r"(Total samples(.*)|GraphProfiler(.*)|Icicle view \(python stack\)(.*))",
output,
):
logger.info(item[0])
def _stop_strobelight_no_throw(
self,
collect_results: bool,
) -> None:
try:
# call stop run
self._stop_run()
logger.info("strobelight profiling stopped")
logger.debug("collection stopped")
if not collect_results:
return
self._get_results()
except Exception as error:
logger.warning("error during stop_strobelight", exc_info=True)
# Return true if strobelight started and is running. Never throw.
def _start_strobelight(self) -> bool:
strobelight_started = False
try:
self._run_async()
strobelight_started = True
logger.info("strobelight run id is: %s", self.current_run_id)
self._wait_for_running()
logger.info("strobelight profiling running")
return True
except Exception as error:
logger.warning("error during start_strobelight:", exc_info=True)
if strobelight_started:
self._stop_strobelight_no_throw(collect_results=False)
return False
def profile(self, work_function: Any, *args: Any, **kwargs: Any) -> Any:
self.current_run_id = None
if locked := StrobelightCLIFunctionProfiler._lock.acquire(False):
if not locked:
if self.stop_at_error:
raise StrobelightCLIProfilerError("concurrent runs not supported")
logger.warning("concurrent runs not supported")
return work_function(*args, **kwargs)
started = self._start_strobelight()
if not started:
if self.stop_at_error:
StrobelightCLIFunctionProfiler._lock.release()
raise StrobelightCLIProfilerError(
"failed to start strobelight profiling"
)
result = work_function(*args, **kwargs)
StrobelightCLIFunctionProfiler._lock.release()
return result
try:
logger.debug("collection started")
result = work_function(*args, **kwargs)
self._stop_strobelight_no_throw(collect_results=True)
StrobelightCLIFunctionProfiler._lock.release()
return result
except Exception as error:
logger.warning("work function throw exception", exc_info=True)
self._stop_strobelight_no_throw(collect_results=False)
StrobelightCLIFunctionProfiler._lock.release()
raise error
# A function decorator that wraps profile, if no profiler is provided one with
# default args is created. A function can be annotated as:
# @strobelight()
# @strobelight(profiler = StrobelightFunctionProfiler(stop_at_error=True,..))
# @strobelight(stop_at_error=True,...)
def strobelight(
profiler: Optional[StrobelightCLIFunctionProfiler] = None, **kwargs: Any
) -> Any:
if not profiler:
profiler = StrobelightCLIFunctionProfiler(**kwargs)
def strobelight_inner(work_function: Any) -> Any:
@functools.wraps(work_function)
def wrapper_function(*args: Any, **kwargs: Any) -> Any:
return profiler.profile(work_function, *args, **kwargs)
return wrapper_function
return strobelight_inner