Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pytest-benchmark / fixture.py
Size: Mime:
from __future__ import division
from __future__ import print_function

import cProfile
import gc
import pstats
import sys
import time
import traceback
from math import ceil

from .compat import INT
from .compat import XRANGE
from .timers import compute_timer_precision
from .utils import NameWrapper
from .utils import format_time

try:
    import statistics
except (ImportError, SyntaxError):
    statistics_error = traceback.format_exc()
    statistics = None
else:
    statistics_error = None
    from .stats import Metadata


class FixtureAlreadyUsed(Exception):
    pass


class BenchmarkFixture(object):
    _precisions = {}

    def __init__(self, node, disable_gc, timer, min_rounds, min_time, max_time, warmup, warmup_iterations,
                 calibration_precision, add_stats, logger, warner, disabled, cprofile, group=None):
        self.name = node.name
        self.fullname = node._nodeid
        self.disabled = disabled
        if hasattr(node, 'callspec'):
            self.param = node.callspec.id
            self.params = node.callspec.params
        else:
            self.param = None
            self.params = None
        self.group = group
        self.has_error = False
        self.extra_info = {}
        self.skipped = False

        self._disable_gc = disable_gc
        self._timer = timer.target
        self._min_rounds = min_rounds
        self._max_time = float(max_time)
        self._min_time = float(min_time)
        self._add_stats = add_stats
        self._calibration_precision = calibration_precision
        self._warmup = warmup and warmup_iterations
        self._logger = logger
        self._warner = warner
        self._cleanup_callbacks = []
        self._mode = None
        self.cprofile = cprofile
        self.cprofile_stats = None

    @property
    def enabled(self):
        return not self.disabled

    def _get_precision(self, timer):
        if timer in self._precisions:
            timer_precision = self._precisions[timer]
        else:
            timer_precision = self._precisions[timer] = compute_timer_precision(timer)
            self._logger.debug("")
            self._logger.debug("Computing precision for %s ... %ss." % (
                NameWrapper(timer), format_time(timer_precision)), blue=True, bold=True)
        return timer_precision

    def _make_runner(self, function_to_benchmark, args, kwargs):
        def runner(loops_range, timer=self._timer):
            gc_enabled = gc.isenabled()
            if self._disable_gc:
                gc.disable()
            tracer = sys.gettrace()
            sys.settrace(None)
            try:
                if loops_range:
                    start = timer()
                    for _ in loops_range:
                        function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start
                else:
                    start = timer()
                    result = function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start, result
            finally:
                sys.settrace(tracer)
                if gc_enabled:
                    gc.enable()

        return runner

    def _make_stats(self, iterations):
        bench_stats = Metadata(self, iterations=iterations, options={
            "disable_gc": self._disable_gc,
            "timer": self._timer,
            "min_rounds": self._min_rounds,
            "max_time": self._max_time,
            "min_time": self._min_time,
            "warmup": self._warmup,
        })
        self._add_stats(bench_stats)
        self.stats = bench_stats
        return bench_stats

    def __call__(self, function_to_benchmark, *args, **kwargs):
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark(...)'
            return self._raw(function_to_benchmark, *args, **kwargs)
        except Exception:
            self.has_error = True
            raise

    def pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark.pedantic(...)'
            return self._raw_pedantic(target, args=args, kwargs=kwargs, setup=setup, rounds=rounds,
                                      warmup_rounds=warmup_rounds, iterations=iterations)
        except Exception:
            self.has_error = True
            raise

    def _raw(self, function_to_benchmark, *args, **kwargs):
        if not self.disabled:
            runner = self._make_runner(function_to_benchmark, args, kwargs)

            duration, iterations, loops_range = self._calibrate_timer(runner)

            # Choose how many time we must repeat the test
            rounds = int(ceil(self._max_time / duration))
            rounds = max(rounds, self._min_rounds)
            rounds = min(rounds, sys.maxsize)

            stats = self._make_stats(iterations)

            self._logger.debug("  Running %s rounds x %s iterations ..." % (rounds, iterations), yellow=True, bold=True)
            run_start = time.time()
            if self._warmup:
                warmup_rounds = min(rounds, max(1, int(self._warmup / iterations)))
                self._logger.debug("  Warmup %s rounds x %s iterations ..." % (warmup_rounds, iterations))
                for _ in XRANGE(warmup_rounds):
                    runner(loops_range)
            for _ in XRANGE(rounds):
                stats.update(runner(loops_range))
            self._logger.debug("  Ran for %ss." % format_time(time.time() - run_start), yellow=True, bold=True)
        if self.cprofile:
            profile = cProfile.Profile()
            function_result = profile.runcall(function_to_benchmark, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)
        else:
            function_result = function_to_benchmark(*args, **kwargs)
        return function_result

    def _raw_pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        if kwargs is None:
            kwargs = {}

        has_args = bool(args or kwargs)

        if not isinstance(iterations, INT) or iterations < 1:
            raise ValueError("Must have positive int for `iterations`.")

        if not isinstance(rounds, INT) or rounds < 1:
            raise ValueError("Must have positive int for `rounds`.")

        if not isinstance(warmup_rounds, INT) or warmup_rounds < 0:
            raise ValueError("Must have positive int for `warmup_rounds`.")

        if iterations > 1 and setup:
            raise ValueError("Can't use more than 1 `iterations` with a `setup` function.")

        def make_arguments(args=args, kwargs=kwargs):
            if setup:
                maybe_args = setup()
                if maybe_args:
                    if has_args:
                        raise TypeError("Can't use `args` or `kwargs` if `setup` returns the arguments.")
                    args, kwargs = maybe_args
            return args, kwargs

        if self.disabled:
            args, kwargs = make_arguments()
            return target(*args, **kwargs)

        stats = self._make_stats(iterations)
        loops_range = XRANGE(iterations) if iterations > 1 else None
        for _ in XRANGE(warmup_rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            runner(loops_range)

        for _ in XRANGE(rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            if loops_range:
                duration = runner(loops_range)
            else:
                duration, result = runner(loops_range)
            stats.update(duration)

        if loops_range:
            args, kwargs = make_arguments()
            result = target(*args, **kwargs)

        if self.cprofile:
            profile = cProfile.Profile()
            args, kwargs = make_arguments()
            profile.runcall(target, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)

        return result

    def weave(self, target, **kwargs):
        try:
            import aspectlib
        except ImportError as exc:
            raise ImportError(exc.args, "Please install aspectlib or pytest-benchmark[aspect]")

        def aspect(function):
            def wrapper(*args, **kwargs):
                return self(function, *args, **kwargs)

            return wrapper

        self._cleanup_callbacks.append(aspectlib.weave(target, aspect, **kwargs).rollback)

    patch = weave

    def _cleanup(self):
        while self._cleanup_callbacks:
            callback = self._cleanup_callbacks.pop()
            callback()
        if not self._mode and not self.skipped:
            self._logger.warn("Benchmark fixture was not used at all in this test!",
                              warner=self._warner, suspend=True)

    def _calibrate_timer(self, runner):
        timer_precision = self._get_precision(self._timer)
        min_time = max(self._min_time, timer_precision * self._calibration_precision)
        min_time_estimate = min_time * 5 / self._calibration_precision
        self._logger.debug("")
        self._logger.debug("  Calibrating to target round %ss; will estimate when reaching %ss "
                           "(using: %s, precision: %ss)." % (
                               format_time(min_time),
                               format_time(min_time_estimate),
                               NameWrapper(self._timer),
                               format_time(timer_precision)
                           ), yellow=True, bold=True)

        loops = 1
        while True:
            loops_range = XRANGE(loops)
            duration = runner(loops_range)
            if self._warmup:
                warmup_start = time.time()
                warmup_iterations = 0
                warmup_rounds = 0
                while time.time() - warmup_start < self._max_time and warmup_iterations < self._warmup:
                    duration = min(duration, runner(loops_range))
                    warmup_rounds += 1
                    warmup_iterations += loops
                self._logger.debug("    Warmup: %ss (%s x %s iterations)." % (
                    format_time(time.time() - warmup_start),
                    warmup_rounds, loops
                ))

            self._logger.debug("    Measured %s iterations: %ss." % (loops, format_time(duration)), yellow=True)
            if duration >= min_time:
                break

            if duration >= min_time_estimate:
                # coarse estimation of the number of loops
                loops = int(ceil(min_time * loops / duration))
                self._logger.debug("    Estimating %s iterations." % loops, green=True)
                if loops == 1:
                    # If we got a single loop then bail early - nothing to calibrate if the the
                    # test function is 100 times slower than the timer resolution.
                    loops_range = XRANGE(loops)
                    break
            else:
                loops *= 10
        return duration, loops, loops_range