# Repository URL to install this package:
"Test posix functions"
from test import support
from test.support.script_helper import assert_python_ok
# Skip these tests if there is no posix module.
posix = support.import_module('posix')
import errno
import sys
import signal
import time
import os
import platform
import pwd
import stat
import tempfile
import unittest
import warnings
import textwrap
# Path, inside the system temporary directory, built from support.TESTFN
# plus a '-dummy-symlink' suffix.
_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
                              support.TESTFN + '-dummy-symlink')

# Decorator: run the decorated test only when sys.maxsize is below 2**32.
requires_32b = unittest.skipUnless(sys.maxsize < 2**32,
        'test is only meaningful on 32-bit builds')
def _supports_sched():
    """Return True if posix.sched_getscheduler() is present and usable.

    The function is probed with pid 0.  Only a missing attribute or an
    OSError carrying ENOSYS counts as "unsupported"; any other OSError
    still yields True.
    """
    if hasattr(posix, 'sched_getscheduler'):
        try:
            posix.sched_getscheduler(0)
        except OSError as exc:
            # ENOSYS is the only errno treated as "not supported".
            return exc.errno != errno.ENOSYS
        return True
    return False
# Decorator: skip the decorated test unless _supports_sched() returned True
# when this module was imported.
requires_sched = unittest.skipUnless(_supports_sched(), 'requires POSIX scheduler API')
class PosixTester(unittest.TestCase):
def setUp(self):
    # Start every test from an empty scratch file at support.TESTFN.
    with open(support.TESTFN, 'w+'):
        pass
    # Files listed here are removed again by tearDown().
    self.teardown_files = [support.TESTFN]
    # check_warnings() is entered by hand; tearDown() calls the matching
    # __exit__().
    manager = support.check_warnings()
    self._warnings_manager = manager
    manager.__enter__()
    warnings.filterwarnings('ignore', '.* potential security risk .*',
                            RuntimeWarning)
def tearDown(self):
    # Remove every file registered in self.teardown_files, then leave the
    # warnings context that setUp() entered.  The __exit__() call sits in a
    # finally clause so that an error raised while unlinking cannot skip it.
    try:
        for teardown_file in self.teardown_files:
            support.unlink(teardown_file)
    finally:
        self._warnings_manager.__exit__(None, None, None)
def testNoArgFunctions(self):
    # Exercise posix functions that take no arguments and leave nothing
    # behind that would need cleaning up (so no fork, wait, abort, ...).
    no_arg_names = (
        "ctermid", "getcwd", "getcwdb", "uname",
        "times", "getloadavg",
        "getegid", "geteuid", "getgid", "getgroups",
        "getpid", "getpgrp", "getppid", "getuid", "sync",
    )
    for func_name in no_arg_names:
        func = getattr(posix, func_name, None)
        if func is None:
            # Not provided by this build of the posix module.
            continue
        # A call without arguments must work ...
        func()
        # ... and a call with one positional argument must be rejected.
        self.assertRaises(TypeError, func, 1)
@unittest.skipUnless(hasattr(posix, 'getresuid'),
'test needs posix.getresuid()')
def test_getresuid(self):
    # getresuid() must return three values, none of them negative.
    ids = posix.getresuid()
    self.assertEqual(len(ids), 3)
    for uid in ids:
        self.assertGreaterEqual(uid, 0)
@unittest.skipUnless(hasattr(posix, 'getresgid'),
'test needs posix.getresgid()')
def test_getresgid(self):
    # getresgid() must return three values, none of them negative.
    ids = posix.getresgid()
    self.assertEqual(len(ids), 3)
    for gid in ids:
        self.assertGreaterEqual(gid, 0)
@unittest.skipUnless(hasattr(posix, 'setresuid'),
'test needs posix.setresuid()')
def test_setresuid(self):
    # Setting the ids to the values they already have must return None.
    unchanged = posix.getresuid()
    result = posix.setresuid(*unchanged)
    self.assertIsNone(result)
    # -1 means don't change that value.
    result = posix.setresuid(-1, -1, -1)
    self.assertIsNone(result)
@unittest.skipUnless(hasattr(posix, 'setresuid'),
'test needs posix.setresuid()')
def test_setresuid_exception(self):
    # Don't do this test if someone is silly enough to run us as root.
    ruid, *others = posix.getresuid()
    if ruid == 0 or 0 in others:
        return
    # Only the first id is changed (to current + 1); -1 leaves the other
    # two alone.  The call is expected to raise OSError.
    self.assertRaises(OSError, posix.setresuid, ruid + 1, -1, -1)
@unittest.skipUnless(hasattr(posix, 'setresgid'),
'test needs posix.setresgid()')
def test_setresgid(self):
    # Setting the ids to the values they already have must return None.
    unchanged = posix.getresgid()
    result = posix.setresgid(*unchanged)
    self.assertIsNone(result)
    # -1 means don't change that value.
    result = posix.setresgid(-1, -1, -1)
    self.assertIsNone(result)
@unittest.skipUnless(hasattr(posix, 'setresgid'),
'test needs posix.setresgid()')
def test_setresgid_exception(self):
    # Don't do this test if someone is silly enough to run us as root.
    rgid, *others = posix.getresgid()
    if rgid == 0 or 0 in others:
        return
    # Only the first id is changed (to current + 1); -1 leaves the other
    # two alone.  The call is expected to raise OSError.
    self.assertRaises(OSError, posix.setresgid, rgid + 1, -1, -1)
@unittest.skipUnless(hasattr(posix, 'initgroups'),
"test needs os.initgroups()")
def test_initgroups(self):
    # It takes a string and an integer; check that it raises a TypeError
    # for other argument lists.
    for bad_args in ((), (None,), (3, "foo"), ("foo", 3, object())):
        self.assertRaises(TypeError, posix.initgroups, *bad_args)

    # The remaining check only applies to a non-privileged user.
    if os.getuid() == 0:
        return

    try:
        user_name = pwd.getpwuid(posix.getuid()).pw_name
    except KeyError:
        # the current UID may not have a pwd entry
        raise unittest.SkipTest("need a pwd entry")

    # If a non-privileged user invokes it, it should fail with OSError
    # EPERM.
    try:
        posix.initgroups(user_name, 13)
    except OSError as exc:
        self.assertEqual(exc.errno, errno.EPERM)
    else:
        self.fail("Expected OSError to be raised by initgroups")
@unittest.skipUnless(hasattr(posix, 'statvfs'),
'test needs posix.statvfs()')
def test_statvfs(self):
    # statvfs() on the current directory must return a truthy result.
    vfs_info = posix.statvfs(os.curdir)
    self.assertTrue(vfs_info)
@unittest.skipUnless(hasattr(posix, 'fstatvfs'),
'test needs posix.fstatvfs()')
def test_fstatvfs(self):
    # Both fstatvfs() and statvfs() are called with the descriptor of the
    # open scratch file and must return truthy results.
    with open(support.TESTFN) as fp:
        fd = fp.fileno()
        self.assertTrue(posix.fstatvfs(fd))
        self.assertTrue(posix.statvfs(fd))
@unittest.skipUnless(hasattr(posix, 'ftruncate'),
'test needs posix.ftruncate()')
def test_ftruncate(self):
    with open(support.TESTFN, 'w+') as fp:
        # we need to have some data to truncate
        fp.write('test')
        fp.flush()
        # Truncate through the descriptor while the file is still open.
        posix.ftruncate(fp.fileno(), 0)
@unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
def test_truncate(self):
    # Put some data into the scratch file first ...
    scratch_path = support.TESTFN
    with open(scratch_path, 'w') as fp:
        fp.write('test')
        fp.flush()
    # ... then truncate it by path down to length 0.
    posix.truncate(scratch_path, 0)
@unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
@unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
def test_fexecve(self):
    # Run the interpreter via execve() on an open file descriptor of
    # sys.executable, in a forked child, and check that the child exits
    # with status 0.
    fp = os.open(sys.executable, os.O_RDONLY)
    try:
        pid = os.fork()
        if pid == 0:
            # Child.  A successful execve() replaces this process and never
            # returns.  If chdir() or execve() raises instead, the child
            # must not carry on running the parent's test runner: leave
            # immediately with a non-zero status so that the parent's
            # waitpid() assertion fails.
            try:
                os.chdir(os.path.split(sys.executable)[0])
                posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)
            finally:
                os._exit(1)
        else:
            self.assertEqual(os.waitpid(pid, 0), (pid, 0))
    finally:
        os.close(fp)
@unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
def test_waitid(self):
    # Fork a child that execs "python -c pass" and check that waitid()
    # reports that child's pid.
    pid = os.fork()
    if pid == 0:
        # Child.  A successful execve() never returns.  If chdir() or
        # execve() raises instead, exit right away rather than letting the
        # child continue running the parent's test runner.
        try:
            os.chdir(os.path.split(sys.executable)[0])
            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
        finally:
            os._exit(1)
    else:
        res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
        self.assertEqual(pid, res.si_pid)
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
def test_register_at_fork(self):
    # Part 1: argument validation.  Every call below must raise TypeError.
    with self.assertRaises(TypeError, msg="Positional args not allowed"):
        os.register_at_fork(lambda: None)
    with self.assertRaises(TypeError, msg="Args must be callable"):
        os.register_at_fork(before=2)
    with self.assertRaises(TypeError, msg="Args must be callable"):
        os.register_at_fork(after_in_child="three")
    with self.assertRaises(TypeError, msg="Args must be callable"):
        os.register_at_fork(after_in_parent=b"Five")
    with self.assertRaises(TypeError, msg="Args must not be None"):
        os.register_at_fork(before=None)
    with self.assertRaises(TypeError, msg="Args must not be None"):
        os.register_at_fork(after_in_child=None)
    with self.assertRaises(TypeError, msg="Args must not be None"):
        os.register_at_fork(after_in_parent=None)
    with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
        # Ensure a combination of valid and invalid is an error.
        os.register_at_fork(before=None, after_in_parent=lambda: 3)
    with self.assertRaises(TypeError, msg="Invalid arg was allowed"):
        # Ensure a combination of valid and invalid is an error.
        os.register_at_fork(before=lambda: None, after_in_child='')
    # Part 2: real registrations.
    # We test actual registrations in their own process so as not to
    # pollute this one.  There is no way to unregister for cleanup.
    #
    # The script is wrapped in "if 1:" so that its body can be indented
    # inside this string literal; the body therefore has to stay indented
    # relative to that first line, otherwise the subprocess fails to
    # compile it.
    code = """if 1:
        import os
        r, w = os.pipe()
        fin_r, fin_w = os.pipe()
        os.register_at_fork(before=lambda: os.write(w, b'A'))
        os.register_at_fork(after_in_parent=lambda: os.write(w, b'C'))
        os.register_at_fork(after_in_child=lambda: os.write(w, b'E'))
        os.register_at_fork(before=lambda: os.write(w, b'B'),
                            after_in_parent=lambda: os.write(w, b'D'),
                            after_in_child=lambda: os.write(w, b'F'))
        pid = os.fork()
        if pid == 0:
            # At this point, after-forkers have already been executed
            os.close(w)
            # Wait for parent to tell us to exit
            os.read(fin_r, 1)
            os._exit(0)
        else:
            try:
                os.close(w)
                with open(r, "rb") as f:
                    data = f.read()
                    assert len(data) == 6, data
                    # Check before-fork callbacks
                    assert data[:2] == b'BA', data
                    # Check after-fork callbacks
                    assert sorted(data[2:]) == list(b'CDEF'), data
                    assert data.index(b'C') < data.index(b'D'), data
                    assert data.index(b'E') < data.index(b'F'), data
            finally:
                os.write(fin_w, b'!')
        """
    assert_python_ok('-c', code)
@unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
def test_lockf(self):
    # Write four bytes, rewind, then lock and unlock that four-byte
    # section with lockf().
    open_flags = os.O_WRONLY | os.O_CREAT
    lock_fd = os.open(support.TESTFN, open_flags)
    try:
        os.write(lock_fd, b'test')
        os.lseek(lock_fd, 0, os.SEEK_SET)
        posix.lockf(lock_fd, posix.F_LOCK, 4)
        # section is locked
        posix.lockf(lock_fd, posix.F_ULOCK, 4)
    finally:
        os.close(lock_fd)
@unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
def test_pread(self):
    open_flags = os.O_RDWR | os.O_CREAT
    data_fd = os.open(support.TESTFN, open_flags)
    try:
        os.write(data_fd, b'test')
        os.lseek(data_fd, 0, os.SEEK_SET)
        # Two bytes read at offset 1 of b'test'.
        middle = posix.pread(data_fd, 2, 1)
        self.assertEqual(b'es', middle)
        # the first pread() shouldn't disturb the file offset
        start = posix.read(data_fd, 2)
        self.assertEqual(b'te', start)
    finally:
        os.close(data_fd)
@unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
def test_preadv(self):
    open_flags = os.O_RDWR | os.O_CREAT
    data_fd = os.open(support.TESTFN, open_flags)
    try:
        os.write(data_fd, b'test1tt2t3t5t6t6t8')
        # Three buffers of 5, 3 and 2 bytes, filled starting at offset 3.
        buffers = [bytearray(size) for size in (5, 3, 2)]
        nread = posix.preadv(data_fd, buffers, 3)
        self.assertEqual(nread, 10)
        self.assertEqual([b't1tt2', b't3t', b'5t'], list(buffers))
    finally:
        os.close(data_fd)
@unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
@unittest.skipUnless(hasattr(posix, 'RWF_HIPRI'), "test needs posix.RWF_HIPRI")
def test_preadv_flags(self):
    open_flags = os.O_RDWR | os.O_CREAT
    data_fd = os.open(support.TESTFN, open_flags)
    try:
        os.write(data_fd, b'test1tt2t3t5t6t6t8')
        # Three buffers of 5, 3 and 2 bytes, filled starting at offset 3,
        # this time passing the RWF_HIPRI flag.
        buffers = [bytearray(size) for size in (5, 3, 2)]
        nread = posix.preadv(data_fd, buffers, 3, os.RWF_HIPRI)
        self.assertEqual(nread, 10)
        self.assertEqual([b't1tt2', b't3t', b'5t'], list(buffers))
    except NotImplementedError:
        raise unittest.SkipTest("preadv2 not available")
    except OSError as exc:
        # Is possible that the macro RWF_HIPRI was defined at compilation time
        # but the option is not supported by the kernel or the runtime libc shared
        # library.
        if exc.errno not in {errno.EINVAL, errno.ENOTSUP}:
            raise
        self.skipTest("RWF_HIPRI is not supported by the current system")
    finally:
        os.close(data_fd)
@unittest.skipUnless(hasattr(posix, 'preadv'), "test needs posix.preadv()")
@requires_32b
def test_preadv_overflow_32bits(self):
    open_flags = os.O_RDWR | os.O_CREAT
    data_fd = os.open(support.TESTFN, open_flags)
    try:
        # The same 2**16-byte bytearray object repeated 2**15 times.
        chunk_size = 2**16
        buffers = [bytearray(chunk_size)] * 2**15
        with self.assertRaises(OSError) as caught:
            os.preadv(data_fd, buffers, 0)
        self.assertEqual(caught.exception.errno, errno.EINVAL)
        # The buffer must still hold only zero bytes after the failed call.
        self.assertEqual(bytes(buffers[0]), b'\0' * chunk_size)
    finally:
        os.close(data_fd)
@unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
def test_pwrite(self):
fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
try:
os.write(fd, b'test')
os.lseek(fd, 0, os.SEEK_SET)
posix.pwrite(fd, b'xx', 1)
# Loading ...