Repository URL to install this package:
|
Version:
1:7.26.0-1 ▾
|
datadog-agent
/
opt
/
datadog-agent
/
embedded
/
lib
/
python3.8
/
site-packages
/
datadog_checks
/
process
/
cache.py
|
|---|
# (C) Datadog, Inc. 2020-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import time
import psutil
from .lock import ReadWriteLock
DEFAULT_SHARED_PROCESS_LIST_CACHE_DURATION = 120
class ProcessListCache(object):
"""Process list to be shared among all instances."""
elements = []
lock = ReadWriteLock()
last_ts = 0
cache_duration = DEFAULT_SHARED_PROCESS_LIST_CACHE_DURATION
def read_lock(self):
return self.lock.read_lock()
def write_lock(self):
return self.lock.write_lock()
def _should_refresh(self):
now = time.time()
return now - self.last_ts > self.cache_duration
def refresh(self):
"""Checks if cache should be refreshed, and refreshes it if needed.
Returns True if the cache was refreshed, False otherwise."""
# Acquire the write lock to check whether to refresh because we're
# going to keep it to do the refresh, AND, we don't want multiple
# threads getting a `yes` result at once
with self.write_lock():
if self._should_refresh():
self.elements = [proc for proc in psutil.process_iter(attrs=['pid', 'name'])]
self.last_ts = time.time()
return True
else:
return False
def reset(self):
"""Resets the cache."""
self.last_ts = 0