Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pySC/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

"""

__version__ = "1.0.0"
__version__ = "1.1.0"

from .core.simulated_commissioning import SimulatedCommissioning
from .configuration.generation import generate_SC
Expand Down
13 changes: 9 additions & 4 deletions pySC/apps/measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@

def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatrix, method='svd_cutoff',
parameter: Union[int,float] = 0, reference: Optional[np.ndarray] = None,
gain: float = 1, zerosum: bool = False, plane: Optional[Literal['H', 'V']] = None,
rf: bool = False, apply: bool = False):
gain: float = 1, virtual: bool = False, plane: Optional[Literal['H', 'V']] = None,
rf: bool = False, apply: bool = False, virtual_target: float = 0,
gain_rf: float = 1, zerosum: Optional[bool] = None):

if zerosum is not None:
logger.warning('`zerosum` argument in ResponseMatrix.solve is deprecated. Please use `virtual` instead.')
virtual = zerosum

correctors = response_matrix.input_names
assert correctors is not None, 'Corrector names are undefined in the response matrix'
Expand All @@ -29,7 +34,7 @@ def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatr
assert len(reference) == len(orbit), "Reference orbit has wrong length"
orbit -= reference

trim_list = -response_matrix.solve(orbit, method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf)
trim_list = -response_matrix.solve(orbit, method=method, parameter=parameter, virtual=virtual, plane=plane, rf=rf, virtual_target=virtual_target)

trims = {corr: trim for corr, trim in zip(correctors, trim_list, strict=False) if trim != 0}
# if rf is selected, trim_list will be larger than correctors by one element. The last element is the rf frequency.
Expand All @@ -43,7 +48,7 @@ def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatr
interface.set_many(data)
if rf and trims['rf'] != 0:
f_rf = interface.get_rf_main_frequency()
interface.set_rf_main_frequency(f_rf + trims['rf'])
interface.set_rf_main_frequency(f_rf + gain_rf * trims['rf'])

return trims

Expand Down
199 changes: 162 additions & 37 deletions pySC/apps/response_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
import logging
import json

## Some timing info with 640 outputs, 576 inputs:
## response_matrix.build_pseudoinverse() -> 30 ms
## response_matrix.solve() -> 2 ms (if pseudo-inverse is cached)
## hash(bytes(response_matrix.input_weights)) -> 0.2 ms
## hash(bytes(response_matrix.output_weights)) -> 0.2 ms
##


PLANE_TYPE = Literal['H', 'V', 'Q', 'SQ']

logger = logging.getLogger(__name__)
Expand All @@ -13,8 +21,13 @@ class InverseResponseMatrix(BaseModel, extra="forbid"):
matrix: NPARRAY
method: Literal['tikhonov', 'svd_values', 'svd_cutoff', 'micado']
parameter: float
zerosum: bool = True
virtual: bool = True
rf: bool = False
rf_weight: float
virtual_weight: float
hash_rf_response: int
hash_input_weights: int
hash_output_weights: int

model_config = ConfigDict(arbitrary_types_allowed=True)

Expand All @@ -34,10 +47,14 @@ class ResponseMatrix(BaseModel):

input_names: Optional[list[str]] = None
output_names: Optional[list[str]] = None
inputs_plane: Optional[list[PLANE_TYPE]] = None
outputs_plane: Optional[list[PLANE_TYPE]] = None
input_planes: Optional[list[PLANE_TYPE]] = None
output_planes: Optional[list[PLANE_TYPE]] = None

rf_response: Optional[NPARRAY] = None
rf_weight: float = 1
input_weights: Optional[NPARRAY] = None
output_weights: Optional[NPARRAY] = None
rf_weight: Optional[float] = None
virtual_weight: float = 1

_n_outputs: int = PrivateAttr(default=0)
_n_inputs: int = PrivateAttr(default=0)
Expand All @@ -58,6 +75,29 @@ def RM(self):
logger.warning('ResponseMatrix.RM is deprecated! Please use ResponseMatrix.matrix instead.')
return self.matrix

@model_validator(mode='before')
def check_deprecations(cls, data):
if 'inputs_plane' in data.keys():
logger.warning('DEPRECATION: `inputs_plane` in the ResponseMatrix has been renamed to `input_planes`.')
logger.warning('DEPRECATION: You should do the same.')
if 'input_planes' in data.keys():
raise Exception('Both `inputs_plane` and `input_planes` are in the ResponseMatrix. Please only use the later one.')
else:
logger.warning('DEPRECATION: renaming automatically `inputs_plane` to `input_planes`.')
data['input_planes'] = data['inputs_plane']
del data['inputs_plane']

if 'outputs_plane' in data.keys():
logger.warning('DEPRECATION: `outputs_plane` in the ResponseMatrix has been renamed to `output_planes`.')
logger.warning('DEPRECATION: You should do the same.')
if 'output_planes' in data.keys():
raise Exception('Both `outputs_plane` and `output_planes` are in the ResponseMatrix. Please only use the later one.')
else:
logger.warning('DEPRECATION: renaming automatically `outputs_plane` to `output_planes`.')
data['output_planes'] = data['outputs_plane']
del data['outputs_plane']
return data

@model_validator(mode='after')
def initialize_and_check(self):
self._n_outputs, self._n_inputs = self.matrix.shape
Expand All @@ -68,19 +108,19 @@ def initialize_and_check(self):
self._singular_values = None
self.make_masks()

if self.inputs_plane is None:
if self.input_planes is None:
Nh = self._n_inputs // 2
if self._n_inputs % 2 != 0:
logger.warning('Plane of inputs is undefined and number of inputs in response matrix is not even. '
'Misinterpretation of the input plane is guaranteed!')
self.inputs_plane = ['H'] * Nh + ['V'] * (self._n_inputs - Nh)
self.input_planes = ['H'] * Nh + ['V'] * (self._n_inputs - Nh)

if self.outputs_plane is None:
if self.output_planes is None:
Nh = self._n_outputs // 2
if self._n_outputs % 2 != 0:
logger.warning('Plane of outputs is undefined and number of outputs in response matrix is not even. '
'Misinterpretation of the output plane is guaranteed!')
self.outputs_plane = ['H'] * Nh + ['V'] * (self._n_outputs - Nh)
self.output_planes = ['H'] * Nh + ['V'] * (self._n_outputs - Nh)

if self.rf_response is None:
self.rf_response = np.zeros(self._n_outputs)
Expand All @@ -90,8 +130,60 @@ def initialize_and_check(self):
logger.warning('RF response will be removed.')
self.rf_response = np.zeros(self._n_outputs)

if self.input_weights is None:
self.input_weights = np.ones(self._n_inputs, dtype=float)

if self.output_weights is None:
self.output_weights = np.ones(self._n_outputs, dtype=float)

if self.rf_response is not None and self.rf_weight is None:
default_rf_weight = self.default_rf_weight()
logger.info(f'Setting the rf_weight by default to {default_rf_weight}.')
self.rf_weight = default_rf_weight

return self

@property
def hash_rf_response(self) -> int:
return hash(bytes(self.rf_response))

@property
def hash_input_weights(self) -> int:
return hash(bytes(self.input_weights))

@property
def hash_output_weights(self) -> int:
return hash(bytes(self.output_weights))

def set_weight(self, name: str, weight: float, plane: Optional[PLANE_TYPE] = None):
applied = False
for ii, input_name in enumerate(self.input_names):
if input_name == name:
if plane is None or self.input_planes[ii] == plane:
self.input_weights[ii] = weight
applied = True

for ii, output_name in enumerate(self.output_names):
if output_name == name:
if plane is None or self.output_planes[ii] == plane:
self.output_weights[ii] = weight
applied = True

if not applied:
logger.warning('{name} was not found to apply weight.')

return

def default_rf_weight(self) -> float:
if self.rf_response is None:
raise Exception('rf_response was not found.')
matrix_h = self.matrix_h
rms_per_input = np.std(matrix_h, axis=0)
mean_rms_inputs = np.mean(rms_per_input)
rms_rf = np.std(self.rf_response)
default_rf_weight = mean_rms_inputs / rms_rf
return default_rf_weight

@property
def singular_values(self) -> np.array:
return self._singular_values
Expand All @@ -118,10 +210,10 @@ def get_matrix_in_plane(self, plane: Optional[PLANE_TYPE] = None) -> np.array:
return self.matrix[output_plane_mask, :][:, input_plane_mask]

def get_input_plane_mask(self, plane: Literal[PLANE_TYPE]) -> np.array:
return np.array(self.inputs_plane) == plane
return np.array(self.input_planes) == plane

def get_output_plane_mask(self, plane: Literal[PLANE_TYPE]) -> np.array:
return np.array(self.outputs_plane) == plane
return np.array(self.output_planes) == plane

@property
def matrix_h(self) -> np.array:
Expand Down Expand Up @@ -220,38 +312,51 @@ def disable_all_outputs_but(self, outputs: list[str]):
def enable_all_outputs(self):
self.bad_outputs = []

def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosum: bool = False,
def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., virtual: bool = False,
rf: bool = False, plane: Optional[PLANE_TYPE] = None):
logger.info(f'(Re-)Building pseudoinverse RM with {method=} and {parameter=} with {zerosum=}.')
logger.info(f'(Re-)Building pseudoinverse RM with {method=}, {parameter=}, {plane=}, {virtual=}, {rf=}.')
assert plane is None or plane in ['H', 'V'], f'Unknown plane: {plane}.'
if plane is None:
matrix = self.matrix[self._output_mask, :][:, self._input_mask]
tot_output_mask = self._output_mask
tot_input_mask = self._input_mask
elif plane in ['H', 'V']:
output_plane_mask = self.get_output_plane_mask(plane)
input_plane_mask = self.get_input_plane_mask(plane)
tot_output_mask = np.logical_and(self._output_mask, output_plane_mask)
tot_input_mask = np.logical_and(self._input_mask, input_plane_mask)
matrix = self.matrix[tot_output_mask, :][:, tot_input_mask]

if zerosum or rf:
# at this point, matrix has bad outputs/inputs (bpms/correctors) removed.
# also has only plane-specific outputs/inputs if requested.

# add extra column/row for the rf response or to enforce that the sum of all outputs is zero.
if virtual or rf:
rows, cols = matrix.shape
if zerosum:
if virtual:
rows += 1
if rf:
cols += 1
matrix_to_invert = np.zeros((rows, cols), dtype=float)
matrix_to_invert[:matrix.shape[0], :matrix.shape[1]] = matrix
if zerosum:
matrix_to_invert[-1, :matrix.shape[1]]
if virtual:
matrix_to_invert[-1, :matrix.shape[1]] = self.virtual_weight
if rf:
rf_response = self.rf_response
if plane is not None:
rf_response = rf_response[tot_output_mask] # tot_output_mask will have been defined earlier always.

matrix_to_invert[:matrix.shape[0], -1] = self.rf_weight*rf_response
matrix_to_invert[:matrix.shape[0], -1] = self.rf_weight * rf_response
else:
matrix_to_invert = matrix

# matrix_to_invert is extended by one row and/or column if rf and/or virtual was enabled.

# handle weights
rows, cols = matrix.shape
matrix_to_invert[:, :cols] = np.multiply(matrix_to_invert[:, :cols], self.input_weights[tot_input_mask])
matrix_to_invert[:rows, :] = np.multiply(matrix_to_invert[:rows, :], self.output_weights[tot_output_mask][:, np.newaxis])

U, s_mat, Vh = np.linalg.svd(matrix_to_invert, full_matrices=False)
if method == 'svd_cutoff':
cutoff = parameter
Expand All @@ -271,23 +376,32 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu
#matrix_inv = np.dot(np.dot(np.transpose(Vh), np.diag(d_mat)), np.transpose(U))
matrix_inv = np.dot(np.dot(np.transpose(Vh[:keep,:]), np.diag(d_mat)), np.transpose(U[:, :keep]))

return InverseResponseMatrix(matrix=matrix_inv, method=method, parameter=parameter, zerosum=zerosum)
return InverseResponseMatrix(matrix=matrix_inv, method=method, parameter=parameter, virtual=virtual,
virtual_weight=self.virtual_weight, rf=rf, rf_weight=self.rf_weight,
hash_rf_response=self.hash_rf_response,
hash_input_weights=self.hash_input_weights,
hash_output_weights=self.hash_output_weights)

def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = 0., virtual: bool = False,
zerosum: Optional[bool] = None, rf: bool = False, plane: Optional[Literal['H', 'V']] = None,
virtual_target: float = 0) -> np.ndarray:
if zerosum is not None:
logger.warning('`zerosum` argument in ResponseMatrix.solve is deprecated. Please use `virtual` instead.')
virtual = zerosum

def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = 0.,
zerosum: bool = False, rf: bool = False, plane: Optional[Literal['H', 'V']] = None) -> np.ndarray:
assert len(self.bad_outputs) != self.matrix.shape[0], 'All outputs are disabled!'
assert len(self.bad_inputs) != self.matrix.shape[1], 'All inputs are disabled!'
assert plane is None or plane in ['H', 'V'], f'Unknown plane: {plane}.'
if plane is None:
expected_shape = (self._n_inputs - len(self._bad_inputs), self._n_outputs - len(self._bad_outputs))
else:
output_plane_mask = np.array(self.outputs_plane) == plane
input_plane_mask = np.array(self.inputs_plane) == plane
output_plane_mask = np.array(self.output_planes) == plane
input_plane_mask = np.array(self.input_planes) == plane
tot_output_mask = np.logical_and(self._output_mask, output_plane_mask)
tot_input_mask = np.logical_and(self._input_mask, input_plane_mask)
expected_shape = (sum(tot_input_mask), sum(tot_output_mask))

if zerosum:
if virtual:
expected_shape = (expected_shape[0], expected_shape[1] + 1)

if rf:
Expand All @@ -303,11 +417,20 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float =
active_inverse_RM = self._inverse_RM_V

if active_inverse_RM is None:
active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf)
active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, virtual=virtual, plane=plane, rf=rf)
else:
if (active_inverse_RM.method != method or active_inverse_RM.parameter != parameter or
active_inverse_RM.zerosum != zerosum or active_inverse_RM.shape != expected_shape):
active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf)
if not (active_inverse_RM.method == method
and active_inverse_RM.parameter == parameter
and active_inverse_RM.virtual == virtual
and active_inverse_RM.virtual_weight == self.virtual_weight
and active_inverse_RM.rf == rf
and active_inverse_RM.shape == expected_shape
and active_inverse_RM.hash_rf_response == self.hash_rf_response
and active_inverse_RM.rf_weight == self.rf_weight
and active_inverse_RM.hash_input_weights == self.hash_input_weights
and active_inverse_RM.hash_output_weights == self.hash_output_weights
):
active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, virtual=virtual, plane=plane, rf=rf)

# cache it
if plane is None:
Expand All @@ -320,17 +443,17 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float =

output_plane_mask = np.ones_like(self._output_mask, dtype=bool)
if plane in ['H', 'V']:
output_plane_mask = np.array(self.outputs_plane) == plane
output_plane_mask = np.array(self.output_planes) == plane

bad_output = output.copy()
bad_output = output.copy() * self.output_weights
bad_output[np.isnan(bad_output)] = 0
good_output = bad_output[np.logical_and(self._output_mask, output_plane_mask)]


if method == 'micado':
bad_input = self.micado(good_output, int(parameter), plane=plane)
if zerosum:
logger.warning('Zerosum option is incompatible with the micado method and will be ignored.')
if virtual:
logger.warning('virtual option is incompatible with the micado method and will be ignored.')
if rf:
logger.warning('Rf option is incompatible with the micado method and will be ignored.')
else:
Expand All @@ -340,10 +463,11 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float =
+ f'expected inputs = {expected_shape[0]},\n'
+ f'expected outputs = {expected_shape[1]},')

if zerosum:
zerosum_good_output = np.zeros(len(good_output) + 1)
zerosum_good_output[:-1] = good_output
good_input = active_inverse_RM.dot(zerosum_good_output)
if virtual:
virtual_good_output = np.zeros(len(good_output) + 1)
virtual_good_output[:-1] = good_output
virtual_good_output[-1] = virtual_target * self.virtual_weight
good_input = active_inverse_RM.dot(virtual_good_output)
else:
good_input = active_inverse_RM.dot(good_output)

Expand All @@ -358,11 +482,12 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float =
bad_input = np.zeros(final_input_length, dtype=float)
input_plane_mask = np.ones_like(self._input_mask, dtype=bool)
if plane in ['H', 'V']:
input_plane_mask = np.array(self.inputs_plane) == plane
input_plane_mask = np.array(self.input_planes) == plane
bad_input[:self._n_inputs][np.logical_and(self._input_mask, input_plane_mask)] = good_input

bad_input[:self._n_inputs] = np.multiply(bad_input[:self._n_inputs], self.input_weights)
if rf:
bad_input[-1] = rf_input
bad_input[-1] = rf_input * self.rf_weight
return bad_input

def micado(self, good_output: np.array, n: int, plane: Optional[PLANE_TYPE] = None) -> np.ndarray:
Expand Down
Loading