Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / psutil   python

Repository URL to install this package:

/ tests / test_system.py

#!/usr/bin/env python3

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for system APIS."""

import contextlib
import datetime
import errno
import os
import platform
import pprint
import shutil
import signal
import socket
import sys
import time
import unittest

import psutil
from psutil import AIX
from psutil import BSD
from psutil import FREEBSD
from psutil import LINUX
from psutil import MACOS
from psutil import NETBSD
from psutil import OPENBSD
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._compat import FileNotFoundError
from psutil._compat import long
from psutil.tests import ASCII_FS
from psutil.tests import CI_TESTING
from psutil.tests import DEVNULL
from psutil.tests import GITHUB_ACTIONS
from psutil.tests import GLOBAL_TIMEOUT
from psutil.tests import HAS_BATTERY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_GETLOADAVG
from psutil.tests import HAS_NET_IO_COUNTERS
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import IS_64BIT
from psutil.tests import MACOS_12PLUS
from psutil.tests import PYPY
from psutil.tests import UNICODE_SUFFIX
from psutil.tests import PsutilTestCase
from psutil.tests import check_net_address
from psutil.tests import enum
from psutil.tests import mock
from psutil.tests import retry_on_failure


# ===================================================================
# --- System-related API tests
# ===================================================================


class TestProcessAPIs(PsutilTestCase):
    """Tests for psutil's module-level process helpers.

    Exercises process_iter(), wait_procs(), pid_exists() and pids().
    """

    def test_process_iter(self):
        """process_iter() lists live PIDs and reacts to Process() errors."""
        self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
        sproc = self.spawn_testproc()
        self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])

        # When instantiating Process raises NoSuchProcess the iterator is
        # expected to yield nothing...
        with mock.patch('psutil.Process',
                        side_effect=psutil.NoSuchProcess(os.getpid())):
            self.assertEqual(list(psutil.process_iter()), [])
        # ...whereas AccessDenied is expected to reach the caller.
        with mock.patch('psutil.Process',
                        side_effect=psutil.AccessDenied(os.getpid())):
            with self.assertRaises(psutil.AccessDenied):
                list(psutil.process_iter())

    def test_prcess_iter_w_attrs(self):
        """process_iter(attrs=...) fills p.info and honours ad_value."""
        for p in psutil.process_iter(attrs=['pid']):
            self.assertEqual(list(p.info.keys()), ['pid'])
        # Unknown attribute names are rejected.
        with self.assertRaises(ValueError):
            list(psutil.process_iter(attrs=['foo']))
        # Without ad_value an AccessDenied attribute is reported as None.
        with mock.patch("psutil._psplatform.Process.cpu_times",
                        side_effect=psutil.AccessDenied(0, "")) as m:
            for p in psutil.process_iter(attrs=["pid", "cpu_times"]):
                self.assertIsNone(p.info['cpu_times'])
                self.assertGreaterEqual(p.info['pid'], 0)
            assert m.called
        # With ad_value the very same object is used as the placeholder.
        with mock.patch("psutil._psplatform.Process.cpu_times",
                        side_effect=psutil.AccessDenied(0, "")) as m:
            flag = object()
            for p in psutil.process_iter(
                    attrs=["pid", "cpu_times"], ad_value=flag):
                self.assertIs(p.info['cpu_times'], flag)
                self.assertGreaterEqual(p.info['pid'], 0)
            assert m.called

    @unittest.skipIf(PYPY and WINDOWS,
                     "spawn_testproc() unreliable on PYPY + WINDOWS")
    def test_wait_procs(self):
        """wait_procs() moves processes from alive to gone as they exit."""
        def callback(p):
            pids.append(p.pid)

        pids = []
        sproc1 = self.spawn_testproc()
        sproc2 = self.spawn_testproc()
        sproc3 = self.spawn_testproc()
        procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
        self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
        self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)

        # Nothing was terminated yet: a short timeout must return quickly
        # with every process still alive and no callback invocation.
        t = time.time()
        gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)

        self.assertLess(time.time() - t, 0.5)
        self.assertEqual(gone, [])
        self.assertEqual(len(alive), 3)
        self.assertEqual(pids, [])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_on_failure(30)
        def wait_for_one_gone(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 1)
            self.assertEqual(len(alive), 2)
            return gone, alive

        sproc3.terminate()
        gone, alive = wait_for_one_gone(procs, callback)
        self.assertIn(sproc3.pid, [x.pid for x in gone])
        if POSIX:
            self.assertEqual(gone.pop().returncode, -signal.SIGTERM)
        else:
            self.assertEqual(gone.pop().returncode, 1)
        self.assertEqual(pids, [sproc3.pid])
        for p in alive:
            self.assertFalse(hasattr(p, 'returncode'))

        @retry_on_failure(30)
        def wait_for_all_gone(procs, callback):
            gone, alive = psutil.wait_procs(procs, timeout=0.03,
                                            callback=callback)
            self.assertEqual(len(gone), 3)
            self.assertEqual(len(alive), 0)
            return gone, alive

        sproc1.terminate()
        sproc2.terminate()
        gone, alive = wait_for_all_gone(procs, callback)
        self.assertEqual(set(pids), {sproc1.pid, sproc2.pid, sproc3.pid})
        for p in gone:
            self.assertTrue(hasattr(p, 'returncode'))

    @unittest.skipIf(PYPY and WINDOWS,
                     "spawn_testproc() unreliable on PYPY + WINDOWS")
    def test_wait_procs_no_timeout(self):
        """wait_procs() without a timeout returns once all procs are gone."""
        sproc1 = self.spawn_testproc()
        sproc2 = self.spawn_testproc()
        sproc3 = self.spawn_testproc()
        procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
        for p in procs:
            p.terminate()
        gone, alive = psutil.wait_procs(procs)
        # With no timeout the call only returns after every process has
        # terminated, so nothing may be left in the "alive" list.
        self.assertEqual(alive, [])
        self.assertEqual(len(gone), 3)

    def test_pid_exists(self):
        """pid_exists() follows a process lifetime and rejects bad PIDs."""
        sproc = self.spawn_testproc()
        self.assertTrue(psutil.pid_exists(sproc.pid))
        p = psutil.Process(sproc.pid)
        p.kill()
        p.wait()
        self.assertFalse(psutil.pid_exists(sproc.pid))
        self.assertFalse(psutil.pid_exists(-1))
        # PID 0 exists only on some platforms; pids() is the reference.
        self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())

    def test_pid_exists_2(self):
        """pid_exists() agrees with pids() and is False far above max PID."""
        pids = psutil.pids()
        for pid in pids:
            if not psutil.pid_exists(pid):
                # in case the process disappeared in meantime fail only
                # if it is no longer in psutil.pids()
                time.sleep(.1)
                self.assertNotIn(pid, psutil.pids())
        highest = max(pids)
        for pid in range(highest + 5000, highest + 6000):
            self.assertFalse(psutil.pid_exists(pid), msg=pid)


class TestMiscAPIs(PsutilTestCase):
    """Tests for miscellaneous system-wide psutil APIs.

    Covers boot_time(), users(), the psutil.test() helper and the
    platform boolean constants.
    """

    def test_boot_time(self):
        """boot_time() is a positive float lying in the past."""
        bt = psutil.boot_time()
        self.assertIsInstance(bt, float)
        self.assertGreater(bt, 0)
        self.assertLess(bt, time.time())

    @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
    def test_users(self):
        """users() returns well-formed entries for the logged-in users."""
        users = psutil.users()
        self.assertNotEqual(users, [])
        for user in users:
            assert user.name, user
            self.assertIsInstance(user.name, str)
            # terminal and host are either a string or None.
            self.assertIsInstance(user.terminal, (str, type(None)))
            self.assertIsInstance(user.host, (str, type(None)))
            assert user.started > 0.0, user
            # Must be convertible to a datetime without raising.
            datetime.datetime.fromtimestamp(user.started)
            if WINDOWS or OPENBSD:
                self.assertIsNone(user.pid)
            else:
                # Must refer to a process that can be instantiated.
                psutil.Process(user.pid)

    def test_test(self):
        """Smoke-test psutil.test() with its output discarded."""
        # test for psutil.test() function
        stdout = sys.stdout
        sys.stdout = DEVNULL
        try:
            psutil.test()
        finally:
            # Always restore the real stdout, even if psutil.test() raises.
            sys.stdout = stdout

    def test_os_constants(self):
        """Exactly the constants matching the running platform are True."""
        names = ["POSIX", "WINDOWS", "LINUX", "MACOS", "FREEBSD", "OPENBSD",
                 "NETBSD", "BSD", "SUNOS"]
        for name in names:
            self.assertIsInstance(getattr(psutil, name), bool, msg=name)

        # Names removed from the list below are the ones allowed to be
        # True on this platform; whatever is left must be False.
        if os.name == 'posix':
            assert psutil.POSIX
            assert not psutil.WINDOWS
            names.remove("POSIX")
            plat = sys.platform.lower()
            if "linux" in plat:
                assert psutil.LINUX
                names.remove("LINUX")
            elif "bsd" in plat:
                assert psutil.BSD
                # One and only one BSD flavour may be set.
                self.assertEqual([psutil.FREEBSD, psutil.OPENBSD,
                                  psutil.NETBSD].count(True), 1)
                for bsd_name in ("BSD", "FREEBSD", "OPENBSD", "NETBSD"):
                    names.remove(bsd_name)
            elif "sunos" in plat or "solaris" in plat:
                assert psutil.SUNOS
                names.remove("SUNOS")
            elif "darwin" in plat:
                assert psutil.MACOS
                names.remove("MACOS")
        else:
            assert psutil.WINDOWS
            assert not psutil.POSIX
            names.remove("WINDOWS")

        # assert all other constants are set to False
        for name in names:
            self.assertIs(getattr(psutil, name), False, msg=name)


class TestMemoryAPIs(PsutilTestCase):
    """Sanity checks for virtual_memory() and swap_memory()."""

    def test_virtual_memory(self):
        """virtual_memory() fields are typed and bounded by the total."""
        mem = psutil.virtual_memory()
        assert mem.total > 0, mem
        assert mem.available > 0, mem
        assert 0 <= mem.percent <= 100, mem
        assert mem.used > 0, mem
        assert mem.free >= 0, mem
        for name in mem._fields:
            value = getattr(mem, name)
            # 'percent' is the only field not required to be an integer.
            if name != 'percent':
                self.assertIsInstance(value, (int, long))
            if name != 'total':
                # self.fail() raises by itself; no explicit "raise" needed.
                if not value >= 0:
                    self.fail("%r < 0 (%s)" % (name, value))
                if value > mem.total:
                    self.fail("%r > total (total=%s, %s=%s)"
                              % (name, mem.total, name, value))

    def test_swap_memory(self):
        """swap_memory() exposes the expected fields with sane values."""
        mem = psutil.swap_memory()
        self.assertEqual(
            mem._fields, ('total', 'used', 'free', 'percent', 'sin', 'sout'))

        assert mem.total >= 0, mem
        assert mem.used >= 0, mem
        if mem.total > 0:
            assert mem.free > 0, mem
        else:
            # likely a system with no swap partition
            assert mem.free == 0, mem
        assert 0 <= mem.percent <= 100, mem
        assert mem.sin >= 0, mem
        assert mem.sout >= 0, mem


class TestCpuAPIs(PsutilTestCase):

    def test_cpu_count_logical(self):
        """Logical CPU count is >= 1 and matches the per-CPU times list."""
        count = psutil.cpu_count()
        self.assertIsNotNone(count)
        per_cpu_times = psutil.cpu_times(percpu=True)
        self.assertEqual(count, len(per_cpu_times))
        self.assertGreaterEqual(count, 1)

        # NOTE(review): this skip fires after every assertion above has
        # already passed and concerns "physical id", which the logical
        # count does not use - looks like a leftover; confirm before
        # removing.
        if not os.path.exists("/proc/cpuinfo"):
            return
        with open("/proc/cpuinfo") as fd:
            contents = fd.read()
        if "physical id" not in contents:
            raise unittest.SkipTest("cpuinfo doesn't include physical id")

    def test_cpu_count_cores(self):
        """Physical core count is >= 1 and never exceeds the logical count.

        On Windows versions up to NT 6.1 the core count is expected to be
        None. Elsewhere the test is skipped when the platform cannot
        determine it.
        """
        logical = psutil.cpu_count()
        cores = psutil.cpu_count(logical=False)
        # (6, 1) is Windows 7; the comparison also matches Vista (6, 0)
        # and anything older.
        if WINDOWS and sys.getwindowsversion()[:2] <= (6, 1):
            # Checked before the generic skip below, otherwise a None
            # value would be skipped and this assertion could only fail.
            self.assertIsNone(cores)
            return
        if cores is None:
            # skipTest() raises unittest.SkipTest by itself.
            self.skipTest("cpu_count_cores() is None")
        self.assertGreaterEqual(cores, 1)
        self.assertGreaterEqual(logical, cores)

    def test_cpu_count_none(self):
        """cpu_count() maps invalid platform values (-1, 0, None) to None."""
        # https://github.com/giampaolo/psutil/issues/1085
        # Each entry pairs the platform function to patch with the keyword
        # arguments that make cpu_count() go through it.
        targets = (
            ('psutil._psplatform.cpu_count_logical', {}),
            ('psutil._psplatform.cpu_count_cores', {'logical': False}),
        )
        for val in (-1, 0, None):
            for target, kwargs in targets:
                with mock.patch(target, return_value=val) as m:
                    self.assertIsNone(psutil.cpu_count(**kwargs))
                    assert m.called

    def test_cpu_times(self):
        # Check type, value >= 0, str().
        total = 0
Loading ...