From a0fe0ad49a916bf3e7ffbc07abac16bc6f221b65 Mon Sep 17 00:00:00 2001 From: kparasch Date: Mon, 2 Mar 2026 19:10:27 +0100 Subject: [PATCH 1/8] fix of weight in rf-orbit correction --- pySC/apps/response_matrix.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pySC/apps/response_matrix.py b/pySC/apps/response_matrix.py index afedc3c..705e17a 100644 --- a/pySC/apps/response_matrix.py +++ b/pySC/apps/response_matrix.py @@ -248,7 +248,7 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu if plane is not None: rf_response = rf_response[tot_output_mask] # tot_output_mask will have been defined earlier always. - matrix_to_invert[:matrix.shape[0], -1] = self.rf_weight*rf_response + matrix_to_invert[:matrix.shape[0], -1] = self.rf_weight * rf_response else: matrix_to_invert = matrix @@ -362,7 +362,7 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = bad_input[:self._n_inputs][np.logical_and(self._input_mask, input_plane_mask)] = good_input if rf: - bad_input[-1] = rf_input + bad_input[-1] = rf_input * self.rf_weight return bad_input def micado(self, good_output: np.array, n: int, plane: Optional[PLANE_TYPE] = None) -> np.ndarray: From e323d9b1c466fdac8170391f4496239fde60bcdd Mon Sep 17 00:00:00 2001 From: kparasch Date: Tue, 3 Mar 2026 16:18:50 +0100 Subject: [PATCH 2/8] enable weights handling --- pySC/apps/response_matrix.py | 86 +++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/pySC/apps/response_matrix.py b/pySC/apps/response_matrix.py index 705e17a..d2784e0 100644 --- a/pySC/apps/response_matrix.py +++ b/pySC/apps/response_matrix.py @@ -5,6 +5,14 @@ import logging import json +## Some timing info with 640 outputs, 576 inputs: +## response_matrix.build_pseudoinverse() -> 30 ms +## response_matrix.solve() -> 2 ms (if pseudo-inverse is cached) +## hash(bytes(response_matrix.input_weights)) -> 0.2 ms +## hash(bytes(response_matrix.output_weights)) -> 0.2 ms +## + + PLANE_TYPE = Literal['H', 'V', 'Q', 'SQ'] logger = logging.getLogger(__name__) @@ -15,6 +23,10 @@ class InverseResponseMatrix(BaseModel, extra="forbid"): parameter: float zerosum: bool = True rf: bool = False + rf_weight: int + hash_rf_response: int + hash_input_weights: int + hash_output_weights: int model_config = ConfigDict(arbitrary_types_allowed=True) @@ -36,7 +48,10 @@ class ResponseMatrix(BaseModel): output_names: Optional[list[str]] = None inputs_plane: Optional[list[PLANE_TYPE]] = None outputs_plane: Optional[list[PLANE_TYPE]] = None + rf_response: Optional[NPARRAY] = None + input_weights: Optional[NPARRAY] = None + output_weights: Optional[NPARRAY] = None rf_weight: float = 1 _n_outputs: int = PrivateAttr(default=0) @@ -90,8 +105,45 @@ def initialize_and_check(self): logger.warning('RF response will be removed.') self.rf_response = np.zeros(self._n_outputs) + if self.input_weights is None: + self.input_weights = np.ones(self._n_inputs, dtype=float) + + if self.output_weights is None: + self.output_weights = np.ones(self._n_outputs, dtype=float) + return self + @property + def hash_rf_response(self) -> int: + return hash(bytes(self.rf_response)) + + @property + def hash_input_weights(self) -> int: + return hash(bytes(self.input_weights)) + + @property + def hash_output_weights(self) -> int: + return hash(bytes(self.output_weights)) + + def set_weight(self, name: str, weight: float, plane: Optional[PLANE_TYPE] = None): + applied = False + for ii, input_name in enumerate(self.input_names): + if input_name == name: + if plane is None or self.inputs_plane[ii] == plane: + self.input_weights[ii] = weight + applied = True + + for ii, output_name in enumerate(self.output_names): + if output_name == name: + if plane is None or self.outputs_plane[ii] == plane: + self.output_weights[ii] = weight + applied = True + + if not applied: + logger.warning('{name} was not found to apply weight.') + + return + @property def singular_values(self) -> np.array: return self._singular_values @@ -222,10 +274,12 @@ def enable_all_outputs(self): def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosum: bool = False, rf: bool = False, plane: Optional[PLANE_TYPE] = None): - logger.info(f'(Re-)Building pseudoinverse RM with {method=} and {parameter=} with {zerosum=}.') + logger.info(f'(Re-)Building pseudoinverse RM with {method=}, {parameter=}, {plane=}, {zerosum=}, {rf=}.') assert plane is None or plane in ['H', 'V'], f'Unknown plane: {plane}.' if plane is None: matrix = self.matrix[self._output_mask, :][:, self._input_mask] + tot_output_mask = self._output_mask + tot_input_mask = self._input_mask elif plane in ['H', 'V']: output_plane_mask = self.get_output_plane_mask(plane) input_plane_mask = self.get_input_plane_mask(plane) @@ -233,6 +287,10 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu tot_input_mask = np.logical_and(self._input_mask, input_plane_mask) matrix = self.matrix[tot_output_mask, :][:, tot_input_mask] + # at this point, matrix has bad outputs/inputs (bpms/correctors) removed. + # also has only plane-specific outputs/inputs if requested. + + # add extra column/row for the rf response or to enforce that the sum of all outputs is zero. if zerosum or rf: rows, cols = matrix.shape if zerosum: @@ -252,6 +310,13 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu else: matrix_to_invert = matrix + # matrix_to_invert is extended by one row and/or column if rf and/or zerosum was enabled. + + # handle weights + rows, cols = matrix.shape + matrix_to_invert[:, :cols] = np.multiply(matrix_to_invert[:, :cols], self.input_weights[tot_input_mask]) + matrix_to_invert[:rows, :] = np.multiply(matrix_to_invert[:rows, :], self.output_weights[tot_output_mask][:, np.newaxis]) + U, s_mat, Vh = np.linalg.svd(matrix_to_invert, full_matrices=False) if method == 'svd_cutoff': cutoff = parameter @@ -271,7 +336,9 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu #matrix_inv = np.dot(np.dot(np.transpose(Vh), np.diag(d_mat)), np.transpose(U)) matrix_inv = np.dot(np.dot(np.transpose(Vh[:keep,:]), np.diag(d_mat)), np.transpose(U[:, :keep])) - return InverseResponseMatrix(matrix=matrix_inv, method=method, parameter=parameter, zerosum=zerosum) + return InverseResponseMatrix(matrix=matrix_inv, method=method, parameter=parameter, zerosum=zerosum, rf=rf, + rf_weight=self.rf_weight, hash_rf_response=self.hash_rf_response, + hash_input_weights=self.hash_input_weights, hash_output_weights=self.hash_output_weights) def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = 0., zerosum: bool = False, rf: bool = False, plane: Optional[Literal['H', 'V']] = None) -> np.ndarray: @@ -305,8 +372,16 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = if active_inverse_RM is None: active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf) else: - if (active_inverse_RM.method != method or active_inverse_RM.parameter != parameter or - active_inverse_RM.zerosum != zerosum or active_inverse_RM.shape != expected_shape): + if not (active_inverse_RM.method == method + and active_inverse_RM.parameter == parameter + and active_inverse_RM.zerosum == zerosum + and active_inverse_RM.rf == rf + and active_inverse_RM.shape == expected_shape + and active_inverse_RM.hash_rf_response == self.hash_rf_response + and active_inverse_RM.rf_weight == self.rf_weight + and active_inverse_RM.hash_input_weights == self.hash_input_weights + and active_inverse_RM.hash_output_weights == self.hash_output_weights + ): active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf) # cache it @@ -322,7 +397,7 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = if plane in ['H', 'V']: output_plane_mask = np.array(self.outputs_plane) == plane - bad_output = output.copy() + bad_output = output.copy() * self.output_weights bad_output[np.isnan(bad_output)] = 0 good_output = bad_output[np.logical_and(self._output_mask, output_plane_mask)] @@ -361,6 +436,7 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = input_plane_mask = np.array(self.inputs_plane) == plane bad_input[:self._n_inputs][np.logical_and(self._input_mask, input_plane_mask)] = good_input + bad_input[:self._n_inputs] = np.multiply(bad_input[:self._n_inputs], self.input_weights) if rf: bad_input[-1] = rf_input * self.rf_weight return bad_input From d520845ce682b32d7c820ea56dc74c4e1c6924b5 Mon Sep 17 00:00:00 2001 From: kparasch Date: Tue, 3 Mar 2026 18:25:43 +0100 Subject: [PATCH 3/8] rename zerosum to virtual, enable virtual weight and virtual target --- pySC/apps/measurements.py | 13 +++++--- pySC/apps/response_matrix.py | 59 +++++++++++++++++++++--------------- pySC/tuning/tuning_core.py | 10 +++--- 3 files changed, 48 insertions(+), 34 deletions(-) diff --git a/pySC/apps/measurements.py b/pySC/apps/measurements.py index 0ff15fc..9e162c8 100644 --- a/pySC/apps/measurements.py +++ b/pySC/apps/measurements.py @@ -13,8 +13,13 @@ def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatrix, method='svd_cutoff', parameter: Union[int,float] = 0, reference: Optional[np.ndarray] = None, - gain: float = 1, zerosum: bool = False, plane: Optional[Literal['H', 'V']] = None, - rf: bool = False, apply: bool = False): + gain: float = 1, virtual: bool = False, plane: Optional[Literal['H', 'V']] = None, + rf: bool = False, apply: bool = False, virtual_target: float = 0, + gain_rf: float = 1, zerosum: Optional[bool] = None): + + if zerosum is not None: + logger.warning('`zerosum` argument in ResponseMatrix.solve is deprecated. Please use `virtual` instead.') + virtual = zerosum correctors = response_matrix.input_names assert correctors is not None, 'Corrector names are undefined in the response matrix' @@ -29,7 +34,7 @@ def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatr assert len(reference) == len(orbit), "Reference orbit has wrong length" orbit -= reference - trim_list = -response_matrix.solve(orbit, method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf) + trim_list = -response_matrix.solve(orbit, method=method, parameter=parameter, virtual=virtual, plane=plane, rf=rf, virtual_target=virtual_target) trims = {corr: trim for corr, trim in zip(correctors, trim_list, strict=False) if trim != 0} # if rf is selected, trim_list will be larger than correctors by one element. The last element is the rf frequency. @@ -43,7 +48,7 @@ def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatr interface.set_many(data) if rf and trims['rf'] != 0: f_rf = interface.get_rf_main_frequency() - interface.set_rf_main_frequency(f_rf + trims['rf']) + interface.set_rf_main_frequency(f_rf + gain_rf * trims['rf']) return trims diff --git a/pySC/apps/response_matrix.py b/pySC/apps/response_matrix.py index d2784e0..18eddb7 100644 --- a/pySC/apps/response_matrix.py +++ b/pySC/apps/response_matrix.py @@ -21,9 +21,10 @@ class InverseResponseMatrix(BaseModel, extra="forbid"): matrix: NPARRAY method: Literal['tikhonov', 'svd_values', 'svd_cutoff', 'micado'] parameter: float - zerosum: bool = True + virtual: bool = True rf: bool = False - rf_weight: int + rf_weight: float + virtual_weight: float hash_rf_response: int hash_input_weights: int hash_output_weights: int @@ -53,6 +54,7 @@ class ResponseMatrix(BaseModel): input_weights: Optional[NPARRAY] = None output_weights: Optional[NPARRAY] = None rf_weight: float = 1 + virtual_weight: float = 1 _n_outputs: int = PrivateAttr(default=0) _n_inputs: int = PrivateAttr(default=0) @@ -272,9 +274,9 @@ def disable_all_outputs_but(self, outputs: list[str]): def enable_all_outputs(self): self.bad_outputs = [] - def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosum: bool = False, + def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., virtual: bool = False, rf: bool = False, plane: Optional[PLANE_TYPE] = None): - logger.info(f'(Re-)Building pseudoinverse RM with {method=}, {parameter=}, {plane=}, {zerosum=}, {rf=}.') + logger.info(f'(Re-)Building pseudoinverse RM with {method=}, {parameter=}, {plane=}, {virtual=}, {rf=}.') assert plane is None or plane in ['H', 'V'], f'Unknown plane: {plane}.' if plane is None: matrix = self.matrix[self._output_mask, :][:, self._input_mask] @@ -291,16 +293,16 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu # also has only plane-specific outputs/inputs if requested. # add extra column/row for the rf response or to enforce that the sum of all outputs is zero. - if zerosum or rf: + if virtual or rf: rows, cols = matrix.shape - if zerosum: + if virtual: rows += 1 if rf: cols += 1 matrix_to_invert = np.zeros((rows, cols), dtype=float) matrix_to_invert[:matrix.shape[0], :matrix.shape[1]] = matrix - if zerosum: - matrix_to_invert[-1, :matrix.shape[1]] + if virtual: + matrix_to_invert[-1, :matrix.shape[1]] = self.virtual_weight if rf: rf_response = self.rf_response if plane is not None: @@ -310,7 +312,7 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu else: matrix_to_invert = matrix - # matrix_to_invert is extended by one row and/or column if rf and/or zerosum was enabled. + # matrix_to_invert is extended by one row and/or column if rf and/or virtual was enabled. # handle weights rows, cols = matrix.shape @@ -336,12 +338,19 @@ def build_pseudoinverse(self, method='svd_cutoff', parameter: float = 0., zerosu #matrix_inv = np.dot(np.dot(np.transpose(Vh), np.diag(d_mat)), np.transpose(U)) matrix_inv = np.dot(np.dot(np.transpose(Vh[:keep,:]), np.diag(d_mat)), np.transpose(U[:, :keep])) - return InverseResponseMatrix(matrix=matrix_inv, method=method, parameter=parameter, zerosum=zerosum, rf=rf, - rf_weight=self.rf_weight, hash_rf_response=self.hash_rf_response, - hash_input_weights=self.hash_input_weights, hash_output_weights=self.hash_output_weights) + return InverseResponseMatrix(matrix=matrix_inv, method=method, parameter=parameter, virtual=virtual, + virtual_weight=self.virtual_weight, rf=rf, rf_weight=self.rf_weight, + hash_rf_response=self.hash_rf_response, + hash_input_weights=self.hash_input_weights, + hash_output_weights=self.hash_output_weights) + + def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = 0., virtual: bool = False, + zerosum: Optional[bool] = None, rf: bool = False, plane: Optional[Literal['H', 'V']] = None, + virtual_target: float = 0) -> np.ndarray: + if zerosum is not None: + logger.warning('`zerosum` argument in ResponseMatrix.solve is deprecated. Please use `virtual` instead.') + virtual = zerosum - def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = 0., - zerosum: bool = False, rf: bool = False, plane: Optional[Literal['H', 'V']] = None) -> np.ndarray: assert len(self.bad_outputs) != self.matrix.shape[0], 'All outputs are disabled!' assert len(self.bad_inputs) != self.matrix.shape[1], 'All inputs are disabled!' assert plane is None or plane in ['H', 'V'], f'Unknown plane: {plane}.' @@ -354,7 +363,7 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = tot_input_mask = np.logical_and(self._input_mask, input_plane_mask) expected_shape = (sum(tot_input_mask), sum(tot_output_mask)) - if zerosum: + if virtual: expected_shape = (expected_shape[0], expected_shape[1] + 1) if rf: @@ -370,11 +379,12 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = active_inverse_RM = self._inverse_RM_V if active_inverse_RM is None: - active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf) + active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, virtual=virtual, plane=plane, rf=rf) else: if not (active_inverse_RM.method == method and active_inverse_RM.parameter == parameter - and active_inverse_RM.zerosum == zerosum + and active_inverse_RM.virtual == virtual + and active_inverse_RM.virtual_weight == self.virtual_weight and active_inverse_RM.rf == rf and active_inverse_RM.shape == expected_shape and active_inverse_RM.hash_rf_response == self.hash_rf_response @@ -382,7 +392,7 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = and active_inverse_RM.hash_input_weights == self.hash_input_weights and active_inverse_RM.hash_output_weights == self.hash_output_weights ): - active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, zerosum=zerosum, plane=plane, rf=rf) + active_inverse_RM = self.build_pseudoinverse(method=method, parameter=parameter, virtual=virtual, plane=plane, rf=rf) # cache it if plane is None: @@ -404,8 +414,8 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = if method == 'micado': bad_input = self.micado(good_output, int(parameter), plane=plane) - if zerosum: - logger.warning('Zerosum option is incompatible with the micado method and will be ignored.') + if virtual: + logger.warning('virtual option is incompatible with the micado method and will be ignored.') if rf: logger.warning('Rf option is incompatible with the micado method and will be ignored.') else: @@ -415,10 +425,11 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = + f'expected inputs = {expected_shape[0]},\n' + f'expected outputs = {expected_shape[1]},') - if zerosum: - zerosum_good_output = np.zeros(len(good_output) + 1) - zerosum_good_output[:-1] = good_output - good_input = active_inverse_RM.dot(zerosum_good_output) + if virtual: + virtual_good_output = np.zeros(len(good_output) + 1) + virtual_good_output[:-1] = good_output + virtual_good_output[-1] = virtual_target * self.virtual_weight + good_input = active_inverse_RM.dot(virtual_good_output) else: good_input = active_inverse_RM.dot(good_output) diff --git a/pySC/tuning/tuning_core.py b/pySC/tuning/tuning_core.py index 89b6452..cbee93c 100644 --- a/pySC/tuning/tuning_core.py +++ b/pySC/tuning/tuning_core.py @@ -156,7 +156,7 @@ def first_turn_transmission(SC): return - def correct_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=100, gain=1, correct_to_first_turn=False, zerosum=False): + def correct_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=100, gain=1, correct_to_first_turn=False, virtual=False): RM_name = f'trajectory{n_turns}' self.fetch_response_matrix(RM_name, orbit=False, n_turns=n_turns) response_matrix = self.response_matrix[RM_name] @@ -167,7 +167,7 @@ def correct_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=10 for _ in range(n_reps): _ = orbit_correction(interface=interface, response_matrix=response_matrix, reference=None, - method=method, parameter=parameter, gain=gain, apply=True) + method=method, parameter=parameter, gain=gain, virtual=virtual, apply=True) trajectory_x, trajectory_y = SC.bpm_system.capture_injection(n_turns=n_turns) trajectory_x = trajectory_x.flatten('F') @@ -180,7 +180,7 @@ def correct_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=10 return - def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, zerosum=False): + def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, virtual=False): RM_name = 'orbit' self.fetch_response_matrix(RM_name, orbit=True) response_matrix = self.response_matrix[RM_name] @@ -191,7 +191,7 @@ def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, zero for _ in range(n_reps): _ = orbit_correction(interface=interface, response_matrix=response_matrix, reference=None, - method=method, parameter=parameter, zerosum=zerosum, gain=gain, apply=True) + method=method, parameter=parameter, virtual=virtual, gain=gain, apply=True) orbit_x, orbit_y = SC.bpm_system.capture_orbit() rms_x = np.nanstd(orbit_x) * 1e6 @@ -238,8 +238,6 @@ def fit_dispersive_orbit(self): return np.dot(xy, response) / np.dot(response, response) - - def set_multipole_scale(self, scale: float = 1): logger.info(f'Setting "multipoles" to {scale*100:.0f}%') for control_name in self.multipoles: From 7aa53c5b0709dfdd863ccf9d92ad15e2676ec362 Mon Sep 17 00:00:00 2001 From: kparasch Date: Tue, 3 Mar 2026 19:01:41 +0100 Subject: [PATCH 4/8] rename inputs_plane/outputs_plane to input_planes/output_planes for homogeneity --- pySC/apps/response_matrix.py | 28 ++++++++++++++-------------- pySC/tuning/tuning_core.py | 18 +++++++++--------- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pySC/apps/response_matrix.py b/pySC/apps/response_matrix.py index 18eddb7..b28958b 100644 --- a/pySC/apps/response_matrix.py +++ b/pySC/apps/response_matrix.py @@ -47,8 +47,8 @@ class ResponseMatrix(BaseModel): input_names: Optional[list[str]] = None output_names: Optional[list[str]] = None - inputs_plane: Optional[list[PLANE_TYPE]] = None - outputs_plane: Optional[list[PLANE_TYPE]] = None + input_planes: Optional[list[PLANE_TYPE]] = None + output_planes: Optional[list[PLANE_TYPE]] = None rf_response: Optional[NPARRAY] = None input_weights: Optional[NPARRAY] = None @@ -85,19 +85,19 @@ def initialize_and_check(self): self._singular_values = None self.make_masks() - if self.inputs_plane is None: + if self.input_planes is None: Nh = self._n_inputs // 2 if self._n_inputs % 2 != 0: logger.warning('Plane of inputs is undefined and number of inputs in response matrix is not even. ' 'Misinterpretation of the input plane is guaranteed!') - self.inputs_plane = ['H'] * Nh + ['V'] * (self._n_inputs - Nh) + self.input_planes = ['H'] * Nh + ['V'] * (self._n_inputs - Nh) - if self.outputs_plane is None: + if self.output_planes is None: Nh = self._n_outputs // 2 if self._n_outputs % 2 != 0: logger.warning('Plane of outputs is undefined and number of outputs in response matrix is not even. ' 'Misinterpretation of the output plane is guaranteed!') - self.outputs_plane = ['H'] * Nh + ['V'] * (self._n_outputs - Nh) + self.output_planes = ['H'] * Nh + ['V'] * (self._n_outputs - Nh) if self.rf_response is None: self.rf_response = np.zeros(self._n_outputs) @@ -131,13 +131,13 @@ def set_weight(self, name: str, weight: float, plane: Optional[PLANE_TYPE] = Non applied = False for ii, input_name in enumerate(self.input_names): if input_name == name: - if plane is None or self.inputs_plane[ii] == plane: + if plane is None or self.input_planes[ii] == plane: self.input_weights[ii] = weight applied = True for ii, output_name in enumerate(self.output_names): if output_name == name: - if plane is None or self.outputs_plane[ii] == plane: + if plane is None or self.output_planes[ii] == plane: self.output_weights[ii] = weight applied = True @@ -172,10 +172,10 @@ def get_matrix_in_plane(self, plane: Optional[PLANE_TYPE] = None) -> np.array: return self.matrix[output_plane_mask, :][:, input_plane_mask] def get_input_plane_mask(self, plane: Literal[PLANE_TYPE]) -> np.array: - return np.array(self.inputs_plane) == plane + return np.array(self.input_planes) == plane def get_output_plane_mask(self, plane: Literal[PLANE_TYPE]) -> np.array: - return np.array(self.outputs_plane) == plane + return np.array(self.output_planes) == plane @property def matrix_h(self) -> np.array: @@ -357,8 +357,8 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = if plane is None: expected_shape = (self._n_inputs - len(self._bad_inputs), self._n_outputs - len(self._bad_outputs)) else: - output_plane_mask = np.array(self.outputs_plane) == plane - input_plane_mask = np.array(self.inputs_plane) == plane + output_plane_mask = np.array(self.output_planes) == plane + input_plane_mask = np.array(self.input_planes) == plane tot_output_mask = np.logical_and(self._output_mask, output_plane_mask) tot_input_mask = np.logical_and(self._input_mask, input_plane_mask) expected_shape = (sum(tot_input_mask), sum(tot_output_mask)) @@ -405,7 +405,7 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = output_plane_mask = np.ones_like(self._output_mask, dtype=bool) if plane in ['H', 'V']: - output_plane_mask = np.array(self.outputs_plane) == plane + output_plane_mask = np.array(self.output_planes) == plane bad_output = output.copy() * self.output_weights bad_output[np.isnan(bad_output)] = 0 @@ -444,7 +444,7 @@ def solve(self, output: np.array, method: str = 'svd_cutoff', parameter: float = bad_input = np.zeros(final_input_length, dtype=float) input_plane_mask = np.ones_like(self._input_mask, dtype=bool) if plane in ['H', 'V']: - input_plane_mask = np.array(self.inputs_plane) == plane + input_plane_mask = np.array(self.input_planes) == plane bad_input[:self._n_inputs][np.logical_and(self._input_mask, input_plane_mask)] = good_input bad_input[:self._n_inputs] = np.multiply(bad_input[:self._n_inputs], self.input_weights) diff --git a/pySC/tuning/tuning_core.py b/pySC/tuning/tuning_core.py index cbee93c..3f06d20 100644 --- a/pySC/tuning/tuning_core.py +++ b/pySC/tuning/tuning_core.py @@ -68,20 +68,20 @@ def fetch_response_matrix(self, name: str, orbit=True, n_turns=1) -> None: self.calculate_model_trajectory_response_matrix(n_turns=n_turns) return - def get_inputs_plane(self, control_names): + def get_input_planes(self, control_names): SC = self._parent - inputs_plane = [] + input_planes = [] for corr in control_names: control = SC.magnet_settings.controls[corr] if type(control.info) is not IndivControl: raise NotImplementedError(f'Unsupported control type for {corr} of type {type(control.info).__name__}.') if control.info.component == 'B': - inputs_plane.append('H') + input_planes.append('H') elif control.info.component == 'A': - inputs_plane.append('V') + input_planes.append('V') else: raise Exception(f'Unknown component: {control.info.component}') - return inputs_plane + return input_planes def calculate_model_trajectory_response_matrix(self, n_turns=1, dkick=1e-5, save_as: str = None): # assumes all bpms are dual plane @@ -90,10 +90,10 @@ def calculate_model_trajectory_response_matrix(self, n_turns=1, dkick=1e-5, save input_names = SC.tuning.CORR output_names = SC.bpm_system.names * n_turns * 2 # two: one per plane and per turn matrix = measure_TrajectoryResponseMatrix(SC, n_turns=n_turns, dkick=dkick, use_design=True) - inputs_plane = self.get_inputs_plane(SC.tuning.CORR) + input_planes = self.get_input_planes(SC.tuning.CORR) self.response_matrix[RM_name] = ResponseMatrix(matrix=matrix, output_names=output_names, - input_names=input_names, inputs_plane=inputs_plane) + input_names=input_names, input_planes=input_planes) if save_as is not None: self.response_matrix[RM_name].to_json(save_as) return @@ -104,9 +104,9 @@ def calculate_model_orbit_response_matrix(self, dkick=1e-5, save_as: str = None) input_names = SC.tuning.CORR output_names = SC.bpm_system.names * 2 # two: one per plane matrix = measure_OrbitResponseMatrix(SC, dkick=dkick, use_design=True) - inputs_plane = self.get_inputs_plane(SC.tuning.CORR) + input_planes = self.get_input_planes(SC.tuning.CORR) self.response_matrix[RM_name] = ResponseMatrix(matrix=matrix, output_names=output_names, - input_names=input_names, inputs_plane=inputs_plane) + input_names=input_names, input_planes=input_planes) if save_as is not None: self.response_matrix[RM_name].to_json(save_as) return From c9375d166bb130ef218cef9fc3f3a822b7ace762 Mon Sep 17 00:00:00 2001 From: kparasch Date: Tue, 3 Mar 2026 19:02:29 +0100 Subject: [PATCH 5/8] version bump --- pySC/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pySC/__init__.py b/pySC/__init__.py index 6c8f043..9314d6f 100644 --- a/pySC/__init__.py +++ b/pySC/__init__.py @@ -6,7 +6,7 @@ """ -__version__ = "1.0.0" +__version__ = "1.1.0" from .core.simulated_commissioning import SimulatedCommissioning from .configuration.generation import generate_SC From fbb7885c4e8aacf4feb01992677bee31d7d37661 Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 6 Mar 2026 11:17:19 +0100 Subject: [PATCH 6/8] make the deprecation warning of inputs_plane and outputs_plane not silent. Also rename automatically for backwards-compatibility --- pySC/apps/response_matrix.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pySC/apps/response_matrix.py b/pySC/apps/response_matrix.py index b28958b..3cf2fa2 100644 --- a/pySC/apps/response_matrix.py +++ b/pySC/apps/response_matrix.py @@ -75,6 +75,29 @@ def RM(self): logger.warning('ResponseMatrix.RM is deprecated! Please use ResponseMatrix.matrix instead.') return self.matrix + @model_validator(mode='before') + def check_deprecations(cls, data): + if 'inputs_plane' in data.keys(): + logger.warning('DEPRECATION: `inputs_plane` in the ResponseMatrix has been renamed to `input_planes`.') + logger.warning('DEPRECATION: You should do the same.') + if 'input_planes' in data.keys(): + raise Exception('Both `inputs_plane` and `input_planes` are in the ResponseMatrix. Please only use the later one.') + else: + logger.warning('DEPRECATION: renaming automatically `inputs_plane` to `input_planes`.') + data['input_planes'] = data['inputs_plane'] + del data['inputs_plane'] + + if 'outputs_plane' in data.keys(): + logger.warning('DEPRECATION: `outputs_plane` in the ResponseMatrix has been renamed to `output_planes`.') + logger.warning('DEPRECATION: You should do the same.') + if 'output_planes' in data.keys(): + raise Exception('Both `outputs_plane` and `output_planes` are in the ResponseMatrix. Please only use the later one.') + else: + logger.warning('DEPRECATION: renaming automatically `outputs_plane` to `output_planes`.') + data['output_planes'] = data['outputs_plane'] + del data['outputs_plane'] + return data + @model_validator(mode='after') def initialize_and_check(self): self._n_outputs, self._n_inputs = self.matrix.shape From 0de1e85e81011b867245b3b4bc0dba6eab53743d Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 6 Mar 2026 11:30:14 +0100 Subject: [PATCH 7/8] use logger.warning instead of print when setpoint exceeds limit --- pySC/core/control.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pySC/core/control.py b/pySC/core/control.py index 2e66f5a..1180bff 100644 --- a/pySC/core/control.py +++ b/pySC/core/control.py @@ -1,11 +1,14 @@ from __future__ import annotations from pydantic import BaseModel, PrivateAttr, PositiveInt from typing import Optional, Literal, Union, TYPE_CHECKING +import logging from .types import BaseModelWithSave if TYPE_CHECKING: from .magnet import ControlMagnetLink +logger = logging.getLogger(__name__) + class LinearConv(BaseModel, extra="forbid"): factor: float = 1.0 offset: float = 0.0 @@ -39,10 +42,10 @@ def check_limits_and_set(self, setpoint: float) -> None: if self.limits is not None: lower_limit, upper_limit = self.limits if setpoint < lower_limit: - print(f'WARNING: Setpoint {setpoint} for control "{self.name}" is out of limits ({lower_limit}, {upper_limit})') + logger.warning(f'Setpoint {setpoint} for control "{self.name}" is out of limits ({lower_limit}, {upper_limit})') self.setpoint = lower_limit elif setpoint > upper_limit: - print(f'WARNING: Setpoint {setpoint} for control "{self.name}" is out of limits ({lower_limit}, {upper_limit})') + logger.warning(f'Setpoint {setpoint} for control "{self.name}" is out of limits ({lower_limit}, {upper_limit})') self.setpoint = upper_limit else: self.setpoint = setpoint From 4b676425447e969e58fbfc265f1f45f8704a6ef4 Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 6 Mar 2026 15:53:13 +0100 Subject: [PATCH 8/8] default rf weight (jean-luc magic formula) --- pySC/apps/response_matrix.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pySC/apps/response_matrix.py b/pySC/apps/response_matrix.py index 3cf2fa2..1775019 100644 --- a/pySC/apps/response_matrix.py +++ b/pySC/apps/response_matrix.py @@ -53,7 +53,7 @@ class ResponseMatrix(BaseModel): rf_response: Optional[NPARRAY] = None input_weights: Optional[NPARRAY] = None output_weights: Optional[NPARRAY] = None - rf_weight: float = 1 + rf_weight: Optional[float] = None virtual_weight: float = 1 _n_outputs: int = PrivateAttr(default=0) @@ -136,6 +136,11 @@ def initialize_and_check(self): if self.output_weights is None: self.output_weights = np.ones(self._n_outputs, dtype=float) + if self.rf_response is not None and self.rf_weight is None: + default_rf_weight = self.default_rf_weight() + logger.info(f'Setting the rf_weight by default to {default_rf_weight}.') + self.rf_weight = default_rf_weight + return self @property @@ -169,6 +174,16 @@ def set_weight(self, name: str, weight: float, plane: Optional[PLANE_TYPE] = Non return + def default_rf_weight(self) -> float: + if self.rf_response is None: + raise Exception('rf_response was not found.') + matrix_h = self.matrix_h + rms_per_input = np.std(matrix_h, axis=0) + mean_rms_inputs = np.mean(rms_per_input) + rms_rf = np.std(self.rf_response) + default_rf_weight = mean_rms_inputs / rms_rf + return default_rf_weight + @property def singular_values(self) -> np.array: return self._singular_values