Repository URL to install this package:
# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
from __future__ import division
from __future__ import print_function
"""
ccbench, a Python concurrency benchmark.
"""
import time
import os
import sys
import itertools
import threading
import subprocess
import socket
from optparse import OptionParser, SUPPRESS_HELP
import platform
# Compatibility shims so the same code runs on Python 2 and Python 3:
# Python 3 lacks ``xrange`` (alias it to ``range``), while on Python 2 the
# lazy ``itertools.imap`` replaces the list-building builtin ``map``.
try:
    xrange
except NameError:
    xrange = range
if hasattr(itertools, 'imap'):
    map = itertools.imap
# Throughput test: minimum wall-clock time, in seconds, that each
# measurement loop keeps running the task.
THROUGHPUT_DURATION = 2.0
# Latency test: seconds between pings.  The test derives the ping count as
# int(LATENCY_DURATION / LATENCY_PING_INTERVAL).
LATENCY_PING_INTERVAL = 0.1
# Latency test: nominal run length in seconds; its background worker
# threads are run for 1.5 times this long.
LATENCY_DURATION = 2.0
# NOTE(review): presumably the size in bytes of each bandwidth-test packet;
# its use is not visible in this part of the file -- confirm.
BANDWIDTH_PACKET_SIZE = 1024
# NOTE(review): presumably the bandwidth-test run length in seconds; its use
# is not visible in this part of the file -- confirm.
BANDWIDTH_DURATION = 2.0
def task_pidigits():
    """Pi calculation (Python)"""
    # The docstring above is printed as the task title, so it stays as is.
    # Helpers are bound to locals once so that the benchmarked code avoids
    # repeated global/attribute lookups.
    _map = map
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # Return the first n decimal digits of pi as a list of ints, using
        # the unbounded spigot algorithm from http://shootout.alioth.debian.org/
        # A linear fractional transformation is held as the 4-tuple
        # (q, r, s, t), evaluated at j as (q*j + r) // (s*j + t).
        def gen_x():
            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))

        def compose(a, b):
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            return (aq * bq,
                    aq * br + ar * bt,
                    as_ * bq + at * bs,
                    as_ * br + at * bt)

        def extract(z, j):
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            state = (1, 0, 0, 1)
            terms = gen_x()
            while 1:
                digit = extract(state, 3)
                # Absorb further terms until the next digit is pinned down.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(_islice(pi_digits(), n))

    return calc_ndigits, (50, )
def task_regex():
    """regular expression (C)"""
    # The docstring above is printed as the task title, so it stays as is.
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # The input is the first 2000 characters of this script's own source.
    with open(__file__, "r") as f:
        arg = f.read(2000)
    # The compiled pattern's bound ``findall`` method is itself the
    # benchmarked callable; no wrapper is needed.
    return pat.findall, (arg, )
def task_sort():
    """list sorting (C)"""
    def list_sort(seq):
        # Sort a reversed copy: the input list is never mutated, so every
        # call sorts the same descending sequence.
        work = seq[::-1]
        work.sort()

    numbers = list(range(1000))
    return list_sort, (numbers, )
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    # Input: the first (up to) 5000 bytes of this script, repeated 3 times.
    with open(__file__, "rb") as source:
        chunk = source.read(5000)
    payload = chunk * 3

    def compress(data):
        # Compress at level 5 and immediately decompress, so both directions
        # are exercised on every call; the output is discarded.
        packed = zlib.compress(data, 5)
        zlib.decompress(packed)

    return compress, (payload, )
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    # Input: the first (up to) 3000 bytes of this script, repeated twice.
    with open(__file__, "rb") as source:
        chunk = source.read(3000)
    payload = chunk * 2

    def compress(data):
        # Only the compression work matters; the result is discarded.
        bz2.compress(data)

    return compress, (payload, )
def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    # Input: the first (up to) 5000 bytes of this script, repeated 30 times.
    with open(__file__, "rb") as source:
        chunk = source.read(5000)
    payload = chunk * 30

    def compute(data):
        # Only the hashing work matters; the digest is discarded.
        hashlib.sha1(data).digest()

    return compute, (payload, )
throughput_tasks = [task_pidigits, task_regex]

# Probe the optional modules; each is bound as a module-level name, set to
# None when it cannot be imported.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# zlib gives irregular results for unclear reasons, so prefer bz2, then
# hashlib, and fall back to zlib only when neither is available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
throughput_tasks.append(
    task_compress_bz2 if bz2 is not None
    else task_hashing if hashlib is not None
    else task_compress_zlib)

# The latency tests share the very same list object as the throughput tests.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
class TimedLoop:
    """Benchmark driver that calls ``func(*args)`` in growing batches.

    Calling an instance keeps running the function until a minimum duration
    has elapsed, or until another loop sharing the same *end_event* list has
    finished.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the function until *min_duration* seconds past *start_time*.

        *end_event* is a list used as a shared "finished" flag.  A loop that
        reaches *min_duration* appends ``None`` to it, and a loop that finds
        it non-empty after a batch stops at once.  When *do_yield* is true,
        batches lasting 10 ms or more are followed by a tiny sleep to force
        thread switching.

        Returns ``(niters, duration)``: the number of calls counted and the
        time elapsed since *start_time* at the end of the last counted batch.
        If another loop finished first, the batch in progress is discarded
        and the previous measurement is returned.
        """
        clock = time.time
        nap = time.sleep
        func = self.func
        args = self.args
        batch = 20
        calls_done = 0
        elapsed = 0.0
        last_check = start_time
        while True:
            for _ in range(batch):
                func(*args)
            now = clock()
            if end_event:
                # Another loop already ended the run, so this batch overlapped
                # its end: report the last complete measurement instead.
                return calls_done, elapsed
            calls_done += batch
            elapsed = now - start_time
            if elapsed >= min_duration:
                # Signal every other loop sharing end_event to stop too.
                end_event.append(None)
                return calls_done, elapsed
            if now - last_check < 0.01:
                # Batch took under 10 ms: grow it so clock reads and checks
                # perturb the overall runtime less.
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                nap(0.0001)
            last_check = now
def run_throughput_test(func, args, nthreads):
    """Measure how many times ``func(*args)`` runs on *nthreads* threads.

    Each participant runs a :class:`TimedLoop` for THROUGHPUT_DURATION
    seconds, all measured from a common start time that is taken only once
    every thread is ready.  With a single thread, the loop runs directly in
    the caller, without any thread switching or synchronization.

    Returns a list of ``(niters, duration)`` tuples, one per thread.
    Raises AssertionError when *nthreads* < 1 (unless run with ``-O``).
    """
    assert nthreads >= 1
    # Warm up
    func(*args)
    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Announce readiness, then block until the main thread has recorded
        # the common start time.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = [threading.Thread(target=run) for _ in range(nthreads)]
    for t in threads:
        # Thread.setDaemon() is deprecated since Python 3.10; the ``daemon``
        # attribute does the same and exists since Python 2.6.
        t.daemon = True
        t.start()

    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()
    return results
def run_throughput_tests(max_threads):
    """Print throughput results for each task with 1..*max_threads* threads.

    The single-thread run is reported in iterations per second.  Each run
    with more threads is reported as a percentage of that baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        nthreads = 1
        while nthreads <= max_threads:
            results = run_throughput_test(func, args, nthreads)
            # Dividing by the longest duration (rather than the mean) makes
            # the figure pessimistic rather than optimistic.
            total_iters = sum(r[0] for r in results)
            longest = max(r[1] for r in results)
            speed = total_iters / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
            nthreads += 1
        print()
# Marker the latency client sends (followed by "\n") after its last ping.
LAT_END = "END"
def _sendto(sock, s, addr):
sock.sendto(s.encode('ascii'), addr)
def _recv(sock, n):
return sock.recv(n).decode('ascii')
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to *addr*, then an end marker.

    One ping is sent immediately so the receiver knows the client is up.
    After a 1 second pause, *nb_pings* further pings follow, each preceded
    by a sleep of *interval* seconds, and finally ``LAT_END``.  Each ping is
    ``repr(time.time())`` followed by a newline.  The socket is always
    closed on exit.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        pause = time.sleep

        def send_ping():
            _sendto(sock, "%r\n" % now(), addr)

        # The first ping signals the parent process that we are ready.
        send_ping()
        # Give the parent a moment to notice before the timed pings start.
        pause(1.0)
        for _ in range(nb_pings):
            pause(interval)
            send_ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
def run_latency_client(**kwargs):
    """Start a child interpreter running this script with ``--latclient``.

    The keyword arguments are forwarded as a single ``repr(kwargs)``
    command-line argument.  How the child parses it is not shown here;
    presumably it rebuilds the dict and calls ``latency_client`` (confirm).
    The child runs with ``-E`` (ignoring PYTHON* environment variables) and
    inherits this process's standard streams.  Returns the ``Popen`` object.
    """
    script = os.path.abspath(__file__)
    argv = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(argv)
def run_latency_test(func, args, nthreads):
# Create a listening socket to receive the pings. We use UDP which should
# be painlessly cross-platform.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 0))
addr = sock.getsockname()
interval = LATENCY_PING_INTERVAL
duration = LATENCY_DURATION
nb_pings = int(duration / interval)
results = []
threads = []
end_event = []
start_cond = threading.Condition()
started = False
if nthreads > 0:
# Warm up
func(*args)
results = []
loop = TimedLoop(func, args)
ready = []
ready_cond = threading.Condition()
def run():
with ready_cond:
ready.append(None)
ready_cond.notify()
with start_cond:
while not started:
start_cond.wait()
loop(start_time, duration * 1.5, end_event, do_yield=False)
for i in range(nthreads):
threads.append(threading.Thread(target=run))
for t in threads:
t.setDaemon(True)
t.start()
# Wait for threads to be ready
with ready_cond:
while len(ready) < nthreads:
ready_cond.wait()
# Run the client and wait for the first ping(s) to arrive before
Loading ...