Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / psutil   python

Repository URL to install this package:

/ tests / test_process.py

#!/usr/bin/env python3

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for psutil.Process class."""

import collections
import errno
import getpass
import itertools
import os
import signal
import socket
import stat
import subprocess
import sys
import textwrap
import time
import types
import unittest

import psutil
from psutil import AIX
from psutil import BSD
from psutil import LINUX
from psutil import MACOS
from psutil import NETBSD
from psutil import OPENBSD
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._common import open_text
from psutil._compat import PY3
from psutil._compat import FileNotFoundError
from psutil._compat import long
from psutil._compat import super
from psutil.tests import APPVEYOR
from psutil.tests import CI_TESTING
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import GLOBAL_TIMEOUT
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_IONICE
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import HAS_PROC_CPU_NUM
from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_THREADS
from psutil.tests import MACOS_11PLUS
from psutil.tests import PYPY
from psutil.tests import PYTHON_EXE
from psutil.tests import PsutilTestCase
from psutil.tests import ThreadTask
from psutil.tests import call_until
from psutil.tests import copyload_shared_lib
from psutil.tests import create_exe
from psutil.tests import mock
from psutil.tests import process_namespace
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import skip_on_access_denied
from psutil.tests import skip_on_not_implemented
from psutil.tests import wait_for_pid


# ===================================================================
# --- psutil.Process class tests
# ===================================================================


class TestProcess(PsutilTestCase):
    """Tests for psutil.Process class."""

    def spawn_psproc(self, *args, **kwargs):
        sproc = self.spawn_testproc(*args, **kwargs)
        return psutil.Process(sproc.pid)

    # ---

    def test_pid(self):
        p = psutil.Process()
        self.assertEqual(p.pid, os.getpid())
        with self.assertRaises(AttributeError):
            p.pid = 33

    def test_kill(self):
        p = self.spawn_psproc()
        p.kill()
        code = p.wait()
        if WINDOWS:
            self.assertEqual(code, signal.SIGTERM)
        else:
            self.assertEqual(code, -signal.SIGKILL)
        self.assertProcessGone(p)

    def test_terminate(self):
        p = self.spawn_psproc()
        p.terminate()
        code = p.wait()
        if WINDOWS:
            self.assertEqual(code, signal.SIGTERM)
        else:
            self.assertEqual(code, -signal.SIGTERM)
        self.assertProcessGone(p)

    def test_send_signal(self):
        sig = signal.SIGKILL if POSIX else signal.SIGTERM
        p = self.spawn_psproc()
        p.send_signal(sig)
        code = p.wait()
        if WINDOWS:
            self.assertEqual(code, sig)
        else:
            self.assertEqual(code, -sig)
        self.assertProcessGone(p)

    @unittest.skipIf(not POSIX, "not POSIX")
    def test_send_signal_mocked(self):
        sig = signal.SIGTERM
        p = self.spawn_psproc()
        with mock.patch('psutil.os.kill',
                        side_effect=OSError(errno.ESRCH, "")):
            self.assertRaises(psutil.NoSuchProcess, p.send_signal, sig)

        p = self.spawn_psproc()
        with mock.patch('psutil.os.kill',
                        side_effect=OSError(errno.EPERM, "")):
            self.assertRaises(psutil.AccessDenied, p.send_signal, sig)

    def test_wait_exited(self):
        # Test waitpid() + WIFEXITED -> WEXITSTATUS.
        # normal return, same as exit(0)
        cmd = [PYTHON_EXE, "-c", "pass"]
        p = self.spawn_psproc(cmd)
        code = p.wait()
        self.assertEqual(code, 0)
        self.assertProcessGone(p)
        # exit(1), implicit in case of error
        cmd = [PYTHON_EXE, "-c", "1 / 0"]
        p = self.spawn_psproc(cmd, stderr=subprocess.PIPE)
        code = p.wait()
        self.assertEqual(code, 1)
        self.assertProcessGone(p)
        # via sys.exit()
        cmd = [PYTHON_EXE, "-c", "import sys; sys.exit(5);"]
        p = self.spawn_psproc(cmd)
        code = p.wait()
        self.assertEqual(code, 5)
        self.assertProcessGone(p)
        # via os._exit()
        cmd = [PYTHON_EXE, "-c", "import os; os._exit(5);"]
        p = self.spawn_psproc(cmd)
        code = p.wait()
        self.assertEqual(code, 5)
        self.assertProcessGone(p)

    def test_wait_stopped(self):
        p = self.spawn_psproc()
        if POSIX:
            # Test waitpid() + WIFSTOPPED and WIFCONTINUED.
            # Note: if a process is stopped it ignores SIGTERM.
            p.send_signal(signal.SIGSTOP)
            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
            p.send_signal(signal.SIGCONT)
            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
            p.send_signal(signal.SIGTERM)
            self.assertEqual(p.wait(), -signal.SIGTERM)
            self.assertEqual(p.wait(), -signal.SIGTERM)
        else:
            p.suspend()
            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
            p.resume()
            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
            p.terminate()
            self.assertEqual(p.wait(), signal.SIGTERM)
            self.assertEqual(p.wait(), signal.SIGTERM)

    def test_wait_non_children(self):
        # Test wait() against a process which is not our direct
        # child.
        child, grandchild = self.spawn_children_pair()
        self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01)
        self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
        # We also terminate the direct child otherwise the
        # grandchild will hang until the parent is gone.
        child.terminate()
        grandchild.terminate()
        child_ret = child.wait()
        grandchild_ret = grandchild.wait()
        if POSIX:
            self.assertEqual(child_ret, -signal.SIGTERM)
            # For processes which are not our children we're supposed
            # to get None.
            self.assertEqual(grandchild_ret, None)
        else:
            self.assertEqual(child_ret, signal.SIGTERM)
            self.assertEqual(child_ret, signal.SIGTERM)

    def test_wait_timeout(self):
        p = self.spawn_psproc()
        p.name()
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
        self.assertRaises(ValueError, p.wait, -1)

    def test_wait_timeout_nonblocking(self):
        p = self.spawn_psproc()
        self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
        p.kill()
        stop_at = time.time() + GLOBAL_TIMEOUT
        while time.time() < stop_at:
            try:
                code = p.wait(0)
                break
            except psutil.TimeoutExpired:
                pass
        else:
            raise self.fail('timeout')
        if POSIX:
            self.assertEqual(code, -signal.SIGKILL)
        else:
            self.assertEqual(code, signal.SIGTERM)
        self.assertProcessGone(p)

    def test_cpu_percent(self):
        p = psutil.Process()
        p.cpu_percent(interval=0.001)
        p.cpu_percent(interval=0.001)
        for x in range(100):
            percent = p.cpu_percent(interval=None)
            self.assertIsInstance(percent, float)
            self.assertGreaterEqual(percent, 0.0)
        with self.assertRaises(ValueError):
            p.cpu_percent(interval=-1)

    def test_cpu_percent_numcpus_none(self):
        # See: https://github.com/giampaolo/psutil/issues/1087
        with mock.patch('psutil.cpu_count', return_value=None) as m:
            psutil.Process().cpu_percent()
            assert m.called

    def test_cpu_times(self):
        times = psutil.Process().cpu_times()
        assert (times.user > 0.0) or (times.system > 0.0), times
        assert (times.children_user >= 0.0), times
        assert (times.children_system >= 0.0), times
        if LINUX:
            assert times.iowait >= 0.0, times
        # make sure returned values can be pretty printed with strftime
        for name in times._fields:
            time.strftime("%H:%M:%S", time.localtime(getattr(times, name)))

    def test_cpu_times_2(self):
        user_time, kernel_time = psutil.Process().cpu_times()[:2]
        utime, ktime = os.times()[:2]

        # Use os.times()[:2] as base values to compare our results
        # using a tolerance  of +/- 0.1 seconds.
        # It will fail if the difference between the values is > 0.1s.
        if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
            raise self.fail("expected: %s, found: %s" % (utime, user_time))

        if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
            raise self.fail("expected: %s, found: %s" % (ktime, kernel_time))

    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
    def test_cpu_num(self):
        p = psutil.Process()
        num = p.cpu_num()
        self.assertGreaterEqual(num, 0)
        if psutil.cpu_count() == 1:
            self.assertEqual(num, 0)
        self.assertIn(p.cpu_num(), range(psutil.cpu_count()))

    def test_create_time(self):
        p = self.spawn_psproc()
        now = time.time()
        create_time = p.create_time()

        # Use time.time() as base value to compare our result using a
        # tolerance of +/- 1 second.
        # It will fail if the difference between the values is > 2s.
        difference = abs(create_time - now)
        if difference > 2:
            raise self.fail("expected: %s, found: %s, difference: %s"
                            % (now, create_time, difference))

        # make sure returned value can be pretty printed with strftime
        time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))

    @unittest.skipIf(not POSIX, 'POSIX only')
    def test_terminal(self):
        terminal = psutil.Process().terminal()
        if terminal is not None:
            tty = os.path.realpath(sh('tty'))
            self.assertEqual(terminal, tty)

    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
    @skip_on_not_implemented(only_if=LINUX)
    def test_io_counters(self):
        p = psutil.Process()
        # test reads
        io1 = p.io_counters()
        with open(PYTHON_EXE, 'rb') as f:
            f.read()
        io2 = p.io_counters()
        if not BSD and not AIX:
            self.assertGreater(io2.read_count, io1.read_count)
            self.assertEqual(io2.write_count, io1.write_count)
            if LINUX:
                self.assertGreater(io2.read_chars, io1.read_chars)
                self.assertEqual(io2.write_chars, io1.write_chars)
        else:
            self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
            self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)

        # test writes
        io1 = p.io_counters()
        with open(self.get_testfn(), 'wb') as f:
            if PY3:
                f.write(bytes("x" * 1000000, 'ascii'))
            else:
                f.write("x" * 1000000)
        io2 = p.io_counters()
        self.assertGreaterEqual(io2.write_count, io1.write_count)
        self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
        self.assertGreaterEqual(io2.read_count, io1.read_count)
        self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
        if LINUX:
            self.assertGreater(io2.write_chars, io1.write_chars)
            self.assertGreaterEqual(io2.read_chars, io1.read_chars)

        # sanity check
        for i in range(len(io2)):
            if BSD and i >= 2:
                # On BSD read_bytes and write_bytes are always set to -1.
                continue
            self.assertGreaterEqual(io2[i], 0)
            self.assertGreaterEqual(io2[i], 0)

    @unittest.skipIf(not HAS_IONICE, "not supported")
Loading ...