Repository URL to install this package:
|
Version:
1.7.1 ▾
|
# To run this script, run
# `python studentized_range_mpmath_ref.py`
# in the "scipy/stats/tests/" directory
# This script generates a JSON file "./data/studentized_range_mpmath_ref.json"
# that is used to compare the accuracy of `studentized_range` functions against
# precise (20 DOP) results generated using `mpmath`.
# Equations in this file have been taken from
# https://en.wikipedia.org/wiki/Studentized_range_distribution
# and have been checked against the following reference:
# Lund, R. E., and J. R. Lund. "Algorithm AS 190: Probabilities and
# Upper Quantiles for the Studentized Range." Journal of the Royal
# Statistical Society. Series C (Applied Statistics), vol. 32, no. 2,
# 1983, pp. 204-210. JSTOR, www.jstor.org/stable/2347300. Accessed 18
# Feb. 2021.
# Note: I would have prefered to use pickle rather than JSON, but -
# due to security concerns - decided against it.
import itertools
from collections import namedtuple
import json
import time
import os
from multiprocessing import Pool, cpu_count
from mpmath import gamma, pi, sqrt, quad, inf, mpf, mp
from mpmath import npdf as phi
from mpmath import ncdf as Phi
results_filepath = "data/studentized_range_mpmath_ref.json"
num_pools = max(cpu_count() - 1, 1)
MPResult = namedtuple("MPResult", ["src_case", "mp_result"])
CdfCase = namedtuple("CdfCase",
["q", "k", "v", "expected_atol", "expected_rtol"])
MomentCase = namedtuple("MomentCase",
["m", "k", "v", "expected_atol", "expected_rtol"])
# Load previously generated JSON results, or init a new dict if none exist
if os.path.isfile(results_filepath):
res_dict = json.load(open(results_filepath, mode="r"))
else:
res_dict = dict()
# Frame out data structure. Store data with the function type as a top level
# key to allow future expansion
res_dict["COMMENT"] = ("!!!!!! THIS FILE WAS AUTOGENERATED BY RUNNING "
"`python studentized_range_mpmath_ref.py` !!!!!!")
res_dict.setdefault("cdf_data", [])
res_dict.setdefault("pdf_data", [])
res_dict.setdefault("moment_data", [])
general_atol, general_rtol = 1e-11, 1e-11
mp.dps = 24
cp_q = [0.1, 1, 4, 10]
cp_k = [3, 10, 20]
cp_nu = [3, 10, 20, 50, 100, 120]
cdf_pdf_cases = [
CdfCase(*case,
general_atol,
general_rtol)
for case in
itertools.product(cp_q, cp_k, cp_nu)
]
mom_atol, mom_rtol = 1e-9, 1e-9
# These are EXTREMELY slow - Multiple days each in worst case.
moment_cases = [
MomentCase(i, 3, 10, mom_atol, mom_rtol)
for i in range(5)
]
def write_data():
"""Writes the current res_dict to the target JSON file"""
with open(results_filepath, mode="w") as f:
json.dump(res_dict, f, indent=2)
def to_dict(named_tuple):
"""Converts a namedtuple to a dict"""
return dict(named_tuple._asdict())
def mp_res_to_dict(mp_result):
"""Formats an MPResult namedtuple into a dict for JSON dumping"""
return {
"src_case": to_dict(mp_result.src_case),
# np assert can't handle mpf, so take the accuracy hit here.
"mp_result": float(mp_result.mp_result)
}
def cdf_mp(q, k, nu):
"""Straightforward implementation of studentized range CDF"""
q, k, nu = mpf(q), mpf(k), mpf(nu)
def inner(s, z):
return phi(z) * (Phi(z + q * s) - Phi(z)) ** (k - 1)
def outer(s, z):
return s ** (nu - 1) * phi(sqrt(nu) * s) * inner(s, z)
def whole(s, z):
return (sqrt(2 * pi) * k * nu ** (nu / 2)
/ (gamma(nu / 2) * 2 ** (nu / 2 - 1)) * outer(s, z))
res = quad(whole, [0, inf], [-inf, inf],
method="gauss-legendre", maxdegree=10)
return res
def pdf_mp(q, k, nu):
"""Straightforward implementation of studentized range PDF"""
q, k, nu = mpf(q), mpf(k), mpf(nu)
def inner(s, z):
return phi(z + q * s) * phi(z) * (Phi(z + q * s) - Phi(z)) ** (k - 2)
def outer(s, z):
return s ** nu * phi(sqrt(nu) * s) * inner(s, z)
def whole(s, z):
return (sqrt(2 * pi) * k * (k - 1) * nu ** (nu / 2)
/ (gamma(nu / 2) * 2 ** (nu / 2 - 1)) * outer(s, z))
res = quad(whole, [0, inf], [-inf, inf],
method="gauss-legendre", maxdegree=10)
return res
def moment_mp(m, k, nu):
"""Implementation of the studentized range moment"""
m, k, nu = mpf(m), mpf(k), mpf(nu)
def inner(q, s, z):
return phi(z + q * s) * phi(z) * (Phi(z + q * s) - Phi(z)) ** (k - 2)
def outer(q, s, z):
return s ** nu * phi(sqrt(nu) * s) * inner(q, s, z)
def pdf(q, s, z):
return (sqrt(2 * pi) * k * (k - 1) * nu ** (nu / 2)
/ (gamma(nu / 2) * 2 ** (nu / 2 - 1)) * outer(q, s, z))
def whole(q, s, z):
return q ** m * pdf(q, s, z)
res = quad(whole, [0, inf], [0, inf], [-inf, inf],
method="gauss-legendre", maxdegree=10)
return res
def result_exists(set_key, case):
"""Searches the results dict for a result in the set that matches a case.
Returns True if such a case exists."""
if set_key not in res_dict:
raise ValueError(f"{set_key} not present in data structure!")
case_dict = to_dict(case)
existing_res = list(filter(
lambda res: res["src_case"] == case_dict, # dict comparison
res_dict[set_key]))
return len(existing_res) > 0
def run(case, run_lambda, set_key, index=0, total_cases=0):
"""Runs the single passed case, returning an mp dictionary and index"""
t_start = time.perf_counter()
res = run_lambda(case)
print(f"Finished {index + 1}/{total_cases} in batch. "
f"(Took {time.perf_counter() - t_start}s)")
return index, set_key, mp_res_to_dict(MPResult(case, res))
def write_result(res):
"""A callback for completed jobs. Inserts and writes a calculated result
to file."""
index, set_key, result_dict = res
res_dict[set_key].insert(index, result_dict)
write_data()
def run_cases(cases, run_lambda, set_key):
"""Runs an array of cases and writes to file"""
# Generate jobs to run from cases that do not have a result in
# the previously loaded JSON.
job_arg = [(case, run_lambda, set_key, index, len(cases))
for index, case in enumerate(cases)
if not result_exists(set_key, case)]
print(f"{len(cases) - len(job_arg)}/{len(cases)} cases won't be "
f"calculated because their results already exist.")
jobs = []
pool = Pool(num_pools)
# Run all using multiprocess
for case in job_arg:
jobs.append(pool.apply_async(run, args=case, callback=write_result))
pool.close()
pool.join()
def run_pdf(case):
return pdf_mp(case.q, case.k, case.v)
def run_cdf(case):
return cdf_mp(case.q, case.k, case.v)
def run_moment(case):
return moment_mp(case.m, case.k, case.v)
def main():
t_start = time.perf_counter()
total_cases = 2 * len(cdf_pdf_cases) + len(moment_cases)
print(f"Processing {total_cases} test cases")
print(f"Running 1st batch ({len(cdf_pdf_cases)} PDF cases). "
f"These take about 30s each.")
run_cases(cdf_pdf_cases, run_pdf, "pdf_data")
print(f"Running 2nd batch ({len(cdf_pdf_cases)} CDF cases). "
f"These take about 30s each.")
run_cases(cdf_pdf_cases, run_cdf, "cdf_data")
print(f"Running 3rd batch ({len(moment_cases)} moment cases). "
f"These take about anywhere from a few hours to days each.")
run_cases(moment_cases, run_moment, "moment_data")
print(f"Test data generated in {time.perf_counter() - t_start}s")
if __name__ == "__main__":
main()