#!/usr/bin/env python3
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Tests for psutil.Process class."""
import collections
import errno
import getpass
import itertools
import os
import signal
import socket
import stat
import subprocess
import sys
import textwrap
import time
import types
import unittest
import psutil
from psutil import AIX
from psutil import BSD
from psutil import LINUX
from psutil import MACOS
from psutil import NETBSD
from psutil import OPENBSD
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import open_text
from psutil._compat import PY3
from psutil._compat import FileNotFoundError
from psutil._compat import long
from psutil._compat import super
from psutil.tests import APPVEYOR
from psutil.tests import CI_TESTING
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import GLOBAL_TIMEOUT
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_IONICE
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import HAS_PROC_CPU_NUM
from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_THREADS
from psutil.tests import MACOS_11PLUS
from psutil.tests import PYPY
from psutil.tests import PYTHON_EXE
from psutil.tests import PsutilTestCase
from psutil.tests import ThreadTask
from psutil.tests import call_until
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import mock
from psutil.tests import process_namespace
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
from psutil.tests import skip_on_not_implemented
from psutil.tests import wait_for_pid
# ===================================================================
# --- psutil.Process class tests
# ===================================================================
class TestProcess(PsutilTestCase):
"""Tests for psutil.Process class."""
def spawn_psproc(self, *args, **kwargs):
    """Spawn a test subprocess and return it as a psutil.Process.

    All positional and keyword arguments are forwarded untouched to
    self.spawn_testproc(); the returned object is a psutil.Process
    built from the ``pid`` attribute of whatever that call returns.
    """
    return psutil.Process(self.spawn_testproc(*args, **kwargs).pid)
# ---
def test_pid(self):
    # The pid of a default-constructed Process must be our own pid,
    # and the attribute must refuse assignment.
    this_proc = psutil.Process()
    self.assertEqual(this_proc.pid, os.getpid())
    self.assertRaises(AttributeError, setattr, this_proc, 'pid', 33)
def test_kill(self):
    # kill() a freshly spawned process and check the code reported by
    # wait(): SIGTERM on Windows, negated SIGKILL everywhere else.
    proc = self.spawn_psproc()
    proc.kill()
    exit_code = proc.wait()
    # The conditional is lazy on purpose: signal.SIGKILL is only
    # looked up on the non-Windows branch.
    expected = signal.SIGTERM if WINDOWS else -signal.SIGKILL
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
def test_terminate(self):
    # terminate() a freshly spawned process and check the code
    # reported by wait(): SIGTERM on Windows, negated SIGTERM elsewhere.
    proc = self.spawn_psproc()
    proc.terminate()
    exit_code = proc.wait()
    expected = signal.SIGTERM if WINDOWS else -signal.SIGTERM
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
def test_send_signal(self):
    # Deliver a terminating signal through send_signal() and check
    # the code reported by wait(): the signal number itself on
    # Windows, its negation elsewhere.
    sig = signal.SIGKILL if POSIX else signal.SIGTERM
    proc = self.spawn_psproc()
    proc.send_signal(sig)
    exit_code = proc.wait()
    expected = sig if WINDOWS else -sig
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
@unittest.skipIf(not POSIX, "not POSIX")
def test_send_signal_mocked(self):
    # With os.kill patched to fail, send_signal() must translate the
    # errno of the OSError into the matching psutil exception.
    sig = signal.SIGTERM
    translations = (
        (errno.ESRCH, psutil.NoSuchProcess),
        (errno.EPERM, psutil.AccessDenied),
    )
    for err, expected_exc in translations:
        # A fresh process is spawned for every case.
        proc = self.spawn_psproc()
        with mock.patch('psutil.os.kill', side_effect=OSError(err, "")):
            self.assertRaises(expected_exc, proc.send_signal, sig)
def test_wait_exited(self):
    # Test waitpid() + WIFEXITED -> WEXITSTATUS.
    # Each entry is (python snippet, extra spawn kwargs, expected code).
    scenarios = (
        # normal return, same as exit(0)
        ("pass", {}, 0),
        # exit(1), implicit in case of error
        ("1 / 0", {"stderr": subprocess.PIPE}, 1),
        # via sys.exit()
        ("import sys; sys.exit(5);", {}, 5),
        # via os._exit()
        ("import os; os._exit(5);", {}, 5),
    )
    for snippet, spawn_kwargs, expected in scenarios:
        proc = self.spawn_psproc([PYTHON_EXE, "-c", snippet], **spawn_kwargs)
        exit_code = proc.wait()
        self.assertEqual(exit_code, expected)
        self.assertProcessGone(proc)
def test_wait_stopped(self):
    # A stopped/suspended process, and one that has just been
    # resumed, is still alive: wait() with a tiny timeout must raise
    # TimeoutExpired. After termination wait() is called twice and
    # must report the same code both times.
    proc = self.spawn_psproc()
    if POSIX:
        # Test waitpid() + WIFSTOPPED and WIFCONTINUED.
        # Note: if a process is stopped it ignores SIGTERM.
        proc.send_signal(signal.SIGSTOP)
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout=0.001)
        proc.send_signal(signal.SIGCONT)
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout=0.001)
        proc.send_signal(signal.SIGTERM)
        self.assertEqual(proc.wait(), -signal.SIGTERM)
        self.assertEqual(proc.wait(), -signal.SIGTERM)
    else:
        proc.suspend()
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout=0.001)
        proc.resume()
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout=0.001)
        proc.terminate()
        self.assertEqual(proc.wait(), signal.SIGTERM)
        self.assertEqual(proc.wait(), signal.SIGTERM)
def test_wait_non_children(self):
    # Test wait() against a process which is not our direct
    # child (the grandchild) next to one which is (the child).
    child, grandchild = self.spawn_children_pair()
    # Both are still running, so a short wait() must time out.
    self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01)
    self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
    # We also terminate the direct child otherwise the
    # grandchild will hang until the parent is gone.
    child.terminate()
    grandchild.terminate()
    child_ret = child.wait()
    grandchild_ret = grandchild.wait()
    if POSIX:
        self.assertEqual(child_ret, -signal.SIGTERM)
        # For processes which are not our children we're supposed
        # to get None.
        self.assertIsNone(grandchild_ret)
    else:
        self.assertEqual(child_ret, signal.SIGTERM)
        # This branch used to assert child_ret twice and never looked
        # at the grandchild. None is tolerated in case the grandchild
        # had already disappeared by the time wait() was called.
        # NOTE(review): confirm against the documented behaviour of
        # Process.wait() on Windows for an already-gone PID.
        self.assertIn(grandchild_ret, (signal.SIGTERM, None))
def test_wait_timeout(self):
    # wait() on a live process must raise TimeoutExpired for a small
    # or zero timeout, and ValueError for a negative one.
    proc = self.spawn_psproc()
    proc.name()
    for timeout in (0.01, 0):
        with self.assertRaises(psutil.TimeoutExpired):
            proc.wait(timeout)
    with self.assertRaises(ValueError):
        proc.wait(-1)
def test_wait_timeout_nonblocking(self):
    # wait(0) never blocks: it raises TimeoutExpired while the process
    # is alive and returns the exit code once it is gone.
    proc = self.spawn_psproc()
    with self.assertRaises(psutil.TimeoutExpired):
        proc.wait(0)
    proc.kill()
    deadline = time.time() + GLOBAL_TIMEOUT
    # Poll until the kill is observed, giving up after GLOBAL_TIMEOUT.
    while True:
        if time.time() >= deadline:
            self.fail('timeout')
        try:
            exit_code = proc.wait(0)
        except psutil.TimeoutExpired:
            continue
        break
    expected = -signal.SIGKILL if POSIX else signal.SIGTERM
    self.assertEqual(exit_code, expected)
    self.assertProcessGone(proc)
def test_cpu_percent(self):
    # cpu_percent() must always hand back a non-negative float and
    # reject a negative interval with ValueError.
    proc = psutil.Process()
    # Two short blocking calls first, then 100 non-blocking samples.
    for _ in range(2):
        proc.cpu_percent(interval=0.001)
    for _ in range(100):
        sample = proc.cpu_percent(interval=None)
        self.assertIsInstance(sample, float)
        self.assertGreaterEqual(sample, 0.0)
    self.assertRaises(ValueError, proc.cpu_percent, interval=-1)
def test_cpu_percent_numcpus_none(self):
    # See: https://github.com/giampaolo/psutil/issues/1087
    # cpu_percent() must not blow up when psutil.cpu_count() yields
    # None, and the patched function must actually have been called.
    patcher = mock.patch('psutil.cpu_count', return_value=None)
    with patcher as fake_cpu_count:
        proc = psutil.Process()
        proc.cpu_percent()
        assert fake_cpu_count.called
def test_cpu_times(self):
    # Sanity-check the fields of cpu_times() for the current process.
    cpu = psutil.Process().cpu_times()
    # At least one of user / system time must have been accumulated.
    assert not (cpu.user <= 0.0 and cpu.system <= 0.0), cpu
    assert cpu.children_user >= 0.0, cpu
    assert cpu.children_system >= 0.0, cpu
    if LINUX:
        assert cpu.iowait >= 0.0, cpu
    # make sure returned values can be pretty printed with strftime
    for field in cpu._fields:
        seconds = getattr(cpu, field)
        time.strftime("%H:%M:%S", time.localtime(seconds))
def test_cpu_times_2(self):
    # Compare the first two cpu_times() fields against os.times()[:2],
    # used as reference values, and fail as soon as one of them is
    # more than 0.1 seconds off.
    user_time, kernel_time = psutil.Process().cpu_times()[:2]
    utime, ktime = os.times()[:2]
    for expected, found in ((utime, user_time), (ktime, kernel_time)):
        if abs(found - expected) > 0.1:
            self.fail("expected: %s, found: %s" % (expected, found))
@unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
def test_cpu_num(self):
    # cpu_num() must be a valid index into the CPUs reported by
    # psutil.cpu_count(); with a single CPU it can only be 0.
    proc = psutil.Process()
    cpu_index = proc.cpu_num()
    self.assertGreaterEqual(cpu_index, 0)
    if psutil.cpu_count() == 1:
        self.assertEqual(cpu_index, 0)
    # Sample again and check it against the full range of indexes.
    second_index = proc.cpu_num()
    valid_indexes = range(psutil.cpu_count())
    self.assertIn(second_index, valid_indexes)
def test_create_time(self):
    # create_time() of a just-spawned process must be close to "now".
    proc = self.spawn_psproc()
    reference = time.time()
    created = proc.create_time()
    # time.time(), sampled right after the spawn, is the reference
    # value; the test fails if the two differ by more than 2 seconds.
    drift = abs(created - reference)
    if drift > 2:
        self.fail("expected: %s, found: %s, difference: %s"
                  % (reference, created, drift))
    # make sure returned value can be pretty printed with strftime
    as_local = time.localtime(proc.create_time())
    time.strftime("%Y %m %d %H:%M:%S", as_local)
@unittest.skipIf(not POSIX, 'POSIX only')
def test_terminal(self):
    # When the current process reports a terminal, it must match the
    # resolved path printed by the `tty` command.
    reported = psutil.Process().terminal()
    if reported is None:
        # No terminal reported: nothing to compare against.
        return
    expected = os.path.realpath(sh('tty'))
    self.assertEqual(reported, expected)
@unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
@skip_on_not_implemented(only_if=LINUX)
def test_io_counters(self):
    # Take io_counters() snapshots around a file read and a file
    # write performed by this very process and check that the
    # counters move in the expected direction.
    p = psutil.Process()
    # test reads
    io1 = p.io_counters()
    with open(PYTHON_EXE, 'rb') as f:
        f.read()
    io2 = p.io_counters()
    if not BSD and not AIX:
        self.assertGreater(io2.read_count, io1.read_count)
        self.assertEqual(io2.write_count, io1.write_count)
        if LINUX:
            self.assertGreater(io2.read_chars, io1.read_chars)
            self.assertEqual(io2.write_chars, io1.write_chars)
    else:
        self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
        self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
    # test writes
    io1 = p.io_counters()
    with open(self.get_testfn(), 'wb') as f:
        # A bytes literal is valid for a binary file on both Python 2
        # and 3, so no PY3 branching is needed here.
        f.write(b"x" * 1000000)
    io2 = p.io_counters()
    self.assertGreaterEqual(io2.write_count, io1.write_count)
    self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
    self.assertGreaterEqual(io2.read_count, io1.read_count)
    self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
    if LINUX:
        self.assertGreater(io2.write_chars, io1.write_chars)
        self.assertGreaterEqual(io2.read_chars, io1.read_chars)
    # sanity check: no field of the last snapshot may be negative
    # (the same assertion used to be issued twice per field).
    for i, value in enumerate(io2):
        if BSD and i >= 2:
            # On BSD read_bytes and write_bytes are always set to -1.
            continue
        self.assertGreaterEqual(value, 0)
@unittest.skipIf(not HAS_IONICE, "not supported")
Loading ...