-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquantization.py
More file actions
194 lines (150 loc) · 5.34 KB
/
Copy pathquantization.py
File metadata and controls
194 lines (150 loc) · 5.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from __future__ import annotations
import numpy as np
import cvxpy as cp
from config_and_utils import Array, MethodName, QuantConfig
def uniform_codebook(levels: int) -> tuple[Array, Array]:
if levels < 2:
raise ValueError("levels must be >= 2")
codebook = np.linspace(-1.0, 1.0, levels, dtype=np.float64)
if levels == 2:
partition = np.array([0.0], dtype=np.float64)
else:
partition = (codebook[:-1] + codebook[1:]) / 2.0
return partition, codebook
def quantize_scalar(x: float, levels: int) -> float:
partition, codebook = uniform_codebook(levels)
idx = int(np.searchsorted(partition, x, side="right"))
idx = min(max(idx, 0), len(codebook) - 1)
return float(codebook[idx])
def scale_vector_for_quantization(w: Array) -> tuple[Array, float]:
scale = float(np.max(w) - np.min(w))
if scale <= 1e-12:
return w.copy(), 1.0
return w / scale, scale
def first_order_sigma_delta_matrix(n: int) -> Array:
"""
First-order Sigma-Delta / error-feedback matrix.
With sub-diagonal -1, the additive model becomes
w_q[i] = w[i] - q[i-1] + q[i]
which matches the usual first-order noise-shaping term (1 - z^{-1})Q.
"""
G = np.eye(n, dtype=np.float64)
for i in range(1, n):
G[i, i - 1] = -1.0
return G
def sigma_delta_quantize_scaled(w_bar: Array, levels: int, G: Array) -> tuple[Array, Array]:
"""
Quantize a scaled vector w_bar using a fixed lower-triangular error-feedback matrix G.
Returns
-------
wq_bar : quantized vector in the same scale as w_bar
q : local residuals in the same scale as w_bar, satisfying
wq_bar = w_bar + G q
"""
n = w_bar.shape[0]
wq_bar = np.zeros_like(w_bar)
q = np.zeros_like(w_bar)
for i in range(n):
b = w_bar[i] + float(G[i, :i] @ q[:i])
wq_bar[i] = quantize_scalar(b, levels)
q[i] = wq_bar[i] - b
return wq_bar, q
def direct_quantize_vector(w: Array, levels: int) -> Array:
w_bar, scale = scale_vector_for_quantization(w)
wq_bar = np.array([quantize_scalar(v, levels) for v in w_bar], dtype=np.float64)
return wq_bar * scale
def sigma_delta_first_order_vector(w: Array, levels: int) -> tuple[Array, Array, Array]:
"""
Fixed first-order Sigma-Delta baseline.
Returns
-------
wq : quantized vector in original scale
q_orig : residual vector in original scale, so that wq = w + G q_orig
G : fixed first-order Sigma-Delta matrix
"""
w_bar, scale = scale_vector_for_quantization(w)
G = first_order_sigma_delta_matrix(w.shape[0])
wq_bar, q = sigma_delta_quantize_scaled(w_bar, levels, G)
q_orig = scale * q
return wq_bar * scale, q_orig, G
def optimize_g_cvx_once(
X: Array,
w: Array,
levels: int,
solver: str = "SCS",
) -> tuple[Array, Array, Array]:
"""
One-shot constrained optimization that matches the MATLAB/CVX prototype.
Solve
minimize ||X V||_F
subject to
V[i,i] = xi,
V[i,j] = 0 for j > i,
||V[i,:i]||_1 <= M*xi - (M-1),
xi >= 0.
Then recover
A = 1 / xi,
G = V * A,
C = A / ||w||_inf.
Finally quantize C*w with the fixed G and map the result back to the
original scale.
"""
n = X.shape[1]
if X.shape[0] == 0 or X.shape[1] == 0:
raise ValueError("X must be non-empty")
if w.shape[0] != n:
raise ValueError("w and X have incompatible shapes")
w_inf = float(np.max(np.abs(w)))
if w_inf <= 1e-12:
G = np.eye(n, dtype=np.float64)
q = np.zeros(n, dtype=np.float64)
return w.copy(), q, G
V = cp.Variable((n, n))
xi = cp.Variable(nonneg=True)
constraints: list[cp.Constraint] = []
for i in range(n):
constraints.append(V[i, i] == xi)
if i + 1 < n:
constraints.append(V[i, i + 1 :] == 0)
if i == 0:
constraints.append(levels * xi - (levels - 1) >= 0)
else:
constraints.append(cp.norm1(V[i, :i]) <= levels * xi - (levels - 1))
objective = cp.Minimize(cp.norm(X @ V, "fro"))
problem = cp.Problem(objective, constraints)
problem.solve(solver=solver, verbose=False)
if problem.status not in ("optimal", "optimal_inaccurate"):
raise RuntimeError(f"CVXPY failed: status={problem.status}")
xi_val = float(xi.value)
if xi_val <= 1e-12:
raise RuntimeError(f"xi is too small: {xi_val}")
A = 1.0 / xi_val
G = np.array(V.value, dtype=np.float64) * A
# Numerical cleanup after the solver.
G = np.tril(G)
np.fill_diagonal(G, 1.0)
# Match the MATLAB prototype: quantize C*w, then map back.
C = A / w_inf
w_bar = C * w
wq_bar, q = sigma_delta_quantize_scaled(w_bar, levels, G)
wq = wq_bar / C
q_orig = q / C
return wq, q_orig, G
def quantize_vector(
w: Array,
X: Array,
method: MethodName,
cfg: QuantConfig,
) -> tuple[Array, Array | None, Array | None]:
if method == "direct":
return direct_quantize_vector(w, cfg.levels), None, None
if method == "sigma_delta_1st":
return sigma_delta_first_order_vector(w, cfg.levels)
if method == "optimized_g":
return optimize_g_cvx_once(
X=X,
w=w,
levels=cfg.levels,
solver="SCS",
)
raise ValueError(f"Unknown method: {method}")