Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / python3.8-examples   deb

Repository URL to install this package:

/ usr / share / doc / python3.8 / examples / ccbench / ccbench.py

# This file should be kept compatible with both Python 2.6 and Python >= 3.0.

from __future__ import division
from __future__ import print_function

"""
ccbench, a Python concurrency benchmark.
"""

import time
import os
import sys
import itertools
import threading
import subprocess
import socket
from optparse import OptionParser, SUPPRESS_HELP
import platform

# Compatibility
# Interpreters without an `xrange` builtin raise NameError on the bare lookup;
# in that case `xrange` is bound to `range` so both names are usable.
try:
    xrange
except NameError:
    xrange = range

# Where `itertools.imap` exists it replaces the module-level `map`; where the
# attribute is missing the builtin `map` is left untouched.
try:
    map = itertools.imap
except AttributeError:
    pass


# Minimum measurement time, in seconds, that run_throughput_test() hands to
# each TimedLoop call.
THROUGHPUT_DURATION = 2.0

# Ping interval read by run_latency_test(); presumably the number of seconds
# the latency client sleeps between two pings -- confirm against the caller.
LATENCY_PING_INTERVAL = 0.1
# Length of a latency run in seconds.  run_latency_test() derives the number
# of pings as int(LATENCY_DURATION / LATENCY_PING_INTERVAL).
LATENCY_DURATION = 2.0

# Settings for the bandwidth benchmark.  Their consumers are outside this
# part of the file; presumably a packet size in bytes and a duration in
# seconds -- TODO confirm.
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0


def task_pidigits():
    """Pi calculation (Python)"""
    # NOTE: the docstring above is printed as the task title by the benchmark
    # driver, so it must stay a short label.
    # Helpers are bound to local names once, before the timed function is
    # built.  `map` has to be lazy here because it is fed an endless counter.
    _map = map
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # Returns the first `n` decimal digits of pi as a list of ints.
        # From http://shootout.alioth.debian.org/

        def compose(left, right):
            # Combine two (q, r, s, t) states into a new one.
            lq, lr, ls, lt = left
            rq, rr, rs, rt = right
            return (lq * rq,
                    lq * rr + lr * rt,
                    ls * rq + lt * rs,
                    ls * rr + lt * rt)

        def extract(state, j):
            # Integer value of the state evaluated at `j`.
            q, r, s, t = state
            return (q*j + r) // (s*j + t)

        def pi_digits():
            state = (1, 0, 0, 1)
            # Endless stream of per-step terms, consumed one at a time.
            terms = _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))
            while True:
                digit = extract(state, 3)
                if digit != extract(state, 4):
                    # Evaluations at 3 and 4 still disagree: absorb another
                    # term and try again.
                    state = compose(state, next(terms))
                    continue
                # Both evaluations agree, so the digit is settled; shift it
                # out of the state and emit it.
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(_islice(pi_digits(), n))

    return calc_ndigits, (50, )

def task_regex():
    """regular expression (C)"""
    # NOTE: the docstring above is printed as the task title by the benchmark
    # driver, so it is kept as a short label.
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # The benchmark input is the beginning of this very script (up to 2000
    # characters), read once when the task is set up.
    with open(__file__, "r") as f:
        arg = f.read(2000)

    # Returns (callable, args): the compiled pattern's bound `findall` is
    # timed directly, without any Python-level wrapper around it.
    return pat.findall, (arg, )

def task_sort():
    """list sorting (C)"""
    # NOTE: the docstring above is printed as the task title by the benchmark
    # driver, so it is kept as a short label.
    ascending = list(range(1000))

    def list_sort(l):
        # Sort a reversed copy in place; the caller's list is left untouched
        # and nothing is returned.
        descending = l[::-1]
        descending.sort()

    return list_sort, (ascending, )

def task_compress_zlib():
    """zlib compression (C)"""
    # NOTE: the docstring above is printed as the task title by the benchmark
    # driver, so it is kept as a short label.
    import zlib

    # Payload: the first 5000 bytes of this script, repeated three times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def compress(s):
        # One round trip: compress at level 5, then decompress the result.
        # The output is discarded.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)

    return compress, (payload, )

def task_compress_bz2():
    """bz2 compression (C)"""
    # NOTE: the docstring above is printed as the task title by the benchmark
    # driver, so it is kept as a short label.
    import bz2

    # Payload: the first 3000 bytes of this script, repeated twice.
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def compress(s):
        # Compress and throw the result away; only the work matters.
        bz2.compress(s)

    return compress, (payload, )

def task_hashing():
    """SHA1 hashing (C)"""
    # NOTE: the docstring above is printed as the task title by the benchmark
    # driver, so it is kept as a short label.
    import hashlib

    # Payload: the first 5000 bytes of this script, repeated thirty times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def compute(s):
        # Hash the payload and discard the digest.
        hasher = hashlib.sha1(s)
        hasher.digest()

    return compute, (payload, )


# Task factories used by the throughput benchmark.  A third, compression or
# hashing task is appended below depending on which modules can be imported.
throughput_tasks = [task_pidigits, task_regex]
# Bind `bz2` and `hashlib` as module globals; a failed import leaves the name
# bound to None so the checks below can test for availability.
for mod in 'bz2', 'hashlib':
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# The latency benchmark uses the very same list object (an alias, not a
# copy); the bandwidth benchmark only lists the pure-Python pi task.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]


class TimedLoop:
    """Callable that runs ``func(*args)`` in batches until a time budget
    is spent or another loop signals completion.
    """

    def __init__(self, func, args):
        self.func, self.args = func, args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the workload and return ``(niters, duration)``.

        start_time   -- ``time.time()`` value that durations are measured from.
        min_duration -- seconds to run before declaring the measurement done.
        end_event    -- shared list used as a flag: it is empty while running
                        and the first loop to finish appends ``None`` to it.
        do_yield     -- when true, sleep briefly after each batch that took
                        at least 10 ms.

        If ``end_event`` is found set after a batch, that batch is not
        counted and the totals from the previous batch are returned.
        """
        clock = time.time
        nap = time.sleep
        work = self.func
        work_args = self.args

        batch = 20
        done = 0
        elapsed = 0.0
        batch_start = start_time
        while True:
            for _ in range(batch):
                work(*work_args)
            now = clock()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return done, elapsed
            done += batch
            elapsed = now - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return done, elapsed
            if now - batch_start < 0.01:
                # Minimize interference of measurement on overall runtime
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                nap(0.0001)
            batch_start = now


def run_throughput_test(func, args, nthreads):
    """Measure how many times ``func(*args)`` runs across `nthreads` threads.

    Returns a list of ``(niters, duration)`` tuples as produced by
    TimedLoop, one per loop that ran, in the order the loops finished.
    With ``nthreads == 1`` the loop runs in the calling thread; otherwise
    `nthreads` daemon threads are started, released together, and joined.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    # Shared "finished" flag: the first loop to reach the target duration
    # appends to it, which makes the other loops stop.
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Report readiness, then block until the main thread flips `started`.
        # `started` and `start_time` are read from the enclosing scope and
        # are assigned by the main thread further down.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        # The `daemon` attribute (available since Python 2.6) is used instead
        # of setDaemon(), which is deprecated in recent Python 3 releases.
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results

def run_throughput_tests(max_threads):
    """Run every throughput task with 1..max_threads threads and print the
    results.

    The single-threaded run of each task is reported in iterations per
    second and serves as the baseline; every later run is reported as a
    percentage of that baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        nthreads = 1
        while nthreads <= max_threads:
            results = run_throughput_test(func, args, nthreads)
            total_iterations = sum(niters for niters, _ in results)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            longest_duration = max(duration for _, duration in results)
            speed = total_iterations / longest_duration
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                baseline_speed = speed
                print(" iterations/s.")
            nthreads += 1
        print()


# Payload marking the end of a ping sequence: latency_client() sends it,
# followed by a newline, after its last ping.
LAT_END = "END"

def _sendto(sock, s, addr):
    """Encode the text `s` as ASCII and send it to `addr` through `sock`."""
    payload = s.encode('ascii')
    sock.sendto(payload, addr)

def _recv(sock, n):
    """Receive up to `n` bytes from `sock` and return them decoded as ASCII."""
    raw = sock.recv(n)
    return raw.decode('ascii')

def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to `addr`, then an end marker.

    One initial ping is sent immediately, followed by a one-second pause.
    After that, `nb_pings` pings are sent with a sleep of `interval`
    seconds before each one.  Every ping carries the repr of the current
    ``time.time()`` value plus a newline; the final datagram is LAT_END
    plus a newline.  The socket is closed on the way out, even on error.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        pause = time.sleep

        def send_timestamp():
            _sendto(sock, "%r\n" % now(), addr)

        # The first ping signals the parent process that we are ready.
        send_timestamp()
        # We give the parent a bit of time to notice.
        pause(1.0)
        for _ in range(nb_pings):
            pause(interval)
            send_timestamp()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()

def run_latency_client(**kwargs):
    """Start this script in a child interpreter as a latency client.

    The child is launched with ``-E`` and receives the repr of `kwargs` as
    the value of the ``--latclient`` option.  Returns the
    ``subprocess.Popen`` object; the child inherits the parent's standard
    streams.
    """
    script_path = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script_path,
                '--latclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)

def run_latency_test(func, args, nthreads):
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    addr = sock.getsockname()

    interval = LATENCY_PING_INTERVAL
    duration = LATENCY_DURATION
    nb_pings = int(duration / interval)

    results = []
    threads = []
    end_event = []
    start_cond = threading.Condition()
    started = False
    if nthreads > 0:
        # Warm up
        func(*args)

        results = []
        loop = TimedLoop(func, args)
        ready = []
        ready_cond = threading.Condition()

        def run():
            with ready_cond:
                ready.append(None)
                ready_cond.notify()
            with start_cond:
                while not started:
                    start_cond.wait()
            loop(start_time, duration * 1.5, end_event, do_yield=False)

        for i in range(nthreads):
            threads.append(threading.Thread(target=run))
        for t in threads:
            t.setDaemon(True)
            t.start()
        # Wait for threads to be ready
        with ready_cond:
            while len(ready) < nthreads:
                ready_cond.wait()

    # Run the client and wait for the first ping(s) to arrive before
Loading ...