Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
PSI / tests / process_test.py
Size: Mime:
# The MIT License
#
# Copyright (C) 2007 Chris Miles
#
# Copyright (C) 2008-2009 Floris Bruynooghe
#
# Copyright (C) 2008-2009 Abilisoft Ltd.
#
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Tests for the psi.process.Process class"""


import grp
import math
import os
import pwd
import signal
import sys
import time
import unittest
import warnings

from apphelper import *

try:
    import subprocess
except ImportError:
    HAVE_SUBPROCESS = False
else:
    HAVE_SUBPROCESS = True

import psi
import psi.arch
import psi.process


# Ignore FutureWarnings, we test those bits
if sys.hexversion >= 0x02030000: # >= 2.3
    warnings.simplefilter('ignore', FutureWarning)


# Patch old versions of unittest
if not hasattr(unittest.TestCase, 'assertTrue'):
    unittest.TestCase.assertTrue = unittest.TestCase.failUnless
if not hasattr(unittest.TestCase, 'assertFalse'):
    unittest.TestCase.assertFalse = unittest.TestCase.failIf
if not hasattr(unittest.TestCase, 'assertAlmostEqual'):
    def assertAlmostEqual(self, first, second, places=7, msg=None):
        """Stolen from Python 2.5"""
        if round(second-first, places) != 0:
            raise self.failureException(
                msg or '%r != %r within %r places'
                % (first, second, places))
    unittest.TestCase.assertAlmostEqual = assertAlmostEqual


def do_work():
    """Make sure total CPU time of our process increases"""
    t = 0
    e = time.clock() + 0.05
    if isinstance(psi.arch.arch_type(), psi.arch.ArchAIX):
        e += 0.8
    while t < e:
        for i in range(100):
            math.sin(i)
        t = time.clock()


class ExceptionTest(unittest.TestCase):
    def test_class(self):
        self.assert_(issubclass(psi.process.NoSuchProcessError,
                                psi.MissingResourceError))

    def test_instance(self):
        np = psi.process.NoSuchProcessError('foo')
        self.assert_(isinstance(np, psi.MissingResourceError))


class ProcessInitTest(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)

    def test_type(self):
        self.assert_(isinstance(self.p, psi.process.Process))

    def test_bad_kw_arg(self):
        self.assertRaises(TypeError, psi.process.Process, foo=1)

    def test_bad_pos_arg(self):
        self.assertRaises(TypeError, psi.process.Process, 'foo')

    def test_no_such_pid(self):
        # may not work in rare circumstances ... see how we go
        self.assertRaises(psi.process.NoSuchProcessError,
                          psi.process.Process, pid=-1)

    def test_pid(self):
        self.assertEqual(self.p.pid, self.pid)


class ProcessSpecialMethods(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)

    def test_repr(self):
        self.assertEqual('psi.process.Process(pid=%d)'%self.pid, repr(self.p))

    def test_hash_works(self):
        self.assertTrue(hash(self.p))

    def test_hash_compare(self):
        p = psi.process.Process(self.pid)
        self.assertEqual(hash(self.p), hash(p))


class RichCompareTest(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)
        self.init = psi.process.Process(1)

    def test_eq(self):
        self.assertEqual(self.p, self.p)
        self.assertEqual(self.p, psi.process.Process(self.pid))

    def test_ne(self):
        self.assertNotEqual(self.p, self.pid)
        self.assertNotEqual(self.p, self.init)

    def test_lt(self):
        self.assertTrue(self.init < self.p)
        self.assertFalse(self.p < self.p)
        self.assertFalse(self.p < self.init)

    def test_le(self):
        self.assertTrue(self.init <= self.p)
        self.assertTrue(self.p <= self.p)
        self.assertFalse(self.p <= self.init)

    def test_gt(self):
        self.assertFalse(self.init > self.p)
        self.assertFalse(self.p > self.p)
        self.assertTrue(self.p > self.init)

    def test_ge(self):
        self.assertFalse(self.init >= self.p)
        self.assertTrue(self.p >= self.p)
        self.assertTrue(self.p >= self.init)


class ProcessExeArgsEnvTest(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)
        self.arch = psi.arch.arch_type()
        self.args_short = ['abcdefghijklmnopqrtstuvw']
        self.args_long = 50 * self.args_short
        self.env = {'foo': 'fooenv',
                    'bar': 'barenv',
                    'baz': 'bazenv'}
        self.app = None

    def tearDown(self):
        if self.app is not None:
            self.app.kill()

    def test_name(self):
        # Lets assume that 'python' will at least appear inside our name
        if isinstance(self.arch, psi.arch.ArchDarwin):
            python_name = 'Python'  # OS X
        else:
            python_name = 'python'
        self.assert_(self.p.name.find(python_name) >= 0)

    def test_exe(self):
        if (isinstance(self.arch, psi.arch.ArchLinux) or
            (isinstance(self.arch, psi.arch.ArchSunOS) and
             self.arch.release_info >= (5, 10))):
            self.assertTrue(os.path.isabs(self.p.exe), self.p.exe)
        elif isinstance(self.arch, psi.arch.ArchDarwin):
            self.assertTrue(self.p.exe.endswith('Python'))
        else:
            self.assertTrue(self.p.exe.find('python') >= 0)

    def test_args_simple(self):
        self.assert_('python' ' '.join(self.p.args).lower())

    def test_argc_simple(self):
        self.assertTrue(self.p.argc > len(sys.argv))

    def test_command(self):
        calc_comm = ' '.join(self.p.args)
        psi_comm = self.p.command
        self.assertEqual(psi_comm, calc_comm[:len(psi_comm)])

    def test_env_simple(self):
        self.assertEqual(self.p.env['HOME'], os.path.expanduser('~'))


    if APP32:
        def test_env_32bit(self):
            self.app = TestApp([APP32], env=self.env)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.env, self.env)

        def test_args_32bit_short(self):
            self.app = TestApp([APP32] + self.args_short)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.args, tuple([APP32] + self.args_short))

        def test_args_32bit_long(self):
            self.app = TestApp([APP32] + self.args_long)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.args, tuple([APP32] + self.args_long))

        def test_args_32bit_longarg(self):
            args = [APP32, 'arg_longer_then_fifty_characters'*2]
            self.app = TestApp(args)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.args[1], args[1])

        def test_argc_32bit(self):
            self.app = TestApp([APP32, 'foo', 'bar'])
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.argc, 3)

    if APP64:
        def test_env_64bit(self):
            self.app = TestApp([APP64], env=self.env)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.env, self.env)

        def test_args_64bit_short(self):
            self.app = TestApp([APP64] + self.args_short)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.args, tuple([APP64] + self.args_short))

        def test_args_64bit_long(self):
            self.app = TestApp([APP64] + self.args_long)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.args, tuple([APP64] + self.args_long))

        def test_args_64bit_longarg(self):
            args = [APP64, 'arg_longer_then_fifty_characters'*2]
            self.app = TestApp(args)
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.args[1], args[1])

        def test_argc_64bit(self):
            self.app = TestApp([APP64] + ['foo', 'bar'])
            p = psi.process.Process(self.app.pid)
            self.assertEqual(p.argc, 3)


class ProcessIdsTest(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)

    def test_euid(self):
        self.assertEqual(self.p.euid, os.geteuid())

    def test_egid(self):
        self.assertEqual(self.p.egid, os.getegid())

    def test_ruid(self):
        self.assertEqual(self.p.ruid, os.getuid())

    def test_rgid(self):
        self.assertEqual(self.p.rgid, os.getgid())


class ProcessAttrsTest(unittest.TestCase):
    def setUp(self):
        self.arch = psi.arch.arch_type()
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)

    def test_cwd(self):
        if isinstance(self.arch, psi.arch.ArchDarwin):
            self.assertRaises(psi.AttrNotAvailableError, getattr, self.p, 'cwd')
        else:
            self.assertEqual(os.path.normpath(self.p.cwd), os.getcwd())

    def test_ppid(self):
        self.assert_(self.p.ppid > 1)
        self.assertEqual(type(self.p.ppid), type(self.p.pid))

    def test_pgrp(self):
        pgrp = pscmd('pgrp')
        self.assertEqual(self.p.pgrp, int(pgrp))

    def test_sid(self):
        sid = pscmd('sid')
        self.assertEqual(self.p.sid, int(sid))

    def test_nthreads(self):
        if isinstance(self.arch, psi.arch.ArchLinux) \
                and self.arch.release[:3] == '2.4':
            return
        if isinstance(self.arch, psi.arch.ArchSunOS):
            nlwp = pscmd('nlwp')
            self.assertEqual(self.p.nthreads, int(nlwp))
        else:
            self.assertEqual(self.p.nthreads, 1)

    def test_terminal(self):
        if isinstance(self.arch, psi.arch.ArchSunOS) \
                and self.arch.release in ['5.8', '5.9']:
            return
        if not HAVE_SUBPROCESS:
            return

        terminal = subprocess.Popen(['/usr/bin/tty'],
                                    stdout=subprocess.PIPE).communicate()[0]
        terminal = terminal.decode()
        self.assertEqual(self.p.terminal, terminal.strip())

    def test_status(self):
        if isinstance(self.arch, psi.arch.ArchLinux):
            self.assertEqual(self.p.status, psi.process.PROC_STATUS_RUNNING)
        elif isinstance(self.arch, psi.arch.ArchSunOS):
            if hasattr(psi.process, 'PROC_STATUS_SONPROC'):
                self.assertEqual(self.p.status, psi.process.PROC_STATUS_SONPROC)
            else:
                self.assertEqual(self.p.status, psi.process.PROC_STATUS_SRUN)
        elif isinstance(self.arch, psi.arch.ArchAIX):
            self.assertEqual(self.p.status, psi.process.PROC_STATUS_SACTIVE)

    def test_nice(self):
        # XXX Also need to test non-default nice values.
        ni = pscmd('ni')
        self.assertEqual(self.p.nice, int(ni))


class ProcessPriorityTest(unittest.TestCase):
    if os.uname()[0] == 'AIX':
        def test_range(self):
            p = psi.process.Process(os.getpid())
            self.assertTrue(0 <= p.priority <= 255)

    # XXX Add tests_range like tests for Linux and SunOS.


class ProcessTimeTest(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)
        self.arch = psi.arch.arch_type()

    def test_start_time(self):
        self.assert_(isinstance(self.p.start_time, psi.TimeSpec))

        # Due to rounding errors and CPUs being able to do lots within
        # just one jiffie/tick it can appear that we started after
        # now, at least on Linux.
        # By assuming we haven't been running for more then 20 minutes
        # we can more accurately detect if the timezone handling is correct

        now = time.time() + 1
        before_start = now - 20*60
        self.assert_(before_start < self.p.start_time.timestamp() <= now,
                     '%s < %s <= %s' %
                     (before_start, self.p.start_time.timestamp(), now))

        localnow = time.time() + time.timezone + 1
        if time.daylight:
            localnow += time.altzone
        before_start = localnow - 20*60
        self.assert_(before_start < self.p.start_time.mktime() <= localnow,
                     '%s < %s <= %s' %
                     (before_start, self.p.start_time.mktime(), localnow))

    def test_jiffies(self):
        if isinstance(self.arch, psi.arch.ArchLinux):
            f = open('/proc/%d/stat' % self.pid)
            try:
                stat = f.read().split()
            finally:
                f.close()
            self.assertEqual(self.p.jiffies, int(stat[21]))


class ProcessCpuTest(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)

    def test_cputime(self):
        self.assert_(isinstance(self.p.cputime, psi.TimeSpec))
        self.assert_(self.p.cputime >= (0, 0))

    def test_utime(self):
        self.assert_(isinstance(self.p.utime, psi.TimeSpec))
        self.assert_(self.p.utime >= (0, 0))

    def test_stime(self):
        self.assert_(isinstance(self.p.stime, psi.TimeSpec))
        self.assert_(self.p.stime >= (0, 0))

    def test_cputime_sum(self):
        variance = psi.TimeSpec(1, 0)
        min = self.p.cputime - variance
        max = self.p.cputime + variance
        self.assert_(min < self.p.utime+self.p.stime < max,
                     '%s < %s < %s' % (min, self.p.utime+self.p.stime, max))

    def test_cputime_increases(self):
        do_work()
        p = psi.process.Process(self.pid)
        assert self.p.cputime < p.cputime


    if hasattr(psi.process.Process, 'pcpu'):
        def test_pcpu(self):
            self.assertTrue(0 <= self.p.pcpu <= 100)


class ProcessMemTest(unittest.TestCase):
    def setUp(self):
        self.pid = os.getpid()
        self.p = psi.process.Process(self.pid)
        self.arch = psi.arch.arch_type()

    def test_rss(self):
        self.assert_(self.p.rss > 0)

    def test_vsz(self):
        self.assert_(self.p.vsz > 0)

    def test_rss_vs_vsz(self):
        if isinstance(self.arch, psi.arch.ArchAIX):
            # On AIX the VSZ is only the data section of the virtual
            # size since that's what ps(1) does there.  This means
            # that often (but not always) the RSS value will be larger
            # then the VSZ value, depending on the amount of data in
            # the shared libraries.
            return
        self.assert_(self.p.rss < self.p.vsz,
                     "%d < %d" % (self.p.rss, self.p.vsz))

    def test_rss_ps(self):
        rss = int(pscmd('rssize'))
        rss_min = rss - rss*0.1
        rss_max = rss + rss*0.1
        self.assert_(rss_min < self.p.rss/1024 < rss_max,
                     '%s < %s < %s' % (rss_min, self.p.rss/1024, rss_max))

    def test_vsz_ps(self):
        vsz = int(pscmd('vsz'))
        self.assert_(vsz*0.8 < self.p.vsz/1024 < vsz*1.2,
                     '%s < %s < %s' % (vsz*0.8, self.p.vsz/1024, vsz*1.2))


class ConstantsTest(unittest.TestCase):
    def test_status_codes(self):
        stat_names = [s for s in dir(psi.process)
                      if s.startswith('PROC_STATUS_')]
        self.assert_(len(stat_names) > 0)
        for name in stat_names:
            attr = getattr(psi.process, name)
            self.assert_(isinstance(attr, int))


class ProcessPrivsTest(unittest.TestCase):
    def test_init_works(self):
        p = psi.process.Process(1)
        self.assertEqual(p.euid, 0)


if os.geteuid() == 0 and (APP32 or APP64):
    class ProcessPriorityTest(unittest.TestCase):
        def setUp(self):
            self.appname = APP32 or APP64
            self.app = None

        def tearDown(self):
            if self.app is not None:
                self.app.kill()

        if os.uname()[0] == 'Linux':
            def test_sched_other(self):
                self.app = TestApp(['/usr/bin/chrt',
                                    '--other', '0', self.appname])
                p = psi.process.Process(self.app.pid)
                self.assertEqual(p.priority, 0)

            def test_sched_fifo(self):
                self.app = TestApp(['/usr/bin/chrt',
                                    '--fifo', '42', self.appname])
                p = psi.process.Process(self.app.pid)
                self.assertEqual(p.priority, 42)

            def test_sched_rr(self):
                self.app = TestApp(['/usr/bin/chrt',
                                    '--rr', '42', self.appname])
                p = psi.process.Process(self.app.pid)
                self.assertEqual(p.priority, 42)

        if os.uname()[0] == 'SunOS':
            def test_class_ts(self):
                # This is a very bad class to test, we can't really
                # assert anything since psi gets the real priority and
                # not the user priority.  But on SunOS 8 the FX class
                # is not available, so better have this test then none.
                self.app = TestApp(['/usr/bin/priocntl', '-e',
                                    '-c', 'TS', self.appname])
                p = psi.process.Process(self.app.pid)
                self.assertTrue(-60 <= p.priority <= 60,
                                '-60 <= %d <= 60' % p.priority)

            if psi.arch.arch_type().release_info > (5,8):
                def test_class_fx(self):
                    self.app = TestApp(['/usr/bin/priocntl', '-e',
                                        '-c', 'FX', '-m', '60',
                                        '-p', '42', self.appname])
                    p = psi.process.Process(self.app.pid)
                    self.assertEqual(p.priority, 42)

        if os.uname()[0] == 'AIX':
            def test_aixapp(self):
                # aixapp runs under SCHED_RR at priority 42.
                aixapp = os.path.join(os.path.dirname(__file__), 'aixapp')
                self.app = TestApp([aixapp])
                p = psi.process.Process(self.app.pid)
                self.assertEqual(p.priority, 42)


class RefreshTests(unittest.TestCase):
    def test_refresh(self):
        p = psi.process.Process(os.getpid())
        pid = p.pid
        ct = p.cputime
        do_work()
        p.refresh()
        self.assertEqual(pid, p.pid)
        self.assert_(ct < p.cputime, '%s < %s' % (ct, p.cputime))

    def test_proc_gone(self):
        app = TestApp(APP32 or APP64)
        try:
            p = psi.process.Process(app.pid)
            app.kill()
            app = None
            self.assertRaises(psi.process.NoSuchProcessError, p.refresh)
        finally:
            if app is not None:
                app.kill()


class ChildrenTests(unittest.TestCase):
    def setUp(self):
        self.p = psi.process.Process(os.getpid())

    def test_no_child(self):
        children = self.p.children()
        self.assertEqual(len(children), 0)

    def test_one_child(self):
        app = TestApp(APP32 or APP64)
        try:
            child = psi.process.Process(app.pid)
            children = self.p.children()
            self.assertEqual(len(children), 1)
            self.assertEqual(children[0], child)
        finally:
            app.kill()


class ExistsTests(unittest.TestCase):
    def test_exists(self):
        p = psi.process.Process(os.getpid())
        self.assertEqual(p.exists(), True)

    def test_dead(self):
        app = TestApp(APP32 or APP64)
        try:
            p = psi.process.Process(app.pid)
            app.kill()
            app = None
            self.assertEqual(p.exists(), False)
        finally:
            if app is not None:
                app.kill()


class KillTests(unittest.TestCase):
    def setUp(self):
        self.app = TestApp(APP32 or APP64)
        self.p = psi.process.Process(self.app.pid)

    def tearDown(self):
        if self.app.pid:
            self.app.kill()

    def test_default(self):
        self.p.kill()
        t = 5.0
        while t > 0:
            os.waitpid(self.p.pid, os.WNOHANG)
            if not self.p.exists():
                break
            time.sleep(0.05)
            t -= 0.05
        self.assertEqual(self.p.exists(), False)

    def test_stop(self):
        if isinstance(psi.arch.arch_type(), psi.arch.ArchLinux):
            stop_status = psi.process.PROC_STATUS_STOPPED
        else:
            stop_status = psi.process.PROC_STATUS_SSTOP
        self.p.kill(signal.SIGSTOP)
        t = 5.0
        while t > 0:
            self.p.refresh()
            if self.p.status == stop_status:
                break
            time.sleep(0.05)
            t -= 0.05
        self.assertEqual(self.p.status, stop_status)
        self.p.kill(signal.SIGCONT)
        self.p.refresh()
        self.assertNotEqual(self.p.status, stop_status)
        self.assertEqual(self.p.exists(), True)


class ZombieTests(unittest.TestCase):
    def setUp(self):
        self.app = TestApp(APP32 or APP64)
        self.app.kill_to_zombie()
        a = psi.arch.arch_type()
        if isinstance(a, psi.arch.ArchLinux):
            self.zombie_stat = psi.process.PROC_STATUS_ZOMBIE
        else:
            self.zombie_stat = psi.process.PROC_STATUS_SZOMB

    def tearDown(self):
        if self.app.pid:
            self.app.kill()

    def test_zombie_no_err(self):
        # Ensure no error during init when looking at a zombie
        if isinstance(psi.arch.arch_type(), psi.arch.ArchDarwin):
            # pause to allow process status to update
            time.sleep(1)
        p = psi.process.Process(self.app.pid)
        self.assertEqual(p.status, self.zombie_stat)


if __name__ == '__main__':
    unittest.main()