Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
gevent / src / gevent / tests / test__examples.py
Size: Mime:
"""
Test the contents of the ``examples/`` directory.

If a test named ``test__example_<fn>.py`` exists in *this* directory,
where ``<fn>.py`` is the base filename of an example file, that example
will not be tested here.

Examples can specify that they need particular test resources to be enabled
by commenting (one per line) ``# gevent-test-requires-resource: <resource>``;
most commonly the resource will be ``network``. You can use this technique to specify
non-existent resources for things that should never be tested.
"""
import re
import sys
import os
import glob
import time
import unittest

import gevent.testing as greentest
from gevent.testing import util

# Absolute path of the directory containing this test module.
# ``__file__`` can be a relative path, in which case ``dirname`` alone may
# yield ``''`` (unusable with ``os.chdir``) or a path that stops being
# valid once the working directory changes. Resolving it up front avoids both.
this_dir = os.path.dirname(os.path.abspath(__file__))

def _find_files_to_ignore():
    """
    Return the base filenames of examples that must not be tested here.

    An example ``<fn>.py`` is ignored when a dedicated test named
    ``test__example_<fn>.py`` exists in ``this_dir``. When both
    ``greentest.PYPY`` and ``greentest.RUNNING_ON_APPVEYOR`` are true,
    ``processes.py`` is ignored as well.

    The directory is listed directly instead of temporarily changing the
    process-wide working directory, so the function has no side effects
    and also works when ``this_dir`` is the empty string.

    :return: A list of example base filenames (including the ``.py`` suffix).
    """
    import fnmatch

    prefix = 'test__example_'
    # An empty ``this_dir`` (relative ``__file__``) means the current directory.
    names = os.listdir(this_dir or os.curdir)
    # Strip the prefix so what is left is the example's own file name.
    result = [
        name[len(prefix):]
        for name in fnmatch.filter(names, prefix + '*.py')
    ]
    if greentest.PYPY and greentest.RUNNING_ON_APPVEYOR:
        # For some reason on Windows with PyPy, this times out,
        # when it should be very fast.
        result.append("processes.py")

    return result

# (minimum, maximum) pair applied to every example that has no entry in
# ``time_ranges``. The maximum is used as the run timeout and the minimum
# as the least elapsed time a successful run must show.
default_time_range = (2, 10)

# Per-example overrides of ``default_time_range``, keyed by the example's
# base filename.
time_ranges = {
    'concurrent_download.py': (0, 30),
    # No lower bound, but keep the default upper bound.
    'processes.py': (0, default_time_range[1]),
}

class _AbstractTestMixin(util.ExampleMixin):
    """
    Mixin providing a single test that runs one example script.

    Concrete test classes set ``filename`` and may override ``time_range``.
    """
    # (minimum, maximum) pair: the maximum is passed as the timeout when
    # running the example, the minimum is the least elapsed time that a
    # successful run must have taken.
    time_range = default_time_range
    # File name of the example script, resolved relative to ``self.cwd``.
    filename = None

    def _check_resources(self):
        """
        Report every resource the example declares it needs.

        The example source is scanned for lines of the form
        ``# gevent-test-requires-resource: <resource>`` and
        ``resources.skip_without_resource`` is called with each
        ``<resource>`` found, in file order.
        """
        from gevent.testing import resources

        example_path = os.path.join(self.cwd, self.filename)
        with open(example_path, 'r') as example:
            source = example.read()

        marker = re.compile('^# gevent-test-requires-resource: (.*)$', re.MULTILINE)
        for found in marker.finditer(source):
            resources.skip_without_resource(found.group(1))

    def test_runs(self):
        """
        Run the example in a subprocess and check how long it took.

        The test fails when ``util.run`` returns a false value, or when
        the run finished in less than the minimum of ``time_range``.
        """
        self._check_resources()

        start = time.time()
        min_time, max_time = self.time_range
        succeeded = util.run(
            [sys.executable, '-u', self.filename],
            timeout=max_time,
            cwd=self.cwd,
            quiet=True,
            buffer_output=True,
            nested=True,
            setenv={'GEVENT_DEBUG': 'error'}
        )
        if not succeeded:
            self.fail("Failed example: " + self.filename)

        # Only reached on success: the example must not finish suspiciously fast.
        took = time.time() - start
        self.assertGreaterEqual(took, min_time)

def _build_test_classes():
    """
    Create one test class per example script.

    Every ``*.py`` file in the examples directory whose base name is not
    returned by ``_find_files_to_ignore`` gets a class named
    ``Test_<basename>`` deriving from ``_AbstractTestMixin`` and
    ``greentest.TestCase``.

    :return: A dict mapping each generated class name to the class. It is
        empty when obtaining the examples directory raises
        ``unittest.SkipTest``; a warning is logged in that case.
    """
    try:
        example_dir = util.ExampleMixin().cwd
    except unittest.SkipTest:
        util.log("WARNING: No examples dir found", color='suboptimal-behaviour')
        return {}

    skipped = _find_files_to_ignore()
    classes = {}
    for path in glob.glob(example_dir + '/*.py'):
        basename = os.path.basename(path)
        if basename in skipped:
            continue

        attrs = {
            'filename': basename,
            # Fall back to the mixin's default when there is no override.
            'time_range': time_ranges.get(basename, _AbstractTestMixin.time_range)
        }
        test_class = type(
            'Test_' + basename,
            (_AbstractTestMixin, greentest.TestCase),
            attrs
        )
        classes[test_class.__name__] = test_class
    return classes

# Publish the generated test classes as module attributes so that unittest
# can find them. ``globals().update`` is used rather than a ``for`` loop so
# that no loop variable stays bound at module level: a leftover name bound
# to the last generated class would be picked up by the loader as one more
# TestCase, and that example would be run twice.
globals().update(_build_test_classes())

# When executed as a script, delegate to ``gevent.testing.main``.
if __name__ == '__main__':
    greentest.main()