import collections
import ctypes
import itertools
import logging
import multiprocessing
import os
import pickle
import textwrap
import unittest
from importlib import import_module
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db import DEFAULT_DB_ALIAS, connections
from django.test import SimpleTestCase, TestCase
from django.test.utils import setup_test_environment, teardown_test_environment
from django.utils.datastructures import OrderedSet
from django.utils.six import StringIO
try:
import tblib.pickling_support
except ImportError:
tblib = None
class DebugSQLTextTestResult(unittest.TextTestResult):
    """
    Text test result that also reports the SQL logged while a test ran.

    Records emitted on the 'django.db.backends' logger between startTest()
    and stopTest() are captured in a per-test buffer. The captured text is
    appended as a third item to the entries added to ``errors`` and
    ``failures``, and printErrorList() prints it below the traceback.
    """

    def __init__(self, stream, descriptions, verbosity):
        self.logger = logging.getLogger('django.db.backends')
        self.logger.setLevel(logging.DEBUG)
        # Stays None until the first startTest() call. Errors can be reported
        # before any test has started, in which case there is no SQL to show.
        self.debug_sql_stream = None
        super(DebugSQLTextTestResult, self).__init__(stream, descriptions, verbosity)

    def _get_debug_sql(self):
        """Return the SQL captured for the current test ('' if none yet)."""
        if self.debug_sql_stream is None:
            return ''
        self.debug_sql_stream.seek(0)
        return self.debug_sql_stream.read()

    def startTest(self, test):
        # Use a fresh buffer and handler per test so that the output of one
        # test never leaks into the report of another.
        self.debug_sql_stream = StringIO()
        self.handler = logging.StreamHandler(self.debug_sql_stream)
        self.logger.addHandler(self.handler)
        super(DebugSQLTextTestResult, self).startTest(test)

    def stopTest(self, test):
        super(DebugSQLTextTestResult, self).stopTest(test)
        self.logger.removeHandler(self.handler)
        if self.showAll:
            self.debug_sql_stream.seek(0)
            self.stream.write(self.debug_sql_stream.read())
            self.stream.writeln(self.separator2)

    def addError(self, test, err):
        super(DebugSQLTextTestResult, self).addError(test, err)
        # The parent class appended a (test, traceback) pair; extend it with
        # the SQL so that printErrorList() can unpack three items.
        self.errors[-1] = self.errors[-1] + (self._get_debug_sql(),)

    def addFailure(self, test, err):
        super(DebugSQLTextTestResult, self).addFailure(test, err)
        self.failures[-1] = self.failures[-1] + (self._get_debug_sql(),)

    def addSubTest(self, test, subtest, err):
        # Only called by unittest versions that support subtests, where the
        # parent class provides addSubTest().
        super(DebugSQLTextTestResult, self).addSubTest(test, subtest, err)
        if err is not None:
            # unittest.TestResult.addSubTest() files the outcome under
            # failures or errors depending on the exception type; extend the
            # same entry so that it also carries three items.
            if issubclass(err[0], test.failureException):
                recorded = self.failures
            else:
                recorded = self.errors
            recorded[-1] = recorded[-1] + (self._get_debug_sql(),)

    def printErrorList(self, flavour, errors):
        for test, err, sql_debug in errors:
            self.stream.writeln(self.separator1)
            self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
            self.stream.writeln(self.separator2)
            self.stream.writeln("%s" % err)
            self.stream.writeln(self.separator2)
            self.stream.writeln("%s" % sql_debug)
class RemoteTestResult(object):
    """
    Record information about which tests have succeeded and which have failed.

    The sole purpose of this class is to record events in the child processes
    so they can be replayed in the master process. As a consequence it doesn't
    inherit unittest.TestResult and doesn't attempt to implement all its API.

    Each event is a tuple whose first item is the name of the result method
    that was called. Events about a test carry the index of that test as
    their second item, followed by any extra arguments.

    The implementation matches the unpythonic coding style of unittest2.
    """

    def __init__(self):
        self.events = []
        self.failfast = False
        self.shouldStop = False
        self.testsRun = 0

    @property
    def test_index(self):
        """Zero-based index of the most recently started test."""
        return self.testsRun - 1

    def _confirm_picklable(self, obj):
        """
        Confirm that obj survives a pickle round trip.

        The child process pickles results and the master process unpickles
        them, so both directions must work. Some exceptions can be dumped
        but not loaded again. Any exception raised by pickle propagates to
        the caller. Only data produced in this process is ever loaded here.
        """
        pickle.loads(pickle.dumps(obj))

    def check_picklable(self, test, err):
        """
        Ensure that the sys.exc_info() tuple ``err`` can be pickled.

        If it cannot, print an explanation and re-raise the pickling error.
        """
        # Ensure that sys.exc_info() tuples are picklable. This displays a
        # clear multiprocessing.pool.RemoteTraceback generated in the child
        # process instead of a multiprocessing.pool.MaybeEncodingError, making
        # the root cause easier to figure out for users who aren't familiar
        # with the multiprocessing module. Since we're in a forked process,
        # our best chance to communicate with them is to print to stdout.
        try:
            self._confirm_picklable(err)
        except Exception as exc:
            original_exc_txt = repr(err[1])
            original_exc_txt = textwrap.fill(original_exc_txt, 75, initial_indent=' ', subsequent_indent=' ')
            pickle_exc_txt = repr(exc)
            pickle_exc_txt = textwrap.fill(pickle_exc_txt, 75, initial_indent=' ', subsequent_indent=' ')
            if tblib is None:
                print("""
{} failed:
{}
Unfortunately, tracebacks cannot be pickled, making it impossible for the
parallel test runner to handle this exception cleanly.
In order to see the traceback, you should install tblib:
pip install tblib
""".format(test, original_exc_txt))
            else:
                print("""
{} failed:
{}
Unfortunately, the exception it raised cannot be pickled, making it impossible
for the parallel test runner to handle it cleanly.
Here's the error encountered while trying to pickle the exception:
{}
You should re-run this test without the --parallel option to reproduce the
failure and get a correct traceback.
""".format(test, original_exc_txt, pickle_exc_txt))
            raise

    def stop_if_failfast(self):
        if self.failfast:
            self.stop()

    def stop(self):
        self.shouldStop = True

    def startTestRun(self):
        self.events.append(('startTestRun',))

    def stopTestRun(self):
        self.events.append(('stopTestRun',))

    def startTest(self, test):
        self.testsRun += 1
        self.events.append(('startTest', self.test_index))

    def stopTest(self, test):
        self.events.append(('stopTest', self.test_index))

    def addError(self, test, err):
        self.check_picklable(test, err)
        self.events.append(('addError', self.test_index, err))
        self.stop_if_failfast()

    def addFailure(self, test, err):
        self.check_picklable(test, err)
        self.events.append(('addFailure', self.test_index, err))
        self.stop_if_failfast()

    def addSubTest(self, test, subtest, err):
        raise NotImplementedError("subtests aren't supported at this time")

    def addSuccess(self, test):
        self.events.append(('addSuccess', self.test_index))

    def addSkip(self, test, reason):
        self.events.append(('addSkip', self.test_index, reason))

    def addExpectedFailure(self, test, err):
        # If tblib isn't installed, pickling the traceback will always fail.
        # However we don't want tblib to be required for running the tests
        # when they pass or fail as expected. Drop the traceback when an
        # expected failure occurs.
        if tblib is None:
            err = err[0], err[1], None
        self.check_picklable(test, err)
        self.events.append(('addExpectedFailure', self.test_index, err))

    def addUnexpectedSuccess(self, test):
        self.events.append(('addUnexpectedSuccess', self.test_index))
        self.stop_if_failfast()
class RemoteTestRunner(object):
    """
    Execute tests silently, collecting their outcome in a result object.

    Nothing is displayed; the result instance returned by run() holds
    everything that happened.

    The implementation matches the unpythonic coding style of unittest2.
    """

    # Class used to build the result object; can be replaced per instance
    # through the ``resultclass`` constructor argument.
    resultclass = RemoteTestResult

    def __init__(self, failfast=False, resultclass=None):
        self.failfast = failfast
        if resultclass is None:
            # Keep the class-level default.
            return
        self.resultclass = resultclass

    def run(self, test):
        """Run ``test`` against a new result object and return that object."""
        recorder = self.resultclass()
        unittest.registerResult(recorder)
        recorder.failfast = self.failfast
        test(recorder)
        return recorder
def default_test_processes():
    """
    Return the default number of processes for the --parallel option.

    The result is 1 when os.fork is unavailable. Otherwise it is the integer
    value of the DJANGO_TEST_PROCESSES environment variable when that
    variable is set, and the CPU count reported by multiprocessing when it
    is not.
    """
    # The current implementation of the parallel test runner requires
    # multiprocessing to start subprocesses with fork().
    # On Python 3.4+: if multiprocessing.get_start_method() != 'fork':
    if hasattr(os, 'fork'):
        configured = os.environ.get('DJANGO_TEST_PROCESSES')
        if configured is None:
            return multiprocessing.cpu_count()
        return int(configured)
    return 1
# Number assigned to the current worker process by _init_worker().
_worker_id = 0


def _init_worker(counter):
    """
    Switch this worker over to its own set of databases.

    The shared ``counter`` is incremented under its lock and the new value is
    stored in the module-level ``_worker_id``. The test database clone
    settings for that number are then merged into every connection's settings
    and the connection is closed.

    This helper lives at module-level because of the multiprocessing module's
    requirements.
    """
    global _worker_id

    with counter.get_lock():
        counter.value += 1
        _worker_id = counter.value

    for db_connection in (connections[alias] for alias in connections):
        clone_settings = db_connection.creation.get_test_db_clone_settings(_worker_id)
        # The existing settings_dict has to be mutated rather than replaced:
        # rebinding db_connection.settings_dict to a new dict would not be
        # reflected in django.db.connections, and new threads would connect
        # to the default database instead of the appropriate clone.
        db_connection.settings_dict.update(clone_settings)
        db_connection.close()
def _run_subsuite(args):
    """
    Run one sub-suite with a RemoteTestRunner and report what happened.

    ``args`` is a ``(subsuite_index, subsuite, failfast)`` tuple. The return
    value is a ``(subsuite_index, events)`` pair, where ``events`` is the
    event list recorded by the result object.

    This helper lives at module-level and its arguments are wrapped in a tuple
    because of the multiprocessing module's requirements.
    """
    index, suite, stop_on_first_failure = args
    outcome = RemoteTestRunner(failfast=stop_on_first_failure).run(suite)
    return index, outcome.events
class ParallelTestSuite(unittest.TestSuite):
"""
Run a series of tests in parallel in several processes.
While the unittest module's documentation implies that orchestrating the
execution of tests is the responsibility of the test runner, in practice,
it appears that TestRunner classes are more concerned with formatting and
displaying test results.
Since there are fewer use cases for customizing TestSuite than TestRunner,
implementing parallelization at the level of the TestSuite improves
interoperability with existing custom test runners. A single instance of a
test runner can still collect results from all tests without being aware
that they have been run in parallel.
"""
# In case someone wants to modify these in a subclass.
init_worker = _init_worker
run_subsuite = _run_subsuite
def __init__(self, suite, processes, failfast=False):
self.subsuites = partition_suite_by_case(suite)
self.processes = processes
self.failfast = failfast
super(ParallelTestSuite, self).__init__()
def run(self, result):
"""
Distribute test cases across workers.
Return an identifier of each test case with its result in order to use
imap_unordered to show results as soon as they're available.
To minimize pickling errors when getting results from workers:
- pass back numeric indexes in self.subsuites instead of tests
- make tracebacks picklable with tblib, if available
Even with tblib, errors may still occur for dynamically created
exception classes such Model.DoesNotExist which cannot be unpickled.
"""
if tblib is not None:
tblib.pickling_support.install()
counter = multiprocessing.Value(ctypes.c_int, 0)
pool = multiprocessing.Pool(
processes=self.processes,
initializer=self.init_worker.__func__,
initargs=[counter])
args = [
(index, subsuite, self.failfast)
for index, subsuite in enumerate(self.subsuites)
]
test_results = pool.imap_unordered(self.run_subsuite.__func__, args)
while True:
if result.shouldStop:
pool.terminate()
break
try:
subsuite_index, events = test_results.next(timeout=0.1)
except multiprocessing.TimeoutError:
continue
except StopIteration:
pool.close()
break
tests = list(self.subsuites[subsuite_index])
for event in events:
event_name = event[0]
handler = getattr(result, event_name, None)
if handler is None:
continue
test = tests[event[1]]
args = event[2:]
handler(test, *args)
pool.join()
Loading ...