import numpy as np
# Note we explicitly cast variables to np.float32 in a couple of places to avoid
# the default casting in Python often resuling in double precision and to make
# sure we're doing the same numerics as C++ code.
def param_search_greedy(x, bit_rate, n_bins=200, ratio=0.16):
xmin, xmax = np.min(x), np.max(x)
stepsize = (xmax - xmin) / np.float32(n_bins)
min_bins = np.float32(n_bins) * (np.float32(1) - np.float32(ratio))
xq, loss = _compress_uniform_simplified(x, bit_rate, xmin, xmax)
solutions = [] # [(left, right, loss)] # local optima solution
cur_min, cur_max, cur_loss = xmin, xmax, loss
thr = min_bins * stepsize
while cur_min + thr < cur_max:
# move left
xq, loss1 = _compress_uniform_simplified(
x, bit_rate, cur_min + stepsize, cur_max
)
# move right
xq, loss2 = _compress_uniform_simplified(
x, bit_rate, cur_min, cur_max - stepsize
)
if cur_loss < loss1 and cur_loss < loss2:
# found a local optima
solutions.append((cur_min, cur_max, cur_loss))
if loss1 < loss2:
cur_min, cur_max, cur_loss = cur_min + stepsize, cur_max, loss1
else:
cur_min, cur_max, cur_loss = cur_min, cur_max - stepsize, loss2
if len(solutions):
best = solutions[0]
for solution in solutions:
if solution[-1] < best[-1]:
best = solution
return best[0], best[1]
return xmin, xmax
def _compress_uniform_simplified(X, bit_rate, xmin, xmax, fp16_scale_bias=True):
# affine transform to put Xq in [0,2**bit_rate - 1]
# Xq = (2 ** bit_rate - 1) * (Xq - xmin) / data_range
if fp16_scale_bias:
xmin = xmin.astype(np.float16).astype(np.float32)
data_range = xmax - xmin
scale = np.where(
data_range == 0, np.float32(1), data_range / np.float32(2 ** bit_rate - 1)
)
if fp16_scale_bias:
scale = scale.astype(np.float16).astype(np.float32)
inverse_scale = np.float32(1) / scale
Xq = np.clip(np.round((X - xmin) * inverse_scale), 0, np.float32(2 ** bit_rate - 1))
Xq = Xq * scale + xmin
# Manually compute loss instead of using np.linalg.norm to use the same
# accumulation order used by C++ code
vlen = 8
loss_v = np.zeros(vlen).astype(np.float32)
for i in range(len(Xq) // vlen * vlen):
loss_v[i % vlen] += (X[i] - Xq[i]) * (X[i] - Xq[i])
loss = np.float32(0)
for i in range(vlen):
loss += loss_v[i]
for i in range(len(Xq) // vlen * vlen, len(Xq)):
loss += (X[i] - Xq[i]) * (X[i] - Xq[i])
loss = np.sqrt(loss)
return Xq, loss