Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
gevent / src / gevent / testing / testrunner.py
Size: Mime:
#!/usr/bin/env python
from __future__ import print_function, absolute_import, division

import sys
import os
import glob
import traceback
import time
import importlib
from datetime import timedelta

from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
from . import util
from .sysinfo import RUNNING_ON_CI
from .sysinfo import PYPY
from .sysinfo import PY2
from .sysinfo import RESOLVER_ARES
from .sysinfo import RUN_LEAKCHECKS
from . import six
from . import travis

# Import this while we're probably single-threaded/single-processed
# to try to avoid issues with PyPy 5.10.
# See https://bitbucket.org/pypy/pypy/issues/2769/systemerror-unexpected-internal-exception
try:
    __import__('_testcapi')
except (ImportError, OSError, IOError):
    # This can raise a wide variety of errors
    pass

# Leak checks capture stats (which takes time) and run each test at
# least twice, so they get a larger timeout than a normal run.
TIMEOUT = 200 if RUN_LEAKCHECKS else 100

# Worker count: the NWORKERS environment variable if set (and non-empty),
# otherwise one less than the CPU count but no fewer than 4; in either
# case never more than 10.
NWORKERS = min(int(os.environ.get('NWORKERS') or max(cpu_count() - 1, 4)), 10)

if RUNNING_ON_CI:
    # Too many and we get spurious timeouts
    NWORKERS = 4

# Options applied to every discovered test unless overridden per file.
DEFAULT_RUN_OPTIONS = {
    'timeout': TIMEOUT
}




def _package_relative_filename(filename, package):
    """
    Resolve *filename*, falling back to the directory of *package*.

    If *filename* already names an existing file, or no *package* is
    given, it is returned unchanged. Otherwise it is joined onto the
    directory computed by ``_dir_from_package_name(package)``.
    """
    if os.path.isfile(filename) or not package:
        return filename
    # Not a file as given; try to locate it as a module in the package.
    return os.path.join(_dir_from_package_name(package), filename)

def _dir_from_package_name(package):
    """
    Import *package* by its dotted name and return the directory
    portion of the imported module's ``__file__``.
    """
    module_file = importlib.import_module(package).__file__
    return os.path.dirname(module_file)


class ResultCollector(object):
    """
    Accumulates the outcome of individual test runs.

    Results are added with ``collector += result``. A truthy result is
    recorded in ``passed`` (mapped to ``True``); a falsy one is stored in
    ``failed`` under its name. ``total`` is not touched by ``+=``; it is
    left for the owner of the collector to set.
    """

    def __init__(self):
        self.total = 0
        self.total_cases = 0
        self.total_skipped = 0
        self.passed = {}
        self.failed = {}

    def __iadd__(self, result):
        # *result* must expose ``name``, ``run_count`` and ``skipped_count``
        # and be usable in a boolean context.
        if result:
            self.passed[result.name] = True
        else:
            self.failed[result.name] = result
        self.total_cases += result.run_count
        self.total_skipped += result.skipped_count
        return self


class Runner(object):
    """
    Runs a list of ``(cmd, options)`` test commands and reports the results.

    Commands are dispatched to a ``ThreadPool`` and executed with
    ``util.run``. Commands matching *configured_run_alone_tests* are held
    back and run one at a time after the pool has been closed and joined.
    Calling the instance runs everything and then calls ``report``.
    """

    # Seconds to sleep between polls while waiting for outstanding jobs.
    TIME_WAIT_REAP = 0.1
    # Seconds to sleep between polls while waiting for a free worker slot.
    TIME_WAIT_SPAWN = 0.05

    def __init__(self,
                 tests,
                 configured_failing_tests=(),
                 failfast=False,
                 quiet=False,
                 configured_run_alone_tests=()):
        """
        :param tests: Sized iterable of ``(cmd, options)`` pairs.
        :param configured_failing_tests: Patterns handed to ``report`` to
            classify failures/passes as expected or unexpected.
        :param failfast: If true, ``sys.exit(1)`` is called as soon as a
            command produces a falsy result.
        :param quiet: Passed to ``util.run`` as the ``quiet`` keyword.
        :param configured_run_alone_tests: Patterns (see ``matches``) for
            commands that must not run in the pool.
        """
        self._tests = tests
        self._configured_failing_tests = configured_failing_tests
        self._failfast = failfast
        self._quiet = quiet
        self._configured_run_alone_tests = configured_run_alone_tests

        self.results = ResultCollector()
        self.results.total = len(self._tests)
        # AsyncResult objects for jobs submitted to the pool and not yet reaped.
        self._running_jobs = []

        # Never more workers than tests, but always at least one
        # (an empty test list would otherwise give a count of 0).
        self._worker_count = min(len(tests), NWORKERS) or 1

    def _run_one(self, cmd, **kwargs):
        """
        Run a single command with ``util.run`` and add its result to
        ``self.results``.

        This is invoked both from pool worker threads (via ``_spawn``) and
        directly for run-alone tests.
        """
        kwargs['quiet'] = self._quiet
        result = util.run(cmd, **kwargs)
        if not result and self._failfast:
            # NOTE(review): when called from a pool worker this raises
            # SystemExit in that worker thread rather than the main
            # thread -- confirm that this stops the run as intended.
            sys.exit(1)
        self.results += result

    def _reap(self):
        "Clean up the list of running jobs, returning how many are still outstanding."
        # Iterate over a copy because finished jobs are removed from the list.
        for r in self._running_jobs[:]:
            if not r.ready():
                continue
            if r.successful():
                self._running_jobs.remove(r)
            else:
                # The job ended with an exception; get() is expected to
                # re-raise it here. The sys.exit below is a fallback in
                # case it returns instead.
                r.get()
                sys.exit('Internal error in testrunner.py: %r' % (r, ))
        return len(self._running_jobs)

    def _reap_all(self):
        """Block, polling every ``TIME_WAIT_REAP`` seconds, until no jobs are outstanding."""
        while self._reap() > 0:
            time.sleep(self.TIME_WAIT_REAP)

    def _spawn(self, pool, cmd, options):
        """
        Submit *cmd* to *pool*, first waiting until fewer than
        ``_worker_count`` jobs are outstanding.
        """
        while True:
            if self._reap() < self._worker_count:
                job = pool.apply_async(self._run_one, (cmd, ), options or {})
                self._running_jobs.append(job)
                return

            time.sleep(self.TIME_WAIT_SPAWN)

    def __call__(self):
        """
        Run all the tests and report the results.

        On ``KeyboardInterrupt`` a partial report is logged (without
        exiting) and the interrupt is re-raised. Any other exception has
        its traceback printed and is re-raised. On normal completion the
        report is produced with ``exit=True``.
        """
        util.log("Running tests in parallel with concurrency %s" % (self._worker_count,),)
        # Setting global state, in theory we can be used multiple times.
        # This is fine as long as we are single threaded and call these
        # sequentially.
        util.BUFFER_OUTPUT = self._worker_count > 1 or self._quiet

        start = time.time()
        try:
            self._run_tests()
        except KeyboardInterrupt:
            self._report(time.time() - start, exit=False)
            util.log('(partial results)\n')
            raise
        except:
            traceback.print_exc()
            raise

        self._reap_all()
        self._report(time.time() - start, exit=True)

    def _run_tests(self):
        "Runs the tests, produces no report."
        # Commands that matched the run-alone configuration; deferred
        # until the pool has finished.
        run_alone = []

        tests = self._tests
        pool = ThreadPool(self._worker_count)
        try:
            for cmd, options in tests:
                options = options or {}
                if matches(self._configured_run_alone_tests, cmd):
                    run_alone.append((cmd, options))
                else:
                    self._spawn(pool, cmd, options)
            # No more submissions; wait for the pool's workers to finish.
            pool.close()
            pool.join()

            if run_alone:
                util.log("Running tests marked standalone")
                for cmd, options in run_alone:
                    self._run_one(cmd, **options)
        except KeyboardInterrupt:
            # First interrupt: let already-running jobs finish. Note that
            # the interrupt is not re-raised on this path; only a second
            # interrupt while waiting terminates the pool and propagates.
            try:
                util.log('Waiting for currently running to finish...')
                self._reap_all()
            except KeyboardInterrupt:
                pool.terminate()
                raise
        except:
            pool.terminate()
            raise

    def _report(self, elapsed_time, exit=False):
        """Pass the collected results and *elapsed_time* to ``report``."""
        results = self.results
        report(
            results.total, results.failed, results.passed,
            exit=exit,
            took=elapsed_time,
            configured_failing_tests=self._configured_failing_tests,
            total_cases=results.total_cases,
            total_skipped=results.total_skipped
        )


class TravisFoldingRunner(object):
    """
    Wraps a runner so that its ``_run_tests`` step is bracketed by
    Travis CI fold markers.

    The wrapped runner is patched in place: its ``_run_tests`` attribute
    is replaced by a callable that begins the fold, invokes the original,
    and ends the fold even if the original raises. Calling this object
    simply calls the wrapped runner.
    """

    def __init__(self, runner, travis_fold_msg):
        self._runner = runner
        self._travis_fold_msg = travis_fold_msg
        # The fold name is the construction time in whole seconds.
        self._travis_fold_name = str(int(time.time()))

        # A zope-style acquisition proxy would be convenient here.
        inner_run_tests = runner._run_tests
        runner._run_tests = lambda: self._run_folded(inner_run_tests)

    def _run_folded(self, run_tests):
        # Invoke *run_tests* between the begin and end fold markers.
        self._begin_fold()
        try:
            run_tests()
        finally:
            self._end_fold()

    def _begin_fold(self):
        travis.fold_start(self._travis_fold_name, self._travis_fold_msg)

    def _end_fold(self):
        travis.fold_end(self._travis_fold_name)

    def __call__(self):
        return self._runner()

def discover(
        tests=None, ignore_files=None,
        ignored=(), coverage=False,
        package=None,
        configured_ignore_coverage=(),
        configured_test_options=None,
):
    """
    Build the list of ``(cmd, options)`` pairs describing the tests to run.

    :param tests: Iterable of test file names (or ``gevent.tests.*`` dotted
        names). If empty, every ``test_*.py`` in the working directory
        (the package directory when *package* is given) is used, except
        ``test_support.py``.
    :param ignore_files: Comma-separated names of files, each read with
        ``load_list_from_file``, listing additional tests to ignore.
    :param ignored: Iterable of test names to ignore.
    :param coverage: If true, *configured_ignore_coverage* is also ignored.
    :param package: Dotted name of the package containing the tests. When
        given, tests are located relative to its directory and run with
        ``-m<package>.<module>`` instead of by file name.
    :param configured_ignore_coverage: Tests to ignore under coverage.
    :param configured_test_options: Mapping of file name to a dict of
        options that override ``DEFAULT_RUN_OPTIONS`` for that file.
    :return: A list of ``(cmd, options)`` tuples.

    The working directory is changed to the package directory while test
    files are located and is always restored before returning or raising.
    """
    # pylint:disable=too-many-locals,too-many-branches
    configured_test_options = configured_test_options or {}
    olddir = os.getcwd()
    ignore = set(ignored or ())

    if ignore_files:
        ignore_files = ignore_files.split(',')
        for f in ignore_files:
            ignore.update(set(load_list_from_file(f, package)))

    if coverage:
        ignore.update(configured_ignore_coverage)

    to_process = []
    to_import = []

    try:
        if package:
            package_dir = _dir_from_package_name(package)
            # We need to glob relative names, our config is based on filenames still
            os.chdir(package_dir)

        if not tests:
            tests = set(glob.glob('test_*.py')) - set(['test_support.py'])
        else:
            tests = set(tests)

        if ignore:
            # Always ignore the designated list, even if tests were specified
            # on the command line. This fixes a nasty interaction with test__threading_vs_settrace.py
            # being run under coverage when 'grep -l subprocess test*py' is used to list the tests
            # to run.
            tests -= ignore
        tests = sorted(tests)

        for filename in tests:
            # Support either 'gevent.tests.foo' or 'gevent/tests/foo.py'
            if filename.startswith('gevent.tests'):
                # XXX: How does this interact with 'package'? Probably not well
                qualified_name = module_name = filename
                filename = filename[len('gevent.tests') + 1:]
                filename = filename.replace('.', os.sep) + '.py'
            else:
                module_name = os.path.splitext(filename)[0]
                qualified_name = package + '.' + module_name if package else module_name
            with open(os.path.abspath(filename), 'rb') as f:
                # Some of the test files (e.g., test__socket_dns) are
                # UTF8 encoded. Depending on the environment, Python 3 may
                # try to decode those as ASCII, which fails with UnicodeDecodeError.
                # Thus, be sure to open and compare in binary mode.
                # Open the absolute path to make errors more clear,
                # but we can't store the absolute path, our configuration is based on
                # relative file names.
                contents = f.read()
            if b'TESTRUNNER' in contents: # test__monkey_patching.py
                # XXX: Rework this to avoid importing.
                to_import.append(qualified_name)
            else:
                cmd = [sys.executable, '-u']
                if PYPY and PY2:
                    # Doesn't seem to be an env var for this
                    cmd.extend(('-X', 'track-resources'))
                if package:
                    # Using a package is the best way to work with coverage 5
                    # when we specify 'source = <package>'
                    cmd.append('-m' + qualified_name)
                else:
                    cmd.append(filename)

                options = DEFAULT_RUN_OPTIONS.copy()
                options.update(configured_test_options.get(filename, {}))
                to_process.append((cmd, options))
    finally:
        # Restore the working directory even if locating or reading a
        # test file failed, so an error here doesn't leave the process
        # (and the caller) stranded in the package directory.
        os.chdir(olddir)

    # When we actually execute, do so from the original directory,
    # this helps find setup.py
    for qualified_name in to_import:
        module = importlib.import_module(qualified_name)
        for cmd, options in module.TESTRUNNER():
            # The last non-option argument is compared against the ignore set.
            if remove_options(cmd)[-1] in ignore:
                continue
            to_process.append((cmd, options))

    return to_process


def remove_options(lst):
    """
    Return a new list with the items of *lst* that are non-empty and do
    not start with ``-``, in their original order.
    """
    kept = []
    for item in lst:
        if not item or item.startswith('-'):
            continue
        kept.append(item)
    return kept

def load_list_from_file(filename, package):
    """
    Read a list of entries, one per line, from *filename*.

    The file is located with ``_package_relative_filename``. Anything
    from the first ``#`` on a line onwards is discarded, surrounding
    whitespace is stripped, and lines left empty are skipped. A falsy
    *filename* yields an empty list without touching the filesystem.
    """
    if not filename:
        return []

    entries = []
    with open(_package_relative_filename(filename, package)) as f:
        for raw_line in f:
            entry = raw_line.partition('#')[0].strip()
            if entry:
                entries.append(entry)
    return entries


def matches(possibilities, command, include_flaky=True):
    """
    Return whether *command* matches any entry of *possibilities*.

    *command* may be a string or a list of arguments (joined with
    spaces). Entries beginning with ``FLAKY `` are skipped when
    *include_flaky* is false; otherwise the ``FLAKY `` marker is removed
    before comparing. An entry matches when the command ends with a space
    followed by the entry, or ends with the entry with ``.py`` removed.
    """
    if isinstance(command, list):
        command = ' '.join(command)

    for candidate in possibilities:
        if candidate.startswith('FLAKY ') and not include_flaky:
            continue
        pattern = candidate.replace('FLAKY ', '')
        # Our configs are still mostly written in terms of file names,
        # but the non-monkey tests are now using package names.
        # Strip off '.py' from filenames to see if we match a module.
        # XXX: This could be much better. Our command needs better structure.
        as_module = pattern.replace(".py", '')
        if command.endswith((' ' + pattern, as_module)):
            return True
    return False


def format_seconds(seconds):
    """
    Format a duration in seconds for display.

    Durations under 20 seconds are shown with one decimal place and an
    ``s`` suffix. Longer ones are rounded to whole seconds and rendered
    as a ``timedelta`` string, with a leading ``0:`` hours field dropped.
    """
    if seconds < 20:
        return '%.1fs' % seconds
    text = str(timedelta(seconds=round(seconds)))
    return text[2:] if text.startswith('0:') else text


def report(total, failed, passed, exit=True, took=None,
           configured_failing_tests=(),
           total_cases=0, total_skipped=0):
    """
    Log a summary of a test run and optionally exit the process.

    :param total: Number of test files that were scheduled.
    :param failed: Mapping whose keys are the names of failed tests.
    :param passed: Mapping whose keys are the names of passed tests.
    :param exit: If true, call ``sys.exit`` with the number of unexpected
        failures (capped at 100), with 101 if there were only unexpected
        passes, or with a message if *total* is not positive.
    :param took: Elapsed time in seconds; omitted from the output if falsy.
    :param configured_failing_tests: Patterns (see ``matches``) naming
        tests that are expected to fail.
    :param total_cases: Number of test cases run.
    :param total_skipped: Number of test cases skipped.
    """
    # pylint:disable=redefined-builtin,too-many-branches,too-many-locals
    slowest = util.runtimelog # XXX: Global state!
    if slowest:
        util.log('\nLongest-running tests:')
        # Entries are (negated duration, name), so an ascending sort
        # puts the longest-running tests first.
        slowest.sort()
        width = len('%.1f' % -slowest[0][0])
        line_format = '%' + str(width) + '.1f seconds: %s'
        for negated_duration, test_name in slowest[:5]:
            util.log(line_format, -negated_duration, test_name)

    took = (' in %s' % format_seconds(took)) if took else ''

    failed_expected = []
    failed_unexpected = []
    passed_unexpected = [
        name for name in passed
        if matches(configured_failing_tests, name, include_flaky=False)
    ]

    if passed_unexpected:
        util.log('\n%s/%s unexpected passes', len(passed_unexpected), total, color='error')
        print_list(passed_unexpected)

    if not failed:
        skipped_note = ''
        if total_skipped:
            skipped_note = util._colorize('skipped', " (skipped=%d)" % total_skipped)
        util.log(
            '\nRan %s tests%s in %s files%s',
            total_cases,
            skipped_note,
            total,
            took,
        )
    else:
        util.log('\n%s/%s tests failed%s', len(failed), total, took)

        for name in failed:
            if matches(configured_failing_tests, name, include_flaky=True):
                bucket = failed_expected
            else:
                bucket = failed_unexpected
            bucket.append(name)

        if failed_expected:
            util.log('\n%s/%s expected failures', len(failed_expected), total)
            print_list(failed_expected)

        if failed_unexpected:
            util.log('\n%s/%s unexpected failures', len(failed_unexpected), total, color='error')
            print_list(failed_unexpected)

    if not exit:
        return

    if failed_unexpected:
        sys.exit(min(100, len(failed_unexpected)))
    if passed_unexpected:
        sys.exit(101)
    if total <= 0:
        sys.exit('No tests found.')


def print_list(lst):
    """Log each item of *lst* on its own line as a `` - `` bullet."""
    for entry in lst:
        util.log(' - %s', entry)

def _setup_environ(debug=False):
    """
    Fill in default values for diagnostic environment variables.

    Every variable is only set when it is not already present in
    ``os.environ``. ``PYTHONWARNINGS`` is additionally left alone when
    ``sys.warnoptions`` holds anything other than nothing or just
    ``'default'``. ``GEVENT_DEBUG`` is only set when *debug* is true.
    """
    environ = os.environ

    if ('PYTHONWARNINGS' not in environ
            and (not sys.warnoptions
                 # Python 3.7 goes from [] to ['default'] for nothing
                 or sys.warnoptions == ['default'])):

        # Each filter has the form action:message:category:module:line
        warning_filters = (
            # Enable default warnings such as ResourceWarning.
            'default',
            # On Python 3[.6], the system site.py module has
            # "open(fullname, 'rU')" which produces the warning that
            # 'U' is deprecated, so ignore warnings from site.py
            'ignore:::site:',
            # pkgutil on Python 2 complains about missing __init__.py
            'ignore:::pkgutil',
            # importlib/_bootstrap.py likes to spit out "ImportWarning:
            # can't resolve package from __spec__ or __package__, falling
            # back on __name__ and __path__". It seems harmless and is annoying.
            'ignore:::importlib._bootstrap:',
            'ignore:::importlib._bootstrap_external:',
            # importing ABCs from collections, not collections.abc
            'ignore:::pkg_resources._vendor.pyparsing:',
            'ignore:::dns.namedict:',
            # dns.hash itself is being deprecated, importing it raises the warning;
            # we don't import it, but dnspython still does
            'ignore:::dns.hash:',
        )
        environ['PYTHONWARNINGS'] = ','.join(warning_filters)

    environ.setdefault('PYTHONFAULTHANDLER', 'true')

    if debug:
        environ.setdefault('GEVENT_DEBUG', 'debug')

    environ.setdefault('PYTHONTRACEMALLOC', '10')
    # Python 3.7
    environ.setdefault('PYTHONDEVMODE', '1')
    # Python 3.6
    environ.setdefault('PYTHONMALLOC', 'debug')



def main():
    """
    Command-line entry point: parse arguments, configure the environment,
    discover the tests, and either list them (``--discover``) or run them.

    The file named by ``--config`` (resolved relative to ``--package`` if
    it is not found as given) is executed and must define
    ``FAILING_TESTS``, ``IGNORED_TESTS``, ``RUN_ALONE``,
    ``TEST_FILE_OPTIONS`` and ``IGNORE_COVERAGE``. If ``--config`` is
    empty, all of those default to empty collections.
    """
    # pylint:disable=too-many-locals,too-many-statements
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--ignore')
    parser.add_argument('--discover', action='store_true')
    parser.add_argument('--full', action='store_true')
    parser.add_argument('--config', default='known_failures.py')
    parser.add_argument('--failfast', action='store_true')
    parser.add_argument("--coverage", action="store_true")
    parser.add_argument("--quiet", action="store_true", default=True)
    parser.add_argument("--verbose", action="store_false", dest='quiet')
    parser.add_argument("--debug", action="store_true", default=False)
    parser.add_argument("--package", default="gevent.tests")
    parser.add_argument("--travis-fold", metavar="MSG",
                        help="Emit Travis CI log fold markers around the output.")
    parser.add_argument('tests', nargs='*')
    options = parser.parse_args()

    # Set this before any test imports in case of 'from .util import QUIET';
    # not that this matters much because we spawn tests in subprocesses,
    # it's the environment setting that matters
    util.QUIET = options.quiet
    if 'GEVENTTEST_QUIET' not in os.environ:
        os.environ['GEVENTTEST_QUIET'] = str(options.quiet)

    # Defaults used when no config file is loaded. IGNORE_COVERAGE must be
    # initialised here as well, otherwise running with an empty --config
    # would fail with a NameError when it is passed to discover().
    FAILING_TESTS = []
    IGNORED_TESTS = []
    RUN_ALONE = []
    TEST_FILE_OPTIONS = {}
    IGNORE_COVERAGE = []

    coverage = False
    if options.coverage or os.environ.get("GEVENTTEST_COVERAGE"):
        if PYPY and RUNNING_ON_CI:
            print("Ignoring coverage option on PyPy on CI; slow")
        else:
            coverage = True
            os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc")
            if PYPY:
                os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc-pypy")
            this_dir = os.path.dirname(__file__)
            site_dir = os.path.join(this_dir, 'coveragesite')
            site_dir = os.path.abspath(site_dir)
            os.environ['PYTHONPATH'] = site_dir + os.pathsep + os.environ.get("PYTHONPATH", "")
            # We change directory often, use an absolute path to keep all the
            # coverage files (which will have distinct suffixes because of parallel=true in .coveragerc
            # in this directory; makes them easier to combine and use with coverage report)
            os.environ['COVERAGE_FILE'] = os.path.abspath(".") + os.sep + ".coverage"
            print("Enabling coverage to", os.environ['COVERAGE_FILE'], "with site", site_dir)

    _setup_environ(debug=options.debug)

    if options.config:
        config = {}
        options.config = _package_relative_filename(options.config, options.package)
        with open(options.config) as f:
            config_data = f.read()
        # The config file is Python source that is executed as-is; it is
        # expected to be a trusted file shipped with the test package.
        six.exec_(config_data, config)
        FAILING_TESTS = config['FAILING_TESTS']
        IGNORED_TESTS = config['IGNORED_TESTS']
        RUN_ALONE = config['RUN_ALONE']
        TEST_FILE_OPTIONS = config['TEST_FILE_OPTIONS']
        IGNORE_COVERAGE = config['IGNORE_COVERAGE']


    tests = discover(
        options.tests,
        ignore_files=options.ignore,
        ignored=IGNORED_TESTS,
        coverage=coverage,
        package=options.package,
        configured_ignore_coverage=IGNORE_COVERAGE,
        configured_test_options=TEST_FILE_OPTIONS,
    )
    if options.discover:
        # Use a distinct loop variable so the parsed command-line
        # ``options`` namespace is not shadowed by per-test options.
        for cmd, test_options in tests:
            print(util.getname(cmd, env=test_options.get('env'), setenv=test_options.get('setenv')))
        print('%s tests found.' % len(tests))
    else:
        if PYPY and RESOLVER_ARES:
            # XXX: Add a way to force these.
            print("Not running tests on pypy with c-ares; not a supported configuration")
            return
        if options.package:
            # Put this directory on the path so relative imports work.
            package_dir = _dir_from_package_name(options.package)
            os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', "") + os.pathsep + package_dir
        runner = Runner(
            tests,
            configured_failing_tests=FAILING_TESTS,
            failfast=options.failfast,
            quiet=options.quiet,
            configured_run_alone_tests=RUN_ALONE,
        )

        if options.travis_fold:
            runner = TravisFoldingRunner(runner, options.travis_fold)

        runner()


# Run the command-line entry point only when executed as a script,
# not when imported as a module.
if __name__ == '__main__':
    main()