diff --git a/README.md b/README.md index 637c556..1c3b4a8 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,37 @@ # pySC -Python Simulated Commissioning toolkit for synchrotrons, inspired by [SC](https://github.com/ThorstenHellert/SC) which is written in Matlab. + +Python Simulated Commissioning toolkit for synchrotrons. ## Installing ```bash -git clone https://github.com/kparasch/pySC -cd pySC -pip install -e . +pip install accelerator-commissioning ``` + +## Importing specific modules + +Intended way of importing a pySC functionality: + +``` +from pySC import SimulatedCommissioning +from pySC import generate_SC + +from pySC import ResponseMatrix + +from pySC import orbit_correction +from pySC import measure_bba +from pySC import measure_ORM +from pySC import measure_dispersion + +from pySC import pySCInjectionInterface +from pySC import pySCOrbitInterface + +# the following disables rich progress bars (they don't work well in some environments) +from pySC import disable_pySC_rich +disable_pySC_rich() +``` + + +## Acknowledgements + +This toolkit was inspired by [SC](https://github.com/ThorstenHellert/SC) which is written in Matlab. 
diff --git a/pySC/__init__.py b/pySC/__init__.py index 2ac82f9..6c8f043 100644 --- a/pySC/__init__.py +++ b/pySC/__init__.py @@ -6,26 +6,43 @@ """ -__version__ = "0.4.5" +__version__ = "1.0.0" -from .core.new_simulated_commissioning import SimulatedCommissioning +from .core.simulated_commissioning import SimulatedCommissioning from .configuration.generation import generate_SC -from .tuning.response_matrix import ResponseMatrix - +from .apps.response_matrix import ResponseMatrix +from .apps.measurements import orbit_correction +from .apps.measurements import measure_bba +from .apps.measurements import measure_ORM +from .apps.measurements import measure_dispersion +from .tuning.pySC_interface import pySCInjectionInterface, pySCOrbitInterface import logging import sys logging.basicConfig( #format='%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:\t%(message)s', format="{asctime} | {levelname} | {message}", - datefmt="%d %b% %Y, %H:%M:%S", + datefmt="%d %b %Y, %H:%M:%S", level=logging.INFO, style='{', stream=sys.stdout ) def disable_pySC_rich(): - from .tuning import response_measurements from .apps import response - response_measurements.DISABLE_RICH = True response.DISABLE_RICH = True + +# This is needed to avoid circular imports. +# Firstly the type of SC is hinted to avoid importing SimulatedCommissioning: +# class pySCOrbitInterface(AbstractInterface): +# SC: "SimulatedCommissioning" = Field(repr=False) +# +# Then, the model_rebuild is triggered here to complete the pydantic model, +# and allow validation. +# for this to be triggered, one needs to import pySC or to import from pySC +# (i.e. from pySC import ...) +# to validate a pySCInjectionInterface/pySCOrbitInterface object, one should +# already have a SimulatedCommissioning object. To acquire the SimulatedCommissioning, +# the model_rebuild is "almost certainly"? triggered. 
+pySCInjectionInterface.model_rebuild() +pySCOrbitInterface.model_rebuild() diff --git a/pySC/apps/bba.py b/pySC/apps/bba.py index 9fa2019..60208a2 100644 --- a/pySC/apps/bba.py +++ b/pySC/apps/bba.py @@ -1,5 +1,5 @@ -from pydantic import BaseModel, PrivateAttr -from typing import Optional +from pydantic import BaseModel, PrivateAttr, ConfigDict +from typing import Optional, ClassVar import datetime import logging import numpy as np @@ -7,10 +7,9 @@ from .codes import BBACode from ..utils.file_tools import dict_to_h5 -from ..tuning.tools import get_average_orbit +from .tools import get_average_orbit from .interface import AbstractInterface - -from ..tuning.orbit_bba import reject_bpm_outlier, reject_center_outlier, reject_slopes, get_slopes_center, get_offset +from ..core.types import NPARRAY logger = logging.getLogger(__name__) @@ -101,6 +100,7 @@ class BBA_Measurement(BaseModel): shots_per_orbit: int = 2 bipolar: bool = True quad_is_skew: bool = False + plane: str = None initial_h_k0l: Optional[float] = None initial_v_k0l: Optional[float] = None @@ -115,12 +115,12 @@ def __init__(self, **kwargs): super().__init__(**kwargs) # Initialize BBAData instances for horizontal and vertical procedures if self.h_corrector is not None: - self.H_data = BBAData(plane='X', bpm=self.bpm, quadrupole=self.quadrupole, bpm_number=self.bpm_number, + self.H_data = BBAData(plane='H', bpm=self.bpm, quadrupole=self.quadrupole, bpm_number=self.bpm_number, corrector=self.h_corrector, dk0l=self.dk0l_x, dk1l=self.dk1l_x, n0=self.n0, shots_per_orbit=self.shots_per_orbit, bipolar=self.bipolar, skew_quad=self.quad_is_skew) if self.v_corrector is not None: - self.V_data = BBAData(plane='Y', bpm=self.bpm, quadrupole=self.quadrupole, bpm_number=self.bpm_number, + self.V_data = BBAData(plane='V', bpm=self.bpm, quadrupole=self.quadrupole, bpm_number=self.bpm_number, corrector=self.v_corrector, dk0l=self.dk0l_y, dk1l=self.dk1l_y, n0=self.n0, shots_per_orbit=self.shots_per_orbit, bipolar=self.bipolar, 
skew_quad=self.quad_is_skew) @@ -228,7 +228,7 @@ def one_plane_loop(self, plane: str): #save data yield code_done - def generate(self, interface: AbstractInterface): + def generate(self, interface: AbstractInterface, plane: Optional[str] = None, skip_cycle: bool = False): """ step through the measurement. """ @@ -258,110 +258,179 @@ def generate(self, interface: AbstractInterface): else: dk1 = max(self.H_data.dk1l, self.V_data.dk1l) - for code in hysteresis_loop(self.quadrupole, interface, dk1, n_cycles=2, bipolar=self.bipolar): - yield code + if not skip_cycle: + for code in hysteresis_loop(self.quadrupole, interface, dk1, n_cycles=2, bipolar=self.bipolar): + yield code - if self.h_corrector is not None: + if (plane is None or plane == 'H') and self.h_corrector is not None: for code in self.one_plane_loop('H'): yield code - if self.v_corrector is not None: + if (plane is None or plane == 'V') and self.v_corrector is not None: for code in self.one_plane_loop('V'): yield code yield BBACode.DONE - def run(self, generator=None): - if generator is None: - generator = self.generate() - for code in generator: - logger.debug(f' Got code: {code}') + # def run(self, generator=None): + # if generator is None: + # generator = self.generate() + # for code in generator: + # logger.debug(f' Got code: {code}') +def prep_ios(data: BBAData, n_downstream: Optional[int] = None): + bpm_number = data.bpm_number + bpm_position = np.full((data.n0), np.nan) + + if n_downstream is not None: + induced_orbit_shift = np.full((data.n0, n_downstream), np.nan) + start = bpm_number + end = bpm_number + n_downstream + else: + n_bpm = len(data.raw_bpm_x_center[0]) + induced_orbit_shift = np.full((data.n0, n_bpm), np.nan) + start = 0 + end = n_bpm + + x_up = np.array(data.raw_bpm_x_up) + y_up = np.array(data.raw_bpm_y_up) + x_center = np.array(data.raw_bpm_x_center) + y_center = np.array(data.raw_bpm_y_center) + if data.bipolar: + k1_arr = [-data.dk1l, 0, data.dk1l] + x_down = 
np.array(data.raw_bpm_x_down) + y_down = np.array(data.raw_bpm_y_down) + all_x = np.array([x_down[:, start:end], x_center[:,start:end], x_up[:, start:end]]) + all_y = np.array([y_down[:, start:end], y_center[:,start:end], y_up[:, start:end]]) + else: + k1_arr = [0, data.dk1l] + all_x = np.array([x_center[:,start:end], x_up[:, start:end]]) + all_y = np.array([y_center[:,start:end], y_up[:, start:end]]) + + for ii in range(data.n0): + if data.plane == 'H': + bpm_position[ii] = np.mean(all_x[:, ii, bpm_number - start]) + if data.skew_quad: + induced_orbit_shift[ii] = np.polyfit(k1_arr, all_y[:,ii], 1)[0] * data.dk1l + else: + induced_orbit_shift[ii] = np.polyfit(k1_arr, all_x[:,ii], 1)[0] * data.dk1l + else: + bpm_position[ii] = np.mean(all_y[:, ii, bpm_number - start]) + if data.skew_quad: + induced_orbit_shift[ii] = np.polyfit(k1_arr, all_x[:,ii], 1)[0] * data.dk1l + else: + induced_orbit_shift[ii] = np.polyfit(k1_arr, all_y[:,ii], 1)[0] * data.dk1l + return bpm_position, induced_orbit_shift + +def reject_bpm_outlier(induced_orbit_shift: np.ndarray, bpm_outlier_sigma: float) -> np.ndarray[bool]: + n_bpms = induced_orbit_shift.shape[1] + mask = np.ones(n_bpms, dtype=bool) + for bpm in range(n_bpms): + data = induced_orbit_shift[:, bpm] + if np.any(data - np.mean(data) > bpm_outlier_sigma * np.std(data)): + mask[bpm] = False + return mask + +def reject_slopes(slopes: np.ndarray, slope_cutoff: float) -> np.ndarray[bool]: + max_slope = np.nanmax(np.abs(slopes)) + mask = np.abs(slopes) > slope_cutoff * max_slope + return mask + +def reject_center_outlier(center: np.ndarray, center_cutoff: float) -> np.ndarray[bool]: + mean = np.nanmean(center) + std = np.nanstd(center) + mask = abs(center - mean) < center_cutoff * std + return mask class BBAAnalysis(BaseModel): offset: float offset_error: float - slopes: list[float] - centers: list[float] + slopes: NPARRAY + centers: NPARRAY - modulation: list[list[float]] - position: list[list[float]] + slopes_err: NPARRAY + 
centers_err: NPARRAY - rejected_outliers: int - rejected_slopes: int - rejected_centers: int + induced_orbit_shift: NPARRAY + bpm_position: NPARRAY - @classmethod - def analyze(cls, data: BBAData): - return BBAAnalysis() + mask_accepted: NPARRAY + n_downstream: Optional[int] -def analyze_bba_data(data: BBAData): - bpm_number = data.bpm_number - nbpms = len(data.raw_bpm_x_center[0]) - orbits = np.full((data.n0, 2, nbpms), np.nan) - bpm_pos = np.full((data.n0, 2), np.nan) - for ii in range(data.n0): - if data.plane == 'X': - bpm_pos[ii, 0] = data.raw_bpm_x_up[ii][bpm_number] - bpm_pos[ii, 1] = data.raw_bpm_x_down[ii][bpm_number] - if data.skew_quad: - orbits[ii, 0] = np.array(data.raw_bpm_y_up[ii]) - np.array(data.raw_bpm_y_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_y_down[ii]) - np.array(data.raw_bpm_y_center[ii]) - else: - orbits[ii, 0] = np.array(data.raw_bpm_x_up[ii]) - np.array(data.raw_bpm_x_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_x_down[ii]) - np.array(data.raw_bpm_x_center[ii]) - - else: - bpm_pos[ii, 0] = data.raw_bpm_y_up[ii][bpm_number] - bpm_pos[ii, 1] = data.raw_bpm_y_down[ii][bpm_number] - if data.skew_quad: - orbits[ii, 0] = np.array(data.raw_bpm_x_up[ii]) - np.array(data.raw_bpm_x_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_x_down[ii]) - np.array(data.raw_bpm_x_center[ii]) - else: - orbits[ii, 0] = np.array(data.raw_bpm_y_up[ii]) - np.array(data.raw_bpm_y_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_y_down[ii]) - np.array(data.raw_bpm_y_center[ii]) - - slopes, slopes_err, center, center_err = get_slopes_center(bpm_pos, orbits, data.dk1l) - mask_bpm_outlier = reject_bpm_outlier(orbits) - mask_slopes = reject_slopes(slopes) - mask_center = reject_center_outlier(center) - final_mask = np.logical_and(np.logical_and(mask_bpm_outlier, mask_slopes), mask_center) + rejected_outliers: int + rejected_slopes: int + rejected_centers: int - offset, offset_err = get_offset(center, center_err, final_mask) - return offset, 
offset_err + bpm_outlier_sigma: float + slope_cutoff: float + center_cutoff: float -def get_bba_analysis_data(data: BBAData): - bpm_number = data.bpm_number - nbpms = len(data.raw_bpm_x_center[0]) - orbits = np.full((data.n0, 2, nbpms), np.nan) - bpm_pos = np.full((data.n0, 2), np.nan) - for ii in range(data.n0): - if data.plane == 'X': - bpm_pos[ii, 0] = data.raw_bpm_x_up[ii][bpm_number] - bpm_pos[ii, 1] = data.raw_bpm_x_down[ii][bpm_number] - if data.skew_quad: - orbits[ii, 0] = np.array(data.raw_bpm_y_up[ii]) - np.array(data.raw_bpm_y_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_y_down[ii]) - np.array(data.raw_bpm_y_center[ii]) - else: - orbits[ii, 0] = np.array(data.raw_bpm_x_up[ii]) - np.array(data.raw_bpm_x_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_x_down[ii]) - np.array(data.raw_bpm_x_center[ii]) - else: - bpm_pos[ii, 0] = data.raw_bpm_y_up[ii][bpm_number] - bpm_pos[ii, 1] = data.raw_bpm_y_down[ii][bpm_number] - if data.skew_quad: - orbits[ii, 0] = np.array(data.raw_bpm_x_up[ii]) - np.array(data.raw_bpm_x_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_x_down[ii]) - np.array(data.raw_bpm_x_center[ii]) - else: - orbits[ii, 0] = np.array(data.raw_bpm_y_up[ii]) - np.array(data.raw_bpm_y_center[ii]) - orbits[ii, 1] = np.array(data.raw_bpm_y_down[ii]) - np.array(data.raw_bpm_y_center[ii]) + default_bpm_outlier_sigma: ClassVar[float] = 6 # number of sigma + default_slope_cutoff: ClassVar[float] = 0.10 # of max slope + default_center_cutoff: ClassVar[int] = 1 # number of sigma - slopes, slopes_err, center, center_err = get_slopes_center(bpm_pos, orbits, data.dk1l) - mask_bpm_outlier = reject_bpm_outlier(orbits) - mask_slopes = reject_slopes(slopes) - mask_center = reject_center_outlier(center) - final_mask = np.logical_and(np.logical_and(mask_bpm_outlier, mask_slopes), mask_center) + model_config = ConfigDict(arbitrary_types_allowed=True) - offset, offset_err = get_offset(center, center_err, final_mask) - return bpm_pos, orbits, slopes, center, 
final_mask, offset \ No newline at end of file + @classmethod + def analyze(cls, data: BBAData, n_downstream: Optional[int] = None, bpm_outlier_sigma: Optional[float] = None, + slope_cutoff: Optional[float] = None, center_cutoff: Optional[float] = None): + + if bpm_outlier_sigma is None: + bpm_outlier_sigma = cls.default_bpm_outlier_sigma + + if slope_cutoff is None: + slope_cutoff = cls.default_slope_cutoff + + if center_cutoff is None: + center_cutoff = cls.default_center_cutoff + + bpm_position, induced_orbit_shift = prep_ios(data=data, n_downstream=n_downstream) + + p, pcov = np.polyfit(bpm_position, induced_orbit_shift, 1, cov=True) + slopes = p[0] + centers = - p[1] / p[0] + slopes_err = np.sqrt(pcov[0,0]) + centers_err = np.sqrt(centers ** 2 * (pcov[0,0] / p[0]**2 + pcov[1,1] / p[1] ** 2 - 2 * pcov[0, 1] / p[0] / p[1])) + + mask_bpm_outlier = reject_bpm_outlier(induced_orbit_shift, bpm_outlier_sigma) + mask_slopes = reject_slopes(slopes, slope_cutoff) + mask_center = reject_center_outlier(centers, center_cutoff) + mask_accepted = np.logical_and(np.logical_and(mask_bpm_outlier, mask_slopes), mask_center) + + # calculate offset as a weighted average of the centers, with weights equal to the absolute slope + cc = centers[mask_accepted] + ww = np.abs(slopes[mask_accepted]) + cc_err = centers_err[mask_accepted] + ww_err = slopes_err[mask_accepted] + + CS = np.sum(ww * cc) + S = np.sum(ww) + VS = np.sum(ww_err**2) # variance of S + VCS = np.sum(cc**2 * ww_err**2 + ww**2 * cc_err**2) # variance of CS + VO = ( VCS / CS**2 + VS / S**2) # (variance of offset) / offset**2 + + offset = CS / S # offset = CS / S, average of centers with abs(slopes) as weights + offset_error = offset * np.sqrt(VO) + + result = BBAAnalysis( + offset=offset, + offset_error=offset_error, + slopes=slopes, + centers=centers, + slopes_err=slopes_err, + centers_err=centers_err, + induced_orbit_shift=induced_orbit_shift, + bpm_position=bpm_position, + mask_accepted=mask_accepted, + 
n_downstream=n_downstream, + rejected_outliers=sum(~mask_bpm_outlier), + rejected_slopes=sum(~mask_slopes), + rejected_centers=sum(~mask_center), + total_rejections=sum(~mask_accepted), + bpm_outlier_sigma=bpm_outlier_sigma, + slope_cutoff=slope_cutoff, + center_cutoff=center_cutoff, + ) + return result diff --git a/pySC/apps/dispersion.py b/pySC/apps/dispersion.py index f5165ac..5f92ab8 100644 --- a/pySC/apps/dispersion.py +++ b/pySC/apps/dispersion.py @@ -7,9 +7,9 @@ from .codes import DispersionCode from ..utils.file_tools import dict_to_h5 -from ..tuning.tools import get_average_orbit +from .tools import get_average_orbit from .interface import AbstractInterface -from ..core.numpy_type import NPARRAY +from ..core.types import NPARRAY logger = logging.getLogger(__name__) diff --git a/pySC/apps/interface.py b/pySC/apps/interface.py index aa76a32..e993b94 100644 --- a/pySC/apps/interface.py +++ b/pySC/apps/interface.py @@ -1,8 +1,7 @@ from abc import ABC -from pydantic import BaseModel, Field +from pydantic import BaseModel import numpy as np -from ..core.new_simulated_commissioning import SimulatedCommissioning def function_is_overriden(func): obj = func.__self__ @@ -53,45 +52,3 @@ def set_many(self, data: dict[str, float]): waiting time to make sure power supply is settled and eddy currents are decayed to be handled also here. 
''' raise NotImplementedError - -class pySCOrbitInterface(AbstractInterface): - SC: SimulatedCommissioning = Field(repr=False) - - def get_orbit(self) -> tuple[np.ndarray, np.ndarray]: - return self.SC.bpm_system.capture_orbit() - - def get_ref_orbit(self) -> tuple[np.ndarray, np.ndarray]: - return self.SC.bpm_system.reference_x, self.SC.bpm_system.reference_y - - def get(self, name: str) -> float: - return self.SC.magnet_settings.get(name) - - def set(self, name: str, value: float): - self.SC.magnet_settings.set(name, value) - return - - def get_many(self, names: list) -> dict[str, float]: - return self.SC.magnet_settings.get_many(names) - - def set_many(self, data: dict[str, float]): - self.SC.magnet_settings.set_many(data) - return - - def get_rf_main_frequency(self) -> float: - return self.SC.rf_settings.main.frequency - - def set_rf_main_frequency(self, frequency: float): - self.SC.rf_settings.main.set_frequency(frequency) - return - -class pySCInjectionInterface(pySCOrbitInterface): - SC: SimulatedCommissioning = Field(repr=False) - n_turns: int = 1 - - def get_orbit(self) -> tuple[np.ndarray, np.ndarray]: - return self.SC.bpm_system.capture_injection(n_turns=self.n_turns) - - def get_ref_orbit(self) -> tuple[np.ndarray, np.ndarray]: - x_ref = np.repeat(self.SC.bpm_system.reference_x[:, np.newaxis], self.n_turns, axis=1) - y_ref = np.repeat(self.SC.bpm_system.reference_y[:, np.newaxis], self.n_turns, axis=1) - return x_ref, y_ref diff --git a/pySC/apps/measurements.py b/pySC/apps/measurements.py index 9712fde..0ff15fc 100644 --- a/pySC/apps/measurements.py +++ b/pySC/apps/measurements.py @@ -3,7 +3,7 @@ from typing import Optional, Generator, Union, Literal from pathlib import Path -from ..tuning.response_matrix import ResponseMatrix +from ..apps.response_matrix import ResponseMatrix from .bba import BBA_Measurement, BBACode from .response import ResponseMeasurement, ResponseCode from .dispersion import DispersionMeasurement, DispersionCode @@ -38,8 +38,8 @@ 
def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatr if apply: data = interface.get_many(correctors) - for i, corr in enumerate(correctors): - data[corr] += trim_list[i] * gain + for corr in trims.keys(): + data[corr] += trims[corr] * gain interface.set_many(data) if rf and trims['rf'] != 0: f_rf = interface.get_rf_main_frequency() @@ -48,7 +48,9 @@ def orbit_correction(interface: AbstractInterface, response_matrix: ResponseMatr return trims def measure_bba(interface: AbstractInterface, bpm_name, config: dict, shots_per_orbit: int = 1, - n_corr_steps: int = 7, bipolar: bool = True, skip_save: bool = False, folder_to_save: Optional[Path] = None) -> Generator: + n_corr_steps: int = 7, bipolar: bool = True, skip_save: bool = False, + folder_to_save: Optional[Path] = None, plane: Optional[str] = None, + skip_cycle: bool = False) -> Generator: if folder_to_save is None: folder_to_save = Path('data') @@ -73,10 +75,10 @@ def measure_bba(interface: AbstractInterface, bpm_name, config: dict, shots_per_ n0=n_corr_steps, bpm_number=config['number'], shots_per_orbit=shots_per_orbit, - bipolar=bipolar, + bipolar=bipolar ) - generator = measurement.generate(interface=interface) + generator = measurement.generate(interface=interface, plane=plane) # run measurement loop for code in generator: diff --git a/pySC/apps/response.py b/pySC/apps/response.py index 9248fed..646ac8e 100644 --- a/pySC/apps/response.py +++ b/pySC/apps/response.py @@ -3,15 +3,14 @@ import datetime import logging import numpy as np -from enum import IntEnum from pathlib import Path from contextlib import nullcontext from .codes import ResponseCode from ..utils.file_tools import dict_to_h5 -from ..tuning.tools import get_average_orbit +from .tools import get_average_orbit from .interface import AbstractInterface -from ..core.numpy_type import NPARRAY +from ..core.types import NPARRAY DISABLE_RICH = False diff --git a/pySC/tuning/response_matrix.py b/pySC/apps/response_matrix.py 
similarity index 96% rename from pySC/tuning/response_matrix.py rename to pySC/apps/response_matrix.py index d3f5e4a..afedc3c 100644 --- a/pySC/tuning/response_matrix.py +++ b/pySC/apps/response_matrix.py @@ -1,12 +1,11 @@ from pydantic import BaseModel, PrivateAttr, model_validator, ConfigDict from typing import Optional, Literal +from ..core.types import NPARRAY import numpy as np import logging import json -from ..core.numpy_type import NPARRAY - -PLANE_TYPE = Literal['H', 'V'] +PLANE_TYPE = Literal['H', 'V', 'Q', 'SQ'] logger = logging.getLogger(__name__) @@ -71,16 +70,16 @@ def initialize_and_check(self): if self.inputs_plane is None: Nh = self._n_inputs // 2 - if Nh % 2 != 0: - logger.warning('Plane of inputs is undefined and number of inputs in response matrix is not even.' - 'Misinterpretation of the input plane is guaranteed!') + if self._n_inputs % 2 != 0: + logger.warning('Plane of inputs is undefined and number of inputs in response matrix is not even. ' + 'Misinterpretation of the input plane is guaranteed!') self.inputs_plane = ['H'] * Nh + ['V'] * (self._n_inputs - Nh) if self.outputs_plane is None: Nh = self._n_outputs // 2 - if Nh % 2 != 0: - logger.warning('Plane of outputs is undefined and number of outputs in response matrix is not even.' - 'Misinterpretation of the output plane is guaranteed!') + if self._n_outputs % 2 != 0: + logger.warning('Plane of outputs is undefined and number of outputs in response matrix is not even. 
' + 'Misinterpretation of the output plane is guaranteed!') self.outputs_plane = ['H'] * Nh + ['V'] * (self._n_outputs - Nh) if self.rf_response is None: @@ -138,8 +137,9 @@ def bad_inputs(self) -> list[int]: @bad_inputs.setter def bad_inputs(self, bad_list: list[int]) -> None: - self._bad_inputs = bad_list.copy() - self.make_masks() + if self._bad_inputs != bad_list: + self._bad_inputs = bad_list.copy() + self.make_masks() @property def bad_outputs(self) -> list[int]: @@ -147,8 +147,9 @@ def bad_outputs(self) -> list[int]: @bad_outputs.setter def bad_outputs(self, bad_list: list[int]) -> None: - self._bad_outputs = bad_list.copy() - self.make_masks() + if self._bad_outputs != bad_list: + self._bad_outputs = bad_list.copy() + self.make_masks() def make_masks(self): self._inverse_RM = None # discard inverse RM, by changing bad inputs/outputs it becomes invalid diff --git a/pySC/apps/tools.py b/pySC/apps/tools.py new file mode 100644 index 0000000..605656e --- /dev/null +++ b/pySC/apps/tools.py @@ -0,0 +1,21 @@ +import numpy as np +import logging +from typing import Callable + +logger = logging.getLogger(__name__) + +def get_average_orbit(get_orbit: Callable, n_orbits: int = 10): + orbit_x, orbit_y = get_orbit() + all_orbit_x = np.zeros((len(orbit_x), n_orbits)) + all_orbit_y = np.zeros((len(orbit_y), n_orbits)) + + all_orbit_x[:,0] = orbit_x + all_orbit_y[:,0] = orbit_y + for ii in range(1, n_orbits): + all_orbit_x[:, ii], all_orbit_y[:, ii] = get_orbit() + + mean_orbit_x = np.mean(all_orbit_x, axis=1) + mean_orbit_y = np.mean(all_orbit_y, axis=1) + std_orbit_x = np.std(all_orbit_x, axis=1) + std_orbit_y = np.std(all_orbit_y, axis=1) + return mean_orbit_x, mean_orbit_y, std_orbit_x, std_orbit_y diff --git a/pySC/configuration/bpm_system_conf.py b/pySC/configuration/bpm_system_conf.py index 245441e..2cef049 100644 --- a/pySC/configuration/bpm_system_conf.py +++ b/pySC/configuration/bpm_system_conf.py @@ -1,7 +1,7 @@ import numpy as np import logging -from 
..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.simulated_commissioning import SimulatedCommissioning from ..core.bpm_system import BPM_FIELDS_TO_INITIALISE from .general import get_error, get_indices_and_names from .supports_conf import generate_element_misalignments diff --git a/pySC/configuration/general.py b/pySC/configuration/general.py index 3b2da1e..253cd54 100644 --- a/pySC/configuration/general.py +++ b/pySC/configuration/general.py @@ -2,7 +2,7 @@ import logging from .load_config import load_yaml -from ..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.simulated_commissioning import SimulatedCommissioning from ..core.magnet import MAGNET_NAME_TYPE logger = logging.getLogger(__name__) diff --git a/pySC/configuration/generation.py b/pySC/configuration/generation.py index a0c2312..9bb90b3 100644 --- a/pySC/configuration/generation.py +++ b/pySC/configuration/generation.py @@ -2,13 +2,14 @@ import logging from ..core.lattice import ATLattice -from ..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.simulated_commissioning import SimulatedCommissioning from .load_config import load_yaml from .magnets_conf import configure_magnets from .bpm_system_conf import configure_bpms from .rf_conf import configure_rf from .supports_conf import configure_supports from .tuning_conf import configure_tuning +from .injection_conf import configure_injection from .general import scale_error_table logger = logging.getLogger(__name__) @@ -75,4 +76,7 @@ def generate_SC(yaml_filepath: str, seed: int = 1, scale_errors: Optional[int] = logger.info('Configuring tuning...') configure_tuning(SC) + + logger.info('Configuring injection...') + configure_injection(SC) return SC \ No newline at end of file diff --git a/pySC/configuration/injection_conf.py b/pySC/configuration/injection_conf.py new file mode 100644 index 0000000..3754b06 --- /dev/null +++ b/pySC/configuration/injection_conf.py @@ -0,0 +1,57 @@ 
+import logging + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ..core.simulated_commissioning import SimulatedCommissioning + +logger = logging.getLogger(__name__) + +def configure_injection(SC: "SimulatedCommissioning") -> None: + injection_conf = dict.get(SC.configuration, 'injection', {}) + + if 'from_design' in injection_conf: + from_design = injection_conf['from_design'] + else: + from_design = True + + if from_design: + logger.info('Setting injection parameters from design twiss at the beginning of the lattice.') + twiss = SC.lattice.twiss + SC.injection.x = twiss['x'][0] + SC.injection.px = twiss['px'][0] + SC.injection.y = twiss['y'][0] + SC.injection.py = twiss['py'][0] + SC.injection.tau = twiss['tau'][0] + SC.injection.delta = twiss['delta'][0] + + SC.injection.betx = twiss['betx'][0] + SC.injection.alfx = twiss['alfx'][0] + SC.injection.bety = twiss['bety'][0] + SC.injection.alfy = twiss['alfy'][0] + + if 'emit_x' in injection_conf: + SC.injection.gemit_x = float(injection_conf['emit_x']) + else: + logger.warning('emit_x not specified in injection configuration. Using default value of 1.') + if 'emit_y' in injection_conf: + SC.injection.gemit_y = float(injection_conf['emit_y']) + else: + logger.warning('emit_y not specified in injection configuration. Using default value of 1.') + if 'bunch_length' in injection_conf: + SC.injection.bunch_length = float(injection_conf['bunch_length']) + else: + logger.warning('bunch_length (in m, r.m.s.) not specified in injection configuration. Using default value of 1.') + if 'energy_spread' in injection_conf: + SC.injection.energy_spread = float(injection_conf['energy_spread']) + else: + logger.warning('energy_spread not specified in injection configuration. 
Using default value of 1.') + + for var in ['x', 'px', 'y', 'py', 'tau', 'delta', 'betx', 'alfx', 'bety', 'alfy']: + if var in injection_conf: + setattr(SC.injection, var, float(injection_conf[var])) + + for var in ['x_error_syst', 'px_error_syst', 'y_error_syst', 'py_error_syst', 'tau_error_syst', 'delta_error_syst', + 'x_error_stat', 'px_error_stat', 'y_error_stat', 'py_error_stat', 'tau_error_stat', 'delta_error_stat']: + if var in injection_conf: + setattr(SC.injection, var, float(injection_conf[var])) \ No newline at end of file diff --git a/pySC/configuration/magnets_conf.py b/pySC/configuration/magnets_conf.py index 0ee66bb..1335175 100644 --- a/pySC/configuration/magnets_conf.py +++ b/pySC/configuration/magnets_conf.py @@ -1,5 +1,5 @@ from typing import Any -from ..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.simulated_commissioning import SimulatedCommissioning from ..core.magnet import MAGNET_NAME_TYPE from .general import get_error, get_indices_and_names from .supports_conf import generate_element_misalignments diff --git a/pySC/configuration/rf_conf.py b/pySC/configuration/rf_conf.py index c85e9cf..6e2fb4e 100644 --- a/pySC/configuration/rf_conf.py +++ b/pySC/configuration/rf_conf.py @@ -1,6 +1,6 @@ import logging -from ..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.simulated_commissioning import SimulatedCommissioning from ..core.rfsettings import RFCavity, RFSystem from .general import get_error, get_indices_and_names diff --git a/pySC/configuration/supports_conf.py b/pySC/configuration/supports_conf.py index 9c73886..c385fc4 100644 --- a/pySC/configuration/supports_conf.py +++ b/pySC/configuration/supports_conf.py @@ -1,6 +1,6 @@ from typing import Any -from ..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.simulated_commissioning import SimulatedCommissioning from .general import get_error, get_indices_and_names import logging @@ -78,7 +78,8 @@ def 
configure_supports(SC: SimulatedCommissioning): if 'roll' in level_conf: sigma = get_error(level_conf['roll'], error_table) this_support.roll = SC.rng.normal_trunc(0, sigma) - logger.warning(f'Found {len(zero_length_supports)} zero-length supports in level {level} ({category_name}).') + if len(zero_length_supports): + logger.warning(f'Found {len(zero_length_supports)} zero-length supports in level {level} ({category_name}).') SC.support_system.resolve_graph() SC.support_system.update_all() \ No newline at end of file diff --git a/pySC/configuration/tuning_conf.py b/pySC/configuration/tuning_conf.py index cf21210..78640d7 100644 --- a/pySC/configuration/tuning_conf.py +++ b/pySC/configuration/tuning_conf.py @@ -1,5 +1,6 @@ -from ..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.simulated_commissioning import SimulatedCommissioning from ..core.control import IndivControl +from .general import get_indices_and_names import numpy as np def sort_controls(SC: SimulatedCommissioning, control_names: list[str]) -> list[str]: @@ -37,13 +38,11 @@ def configure_tuning(SC: SimulatedCommissioning) -> None: HCORR = sort_controls(SC, HCORR) SC.tuning.HCORR = HCORR - # if 'sort_correctors' in tuning_conf and tuning_conf['sort_correctors']: if 'VCORR' in tuning_conf: VCORR = configure_family(SC, config_dict=tuning_conf['VCORR']) VCORR = sort_controls(SC, VCORR) SC.tuning.VCORR = VCORR - if 'model_RM_folder' in tuning_conf: SC.tuning.RM_folder = tuning_conf['model_RM_folder'] @@ -55,4 +54,39 @@ def configure_tuning(SC: SimulatedCommissioning) -> None: if 'bba_magnets' in tuning_conf: bba_magnets = configure_family(SC, config_dict=tuning_conf['bba_magnets']) bba_magnets = sort_controls(SC, bba_magnets) - SC.tuning.bba_magnets = bba_magnets \ No newline at end of file + SC.tuning.bba_magnets = bba_magnets + + if 'tune' in tuning_conf: + tune_conf = tuning_conf['tune'] + assert 'controls_1' in tune_conf, 'controls_1 missing from tune configuration.' 
+ assert 'controls_2' in tune_conf, 'controls_2 missing from tune configuration.' + + control_1_conf = tune_conf['controls_1'] + assert 'regex' in control_1_conf, 'regex is missing from controls_1 in tune configuration.' + assert 'component' in control_1_conf, 'component is missing from controls_1 in tune configuration.' + component = control_1_conf['component'] + _, names = get_indices_and_names(SC, 'tune/controls_1', control_1_conf) + controls_1 = [name + '/' + component for name in names] + + if not set(controls_1).issubset(SC.magnet_settings.controls.keys()): + raise Exception(f'At least one of tune/controls_1 was not declared! ({component=})') + + control_2_conf = tune_conf['controls_2'] + assert 'component' in control_2_conf, 'component is missing from controls_2 in tune configuration.' + assert 'regex' in control_2_conf, 'regex is missing from controls_2 in tune configuration.' + component = control_2_conf['component'] + _, names = get_indices_and_names(SC, 'tune/controls_2', control_2_conf) + controls_2 = [name + '/' + component for name in names] + + if not set(controls_2).issubset(SC.magnet_settings.controls.keys()): + raise Exception(f'At least one of tune/controls_2 was not declared! 
({component=})') + + SC.tuning.tune.controls_1 = controls_1 + SC.tuning.tune.controls_2 = controls_2 + + if 'c_minus' in tuning_conf: + c_minus_conf = tuning_conf['c_minus'] + if 'controls' in c_minus_conf: + c_minus_controls = configure_family(SC, config_dict=c_minus_conf['controls']) + c_minus_controls = sort_controls(SC, c_minus_controls) + SC.tuning.c_minus.controls = c_minus_controls \ No newline at end of file diff --git a/pySC/control_system/server.py b/pySC/control_system/server.py index 784fce6..2ff40b5 100644 --- a/pySC/control_system/server.py +++ b/pySC/control_system/server.py @@ -9,7 +9,7 @@ from .rf_server import rf_server if TYPE_CHECKING: - from ..core.new_simulated_commissioning import SimulatedCommissioning + from ..core.simulated_commissioning import SimulatedCommissioning HOST = "127.0.0.1" # Standard loopback interface address (localhost) # PORT = 13131 # Port to listen on (non-privileged ports are > 1023) diff --git a/pySC/core/bpm_system.py b/pySC/core/bpm_system.py index f075bc4..d02f20c 100644 --- a/pySC/core/bpm_system.py +++ b/pySC/core/bpm_system.py @@ -1,11 +1,11 @@ from pydantic import BaseModel, PrivateAttr, ConfigDict, model_validator from typing import TYPE_CHECKING, Optional, Union -from .numpy_type import NPARRAY +from .types import NPARRAY import numpy as np import warnings if TYPE_CHECKING: - from .new_simulated_commissioning import SimulatedCommissioning + from .simulated_commissioning import SimulatedCommissioning BPM_NAME_TYPE = Union[str, int] diff --git a/pySC/core/control.py b/pySC/core/control.py index 4a9df68..2e66f5a 100644 --- a/pySC/core/control.py +++ b/pySC/core/control.py @@ -1,6 +1,7 @@ from __future__ import annotations from pydantic import BaseModel, PrivateAttr, PositiveInt from typing import Optional, Literal, Union, TYPE_CHECKING +from .types import BaseModelWithSave if TYPE_CHECKING: from .magnet import ControlMagnetLink @@ -22,6 +23,9 @@ class KnobControl(BaseModel, extra="forbid"): control_names: 
list[str] weights: Optional[list[float]] = None +class KnobData(BaseModelWithSave, extra="forbid"): + data: dict[str, KnobControl] = {} + class Control(BaseModel, extra="forbid"): name: str setpoint: float diff --git a/pySC/core/injection.py b/pySC/core/injection.py index 15fea04..907f134 100644 --- a/pySC/core/injection.py +++ b/pySC/core/injection.py @@ -3,7 +3,7 @@ import numpy as np if TYPE_CHECKING: - from .new_simulated_commissioning import SimulatedCommissioning + from .simulated_commissioning import SimulatedCommissioning CAVITY_NAME_TYPE = Union[str, int] diff --git a/pySC/core/lattice.py b/pySC/core/lattice.py index 58d46e4..632f57c 100644 --- a/pySC/core/lattice.py +++ b/pySC/core/lattice.py @@ -52,6 +52,8 @@ class ATLattice(Lattice): # the different machine types at_simulator: None = None use: str = 'RING' + orbit_guess: Optional[list[float]] = None + use_orbit_guess: bool = False _ring: at.Lattice = PrivateAttr(default=None) _design: at.Lattice = PrivateAttr(default=None) @@ -64,6 +66,9 @@ def load_lattice(self): self._ring.enable_6d() self._design.enable_6d() + if self.orbit_guess is None: + self.orbit_guess = [0] * 6 + self._twiss = self.get_twiss(use_design=True) return self @@ -87,9 +92,22 @@ def get_name_from_index(self, index: int): logger.warning(f'{self.design[index].FamName} at index {index} has no "{self.naming}" field. 
Falling back to index-based name.') return str(index) + def update_orbit_guess(self, n_turns=1000): + bunch = np.zeros([1,6]) + out = self.track(bunch, n_turns=n_turns, coordinates=['x','px','y','py','delta','tau']) + guess = np.nanmean(out, axis=3).flatten() + logger.info('Found orbit guess:') + logger.info(f' x = {guess[0]}') + logger.info(f' px = {guess[1]}') + logger.info(f' y = {guess[2]}') + logger.info(f' py = {guess[3]}') + logger.info(f' tau = {guess[5]}') + logger.info(f' delta = {guess[4]}') + self.orbit_guess = list(guess) + def track(self, bunch: nparray, indices: Optional[list[int]] = None, n_turns: int = 1, use_design: bool = False, coordinates: Optional[list] = None) -> nparray: new_bunch = bunch.copy() - new_bunch[:,4], new_bunch[:,5] = new_bunch[:,5], new_bunch[:,4] # swap zeta and delta for AT + new_bunch[:,4], new_bunch[:,5] = new_bunch[:,5].copy(), new_bunch[:,4].copy() # swap zeta and delta for AT if use_design: ring = self._design else: @@ -128,7 +146,12 @@ def get_orbit(self, indices: list[int] = None, use_design=False) -> dict: if indices is None: indices = range(len(self._design)) ring = self._design if use_design else self._ring - _, orbit = at.find_orbit(ring, refpts=indices) + if self.use_orbit_guess: + assert self.no_6d is False, "Using orbit guesses with a 4D lattice is not checked/implemented." 
+ _, orbit = at.find_orbit(ring, refpts=indices, guess=np.array(self.orbit_guess)) + else: + _, orbit = at.find_orbit(ring, refpts=indices) + return orbit[:, [0,2]].T def get_twiss(self, indices: Optional[list[int]] = None, use_design=False) -> dict: @@ -139,7 +162,13 @@ def get_twiss(self, indices: Optional[list[int]] = None, use_design=False) -> di if indices is None: indices = range(len(self._design)) ring = self._design if use_design else self._ring - _, ringdata, elemdata = at.get_optics(ring, refpts=indices, get_chrom=True) + if self.use_orbit_guess: + assert self.no_6d is False, "Using orbit guesses with a 4D lattice is not checked/implemented." + orbit0, _ = at.find_orbit(ring, refpts=indices, guess=np.array(self.orbit_guess)) + else: + orbit0, _ = at.find_orbit(ring, refpts=indices) + + _, ringdata, elemdata = at.get_optics(ring, refpts=indices, get_chrom=True, orbit=orbit0) qs = ringdata['tune'][2] if not self.no_6d else 0 # doesn't exist when ring has 6d disabled @@ -193,6 +222,31 @@ def get_tune(self, method='6d', use_design=False) -> tuple[float, float]: return qx, qy + def get_chromaticity(self, method='6d', use_design=False) -> tuple[float, float]: + assert method in ['4d', '6d'] + ring = self._design if use_design else self._ring + + if self.no_6d: + logger.warning("Lattice has 6d disabled, using 4d method instead.") + method = '4d' + + if method == '4d' and not self.no_6d: + ring.disable_6d() + + try: + chroms = ring.get_chrom() + dqx = chroms[0] + dqy = chroms[1] + except Exception as e: + logger.error(f"Error computing chromaticity, {type(e)}: {e}") + dqx = np.nan + dqy = np.nan + + if method == '4d' and not self.no_6d: + ring.enable_6d() + + return dqx, dqy + def find_with_regex(self, regex: str) -> list[int]: """ Find elements in the ring that match the given regular expression. 
@@ -303,3 +357,19 @@ def update_cavity(self, index: int, voltage: float, phase: float, frequency: flo elem.TimeLag = timelag elem.Frequency = frequency return + + def one_turn_matrix(self, use_design=False): + ring = self._design if use_design else self._ring + + if self.use_orbit_guess: + assert self.no_6d is False, "Using orbit guesses with a 4D lattice is not checked/implemented." + orbit0, _ = at.find_orbit(ring, guess=np.array(self.orbit_guess)) + else: + orbit0, _ = at.find_orbit(ring) + + if self.no_6d: + M = ring.find_m44(orbit=orbit0)[0] + else: + M = ring.find_m66(orbit=orbit0)[0] + + return M diff --git a/pySC/core/numpy_type.py b/pySC/core/numpy_type.py deleted file mode 100644 index 525fcf0..0000000 --- a/pySC/core/numpy_type.py +++ /dev/null @@ -1,9 +0,0 @@ - -from pydantic import BeforeValidator, PlainSerializer -from typing import Annotated -import numpy as np - -NPARRAY = Annotated[np.ndarray, - BeforeValidator(lambda x: np.array(x)), - PlainSerializer(lambda x: x.tolist(), return_type=list) - ] \ No newline at end of file diff --git a/pySC/core/rfsettings.py b/pySC/core/rfsettings.py index 1fd8973..28cf3ae 100644 --- a/pySC/core/rfsettings.py +++ b/pySC/core/rfsettings.py @@ -3,7 +3,7 @@ from pydantic import BaseModel, Field, PrivateAttr if TYPE_CHECKING: - from .new_simulated_commissioning import SimulatedCommissioning + from .simulated_commissioning import SimulatedCommissioning CAVITY_NAME_TYPE = Union[str, int] diff --git a/pySC/core/rng.py b/pySC/core/rng.py index f8e2740..1bcdcb8 100644 --- a/pySC/core/rng.py +++ b/pySC/core/rng.py @@ -43,5 +43,8 @@ def normal_trunc(self, loc: float = 0, scale: float = 1, def normal(self, loc: float = 0, scale: float = 1, size: Optional[int] = None) -> Union[float, np.ndarray]: return self._rng.normal(loc=loc, scale=scale, size=size) + def uniform(self, low: float = 0, high: float = 1, size: Optional[int] = None) -> Union[float, np.ndarray]: + return low + self._rng.random(size=size) * (high - low) + def 
randomize_rng(self) -> None: self._rng = default_rng() diff --git a/pySC/core/new_simulated_commissioning.py b/pySC/core/simulated_commissioning.py similarity index 62% rename from pySC/core/new_simulated_commissioning.py rename to pySC/core/simulated_commissioning.py index cfb2371..72608b6 100644 --- a/pySC/core/new_simulated_commissioning.py +++ b/pySC/core/simulated_commissioning.py @@ -1,6 +1,8 @@ -from pydantic import BaseModel, model_validator, Field +from pydantic import BaseModel, model_validator, Field, PrivateAttr from typing import Optional import json +import numpy as np +import logging from .lattice import ATLattice, XSuiteLattice from .magnetsettings import MagnetSettings @@ -12,6 +14,9 @@ from .injection import InjectionSettings from .rng import RNG from ..control_system.server import start_server as _start_server +from .control import KnobData + +logger = logging.getLogger(__name__) class SimulatedCommissioning(BaseModel, extra="forbid"): lattice: ATLattice | XSuiteLattice @@ -30,17 +35,22 @@ class SimulatedCommissioning(BaseModel, extra="forbid"): seed: int = Field(default=1, frozen=True) rng: Optional[RNG] = None + _initialized: bool = PrivateAttr(default=False) + @model_validator(mode="after") def initialize(self): - self.propagate_parents() - if self.rng is None: - self.rng = RNG(seed=self.seed) - self.support_system.update_all() - self.design_magnet_settings.sendall() - self.magnet_settings.sendall() - for rf_settings in [self.rf_settings, self.design_rf_settings]: - for system_name in rf_settings.systems: - rf_settings.systems[system_name].trigger_update() + if not self._initialized: + self._initialized = True + + self.propagate_parents() + if self.rng is None: + self.rng = RNG(seed=self.seed) + self.support_system.update_all() + self.design_magnet_settings.sendall() + self.magnet_settings.sendall() + for rf_settings in [self.rf_settings, self.design_rf_settings]: + for system_name in rf_settings.systems: + 
rf_settings.systems[system_name].trigger_update() return self @classmethod @@ -79,6 +89,8 @@ def propagate_parents(self) -> None: self.injection._parent = self self.tuning._parent = self self.tuning.tune._parent = self.tuning + self.tuning.chromaticity._parent = self.tuning + self.tuning.c_minus._parent = self.tuning self.tuning.rf._parent = self.tuning return @@ -89,4 +101,19 @@ def copy(self) -> "SimulatedCommissioning": """ Create a copy of the SimulatedCommissioning instance. """ - return SimulatedCommissioning.model_validate(self.model_dump()) \ No newline at end of file + return SimulatedCommissioning.model_validate(self.model_dump()) + + def import_knob(self, json_filename: str) -> None: + with open(json_filename, 'r') as fp: + obj = json.load(fp) + knob_data = KnobData.model_validate(obj) + + for knob_name in knob_data.data.keys(): + assert knob_name not in self.magnet_settings.controls.keys(), f"knob with name {knob_name} already exists SC.magnet_settings." + assert knob_name not in self.design_magnet_settings.controls.keys(), f"knob with name {knob_name} already exists SC.design_magnet_settings." 
+ + for knob_name in knob_data.data.keys(): + tdata = knob_data.data[knob_name] + self.magnet_settings.add_knob(knob_name=knob_name, control_names=tdata.control_names, weights=tdata.weights) + self.design_magnet_settings.add_knob(knob_name=knob_name, control_names=tdata.control_names, weights=tdata.weights) + logger.info(f'Imported knob {knob_name} with sum(|weights|) = {np.sum(np.abs(tdata.weights)):.2e}') diff --git a/pySC/core/supports.py b/pySC/core/supports.py index a83e803..5c8d482 100644 --- a/pySC/core/supports.py +++ b/pySC/core/supports.py @@ -10,7 +10,7 @@ from ..utils.sc_tools import update_transformation if TYPE_CHECKING: - from .new_simulated_commissioning import SimulatedCommissioning + from .simulated_commissioning import SimulatedCommissioning logger = logging.getLogger(__name__) @@ -248,6 +248,9 @@ def get_support_offset(self, s, support_level_key): dx1, dy1 = self.get_total_offset(supp_index, supp_level, endpoint='start') dx2, dy2 = self.get_total_offset(supp_index, supp_level, endpoint='end') + if support.length == 0.: #ZERO_LENGTH_THRESHOLD here?? 
+ return np.array([dx1, dy1]) + dx = (dx2 - dx1)/(s2 - s1 + corr_s2) * (s - s1 + corr_s) + dx1 dy = (dy2 - dy1)/(s2 - s1 + corr_s2) * (s - s1 + corr_s) + dy1 return np.array([dx, dy]) diff --git a/pySC/core/types.py b/pySC/core/types.py new file mode 100644 index 0000000..b3056ac --- /dev/null +++ b/pySC/core/types.py @@ -0,0 +1,27 @@ +from pydantic import BeforeValidator, PlainSerializer, BaseModel +from typing import Annotated, Union, Optional +import numpy as np +from pathlib import Path + +NPARRAY = Annotated[np.ndarray, + BeforeValidator(lambda x: np.array(x)), + PlainSerializer(lambda x: x.tolist(), return_type=list) + ] + +class BaseModelWithSave(BaseModel): + def save_as(self, filename: Union[Path, str], indent: Optional[int] = None) -> None: + if type(filename) is not Path: + filename = Path(filename) + + data = self.model_dump() + suffix = filename.suffix + with open(filename, 'w') as fp: + if suffix in ['', '.json']: + import json + json.dump(data, fp, indent=indent) + elif suffix == '.yaml': + import yaml + yaml.safe_dump(data, fp, indent=indent) + else: + raise Exception(f'Unknown file extension: {suffix}.') + return \ No newline at end of file diff --git a/pySC/tuning/averaging.py b/pySC/tuning/averaging.py index 3f8b8da..ea71733 100644 --- a/pySC/tuning/averaging.py +++ b/pySC/tuning/averaging.py @@ -1,4 +1,4 @@ -from typing import Optional, Union, TYPE_CHECKING +from typing import TYPE_CHECKING import numpy as np if TYPE_CHECKING: diff --git a/pySC/tuning/c_minus.py b/pySC/tuning/c_minus.py new file mode 100644 index 0000000..7532378 --- /dev/null +++ b/pySC/tuning/c_minus.py @@ -0,0 +1,129 @@ +from typing import Optional, TYPE_CHECKING +from pydantic import BaseModel, PrivateAttr, ConfigDict +import numpy as np +import logging + +from ..core.control import KnobControl, KnobData +from ..utils import rdt +from ..apps.response_matrix import ResponseMatrix + +if TYPE_CHECKING: + from .tuning_core import Tuning + +logger = logging.getLogger(__name__) + 
from typing import Optional, TYPE_CHECKING
from pydantic import BaseModel, PrivateAttr, ConfigDict
import numpy as np
import logging

from ..core.control import KnobControl, KnobData
from ..utils import rdt
from ..apps.response_matrix import ResponseMatrix

if TYPE_CHECKING:
    from .tuning_core import Tuning

logger = logging.getLogger(__name__)


class CMinus(BaseModel, extra="forbid"):
    """Creation and use of knobs acting on the coupling term c_minus.

    ``controls`` is the list of (skew-quadrupole) controls from which two
    orthogonal knobs are built; the knobs act on the real and imaginary
    part of c_minus and are registered under ``knob_real``/``knob_imag``.
    """
    # names under which the two knobs are registered in the magnet settings
    knob_real: str = 'c_minus_real'
    knob_imag: str = 'c_minus_imag'
    # controls used to build the knobs; filled from the tuning configuration
    controls: list[str] = []
    # back-reference to the owning Tuning object (set by propagate_parents)
    _parent: Optional['Tuning'] = PrivateAttr(default=None)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    def c_minus_response(self, delta: float = 1e-5):
        """Return the complex response d(c_minus)/d(setpoint) of each control.

        Computed on the design machine by offsetting one control at a time
        by ``delta``; every setpoint is restored afterwards.
        """
        SC = self._parent._parent

        N = len(self.controls)
        assert N > 0

        c_minus_0 = rdt.calculate_c_minus(SC, use_design=True)

        delta_c_minus = np.zeros_like(self.controls, dtype=complex)
        for ii, control in enumerate(self.controls):
            logger.info(f'[{ii+1:d}/{N:d}] Calculating ideal c_minus response of {control}.')
            sp0 = SC.magnet_settings.get(control, use_design=True)
            SC.magnet_settings.set(control, delta + sp0, use_design=True)
            c_minus = rdt.calculate_c_minus(SC, use_design=True)
            SC.magnet_settings.set(control, sp0, use_design=True)
            delta_c_minus[ii] = (c_minus - c_minus_0) / delta

        return delta_c_minus

    def create_c_minus_knobs(self, delta: float = 1e-5) -> KnobData:
        """Build the real/imaginary c_minus knobs from the pseudo-inverse response.

        Returns
        -------
        KnobData
            The two knobs (they are returned, NOT registered in the magnet
            settings here).  The return annotation was previously ``None``
            although a KnobData object has always been returned.
        """
        if not len(self.controls) > 0:
            raise Exception('c_minus.controls is empty. Please set.')

        matrix = np.zeros((2, len(self.controls)))
        delta_c_minus = self.c_minus_response(delta=delta)
        matrix[0] = delta_c_minus.real
        matrix[1] = delta_c_minus.imag

        c_minus_response_matrix = ResponseMatrix(matrix=matrix,
                                                 outputs_plane=['SQ'] * len(self.controls)
                                                 )
        # columns of the pseudo-inverse are the control weights per knob
        inverse_matrix = c_minus_response_matrix.build_pseudoinverse().matrix
        c_minus_real_knob = inverse_matrix[:, 0]
        c_minus_imag_knob = inverse_matrix[:, 1]

        knob_data = KnobData(data={
            self.knob_real: KnobControl(control_names=self.controls, weights=list(c_minus_real_knob)),
            self.knob_imag: KnobControl(control_names=self.controls, weights=list(c_minus_imag_knob))
        })

        logger.info(f"{self.knob_real}: sum(|weights|)={np.sum(np.abs(c_minus_real_knob)):.2e}")
        logger.info(f"{self.knob_imag}: sum(|weights|)={np.sum(np.abs(c_minus_imag_knob)):.2e}")
        return knob_data

    def trim(self, real: float = 0, imag: float = 0, use_design: bool = False) -> None:
        """Increment the two c_minus knobs by (real, imag).

        Both knobs must already be declared in the corresponding magnet
        settings (asserted below).
        """
        SC = self._parent._parent

        if use_design:
            assert self.knob_real in SC.design_magnet_settings.controls.keys()
            assert self.knob_imag in SC.design_magnet_settings.controls.keys()
        else:
            assert self.knob_real in SC.magnet_settings.controls.keys()
            assert self.knob_imag in SC.magnet_settings.controls.keys()

        real0 = SC.magnet_settings.get(self.knob_real, use_design=use_design)
        imag0 = SC.magnet_settings.get(self.knob_imag, use_design=use_design)

        SC.magnet_settings.set(self.knob_real, real0 + real, use_design=use_design)
        SC.magnet_settings.set(self.knob_imag, imag0 + imag, use_design=use_design)

        return

    def correct(self, target_c_minus_real: float = 0, target_c_minus_imag: float = 0,
                n_iter: int = 1, gain: float = 1, measurement_method: str = 'cheat') -> None:
        '''
        Correct c_minus to the target values.

        Parameters
        ----------
        target_c_minus_real : float
            Target real c_minus. Default is 0.
        target_c_minus_imag : float, optional
            Target imaginary c_minus. Default is 0.
        n_iter : int, optional
            Number of correction iterations. Default is 1.
        gain : float, optional
            Gain for the correction. Default is 1.
        measurement_method : str, optional
            Method to measure c_minus. Options are 'cheat'.
            Default is 'cheat'.
        '''

        if measurement_method not in ['cheat']:
            raise NotImplementedError(f'{measurement_method=} not implemented yet.')

        for _ in range(n_iter):
            if measurement_method == 'cheat':
                c_minus = self.cheat()
            else:
                raise Exception(f'Unknown measurement_method {measurement_method}')
            # c_minus != c_minus is a NaN check (cheat() returns np.nan on failure)
            if c_minus is None or c_minus != c_minus:
                logger.info("C_minus measurement failed, skipping correction.")
                return
            logger.info(f"Measured c_minus = {c_minus:.4f}")
            delta_real = c_minus.real - target_c_minus_real
            delta_imag = c_minus.imag - target_c_minus_imag
            self.trim(real=-gain*delta_real, imag=-gain*delta_imag)
        return

    def cheat(self, use_design: bool = False) -> complex:
        """Return c_minus computed directly from the lattice model (np.nan on failure)."""
        SC = self._parent._parent
        try:
            c_minus = rdt.calculate_c_minus(SC, use_design=use_design)
        except Exception as exc:
            logger.warning(f"Exception while measuring c_minus: {exc}")
            c_minus = np.nan

        return c_minus
from typing import Optional, TYPE_CHECKING
from pydantic import BaseModel, PrivateAttr, ConfigDict
import numpy as np
import logging

from ..core.types import NPARRAY

if TYPE_CHECKING:
    from .tuning_core import Tuning

logger = logging.getLogger(__name__)


class Chromaticity(BaseModel, extra="forbid"):
    """Chromaticity measurement/correction via two families of controls.

    ``controls_1``/``controls_2`` are two sextupole-like control families;
    a 2x2 response matrix (chromaticity vs. family strength) is built and
    inverted to trim the machine towards target chromaticities.
    """
    controls_1: list[str] = []
    controls_2: list[str] = []
    # 2x2 response of (dqx, dqy) to the two families, and its inverse
    response_matrix: Optional[NPARRAY] = None
    inverse_response_matrix: Optional[NPARRAY] = None
    # back-reference to the owning Tuning object (set by propagate_parents)
    _parent: Optional['Tuning'] = PrivateAttr(default=None)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @property
    def design_dqx(self):
        # design horizontal chromaticity from the lattice twiss
        return self._parent._parent.lattice.twiss['dqx']

    @property
    def design_dqy(self):
        # design vertical chromaticity from the lattice twiss
        return self._parent._parent.lattice.twiss['dqy']

    def chromaticity_response(self, controls: list[str], delta: float = 1e-5):
        """Return d(dqx)/d(setpoint), d(dqy)/d(setpoint) for a whole family.

        All controls of the family are offset together by ``delta`` on the
        design machine; the original setpoints are restored afterwards.
        """
        SC = self._parent._parent
        dq1_i, dq2_i = SC.lattice.get_chromaticity(use_design=True)

        ref_data = SC.design_magnet_settings.get_many(controls)
        data = {key: ref_data[key] + delta for key in ref_data.keys()}
        SC.design_magnet_settings.set_many(data)

        dq1_f, dq2_f = SC.lattice.get_chromaticity(use_design=True)

        SC.design_magnet_settings.set_many(ref_data)

        delta_dq1 = (dq1_f - dq1_i) / delta
        delta_dq2 = (dq2_f - dq2_i) / delta
        return delta_dq1, delta_dq2

    def build_response_matrix(self, delta: float = 1e-5) -> None:
        """Build (and store) the 2x2 chromaticity response matrix and its inverse."""
        if not len(self.controls_1) > 0:
            raise Exception('chromaticity.controls_1 is empty. Please set.')
        if not len(self.controls_2) > 0:
            raise Exception('chromaticity.controls_2 is empty. Please set.')

        RM = np.zeros((2, 2))
        RM[:, 0] = self.chromaticity_response(self.controls_1, delta=delta)
        RM[:, 1] = self.chromaticity_response(self.controls_2, delta=delta)
        iRM = np.linalg.inv(RM)

        self.response_matrix = RM
        self.inverse_response_matrix = iRM
        return

    def trim(self, delta_dqx: float = 0, delta_dqy: float = 0, use_design: bool = False) -> None:
        """Apply family-strength changes producing the requested chromaticity deltas."""
        SC = self._parent._parent
        if self.inverse_response_matrix is None:
            logger.info('Did not find inverse (chromaticity) response matrix. Building now.')
            self.build_response_matrix()

        delta1, delta2 = np.dot(self.inverse_response_matrix, [delta_dqx, delta_dqy])
        ref_data1 = SC.magnet_settings.get_many(self.controls_1, use_design=use_design)
        ref_data2 = SC.magnet_settings.get_many(self.controls_2, use_design=use_design)
        data1 = {key: ref_data1[key] + delta1 for key in ref_data1.keys()}
        data2 = {key: ref_data2[key] + delta2 for key in ref_data2.keys()}

        SC.magnet_settings.set_many(data1, use_design=use_design)
        SC.magnet_settings.set_many(data2, use_design=use_design)
        return

    def correct(self, target_dqx: Optional[float] = None, target_dqy: Optional[float] = None,
                n_iter: int = 1, gain: float = 1, measurement_method: str = 'cheat'):
        '''
        Correct the chromaticity to the target values.

        Parameters
        ----------
        target_dqx : float, optional
            Target horizontal chromaticity. If None, use design chromaticity.
        target_dqy : float, optional
            Target vertical chromaticity. If None, use design chromaticity.
        n_iter : int, optional
            Number of correction iterations. Default is 1.
        gain : float, optional
            Gain for the correction. Default is 1.
        measurement_method : str, optional
            Method to measure the chromaticity. Options are 'cheat', 'cheat4d'.
            Default is 'cheat'.
        '''

        if measurement_method not in ['cheat', 'cheat4d']:
            raise NotImplementedError(f'{measurement_method=} not implemented yet.')

        if target_dqx is None:
            target_dqx = self.design_dqx

        if target_dqy is None:
            target_dqy = self.design_dqy

        for _ in range(n_iter):
            if measurement_method == 'cheat':
                dqx, dqy = self.cheat()
            elif measurement_method == 'cheat4d':
                dqx, dqy = self.cheat4d()
            else:
                raise Exception(f'Unknown measurement_method {measurement_method}')
            # dq != dq is a NaN check (get_chromaticity returns np.nan on failure)
            if dqx is None or dqy is None or dqx != dqx or dqy != dqy:
                logger.info("Chromaticity measurement failed, skipping correction.")
                return
            # fixed copy-paste: these messages previously said "tune"
            logger.info(f"Measured chromaticity: dqx={dqx:.4f}, dqy={dqy:.4f}")
            delta_dqx = dqx - target_dqx
            delta_dqy = dqy - target_dqy
            logger.info(f"Delta chromaticity: delta_dqx={delta_dqx:.4f}, delta_dqy={delta_dqy:.4f}")
            self.trim(delta_dqx=-gain*delta_dqx, delta_dqy=-gain*delta_dqy)
        return

    def cheat4d(self) -> tuple[float, float]:
        """Return (dqx, dqy) from the 4d lattice model, logging the delta to design."""
        SC = self._parent._parent
        dqx, dqy = SC.lattice.get_chromaticity(method='4d')
        delta_x = dqx - self.design_dqx
        delta_y = dqy - self.design_dqy
        logger.info(f"Horizontal chromaticity: dQx = {dqx:.3f} (Δ = {delta_x:.3f})")
        logger.info(f"Vertical chromaticity: dQy = {dqy:.3f} (Δ = {delta_y:.3f})")

        return dqx, dqy

    def cheat(self) -> tuple[float, float]:
        """Return (dqx, dqy) from the 6d lattice model, logging the delta to design."""
        SC = self._parent._parent
        dqx, dqy = SC.lattice.get_chromaticity(method='6d')
        delta_x = dqx - self.design_dqx
        delta_y = dqy - self.design_dqy
        # fixed copy-paste: previously logged "Horizontal tune: Qx" / "Vertical tune: Qy";
        # now consistent with cheat4d()
        logger.info(f"Horizontal chromaticity: dQx = {dqx:.3f} (Δ = {delta_x:.3f})")
        logger.info(f"Vertical chromaticity: dQy = {dqy:.3f} (Δ = {delta_y:.3f})")

        return dqx, dqy
TYPE_CHECKING: - from ..core.new_simulated_commissioning import SimulatedCommissioning +from ..core.control import IndivControl +from .pySC_interface import pySCOrbitInterface +from ..apps import measure_bba +from ..apps.bba import BBAAnalysis -BPM_OUTLIER = 6 # number of sigma -SLOPE_FACTOR = 0.10 # of max slope -CENTER_OUTLIER = 1 # number of sigma +logger = logging.getLogger(__name__) +if TYPE_CHECKING: + from ..core.simulated_commissioning import SimulatedCommissioning def get_mag_s_pos(SC: "SimulatedCommissioning", MAG: list[str]): s_list = [] @@ -30,7 +32,6 @@ class Orbit_BBA_Configuration(BaseModel, extra="forbid"): @classmethod def generate_config(cls, SC: "SimulatedCommissioning", max_dx_at_bpm = 1e-3, max_modulation=20e-6): - # max_modulation=600e-6, max_dx_at_bpm=1.5e-3 config = {} RM_name = 'orbit' @@ -133,176 +134,26 @@ def orbit_bba(SC: "SimulatedCommissioning", bpm_name: str, n_corr_steps: int = 7 SC.tuning.generate_orbit_bba_config() config = SC.tuning.orbit_bba_config.config[bpm_name] - corr = config[f'{plane}CORR'] - quad = config['QUAD'] - bpm_number = config['number'] - corr_delta_sp = config[f'{plane}CORR_delta'] - quad_delta = config[f'QUAD_dk_{plane}'] - - n_bpms = len(SC.bpm_system.indices) - - ## define get_orbit - def get_orbit(): - x, y = SC.bpm_system.capture_orbit(bba=False, subtract_reference=False, use_design=False) - x = x / shots_per_orbit - y = y / shots_per_orbit - for i in range(shots_per_orbit-1): - x_tmp, y_tmp = SC.bpm_system.capture_orbit(bba=False, subtract_reference=False, use_design=False) - x = x + x_tmp / shots_per_orbit - y = y + y_tmp / shots_per_orbit - - return (x.flatten(order='F'), y.flatten(order='F')) - - - ## define settings to get/set - settings = SC.magnet_settings - - bpm_pos = np.zeros([n_corr_steps, 2]) - orbits = np.zeros([n_corr_steps, 2, n_bpms]) - - corr_sp0 = settings.get(corr) - quad_sp0 = settings.get(quad) - - corr_sp_array = np.linspace(-corr_delta_sp, corr_delta_sp, n_corr_steps) + corr_sp0 - for 
i_corr, corr_sp in enumerate(corr_sp_array): - settings.set(corr, corr_sp) - orbit_x_center, orbit_y_center = get_orbit() - - settings.set(quad, quad_sp0 + quad_delta) - orbit_x_up, orbit_y_up = get_orbit() - - settings.set(quad, quad_sp0 - quad_delta) - orbit_x_down, orbit_y_down = get_orbit() - - settings.set(quad, quad_sp0) - - if plane == 'H': - orbit_main_down = orbit_x_down - orbit_main_up = orbit_x_up - orbit_main_center = orbit_x_center - orbit_other_down = orbit_y_down - orbit_other_up = orbit_y_up - orbit_other_center = orbit_y_center - else: - orbit_main_down = orbit_y_down - orbit_main_up = orbit_y_up - orbit_main_center = orbit_y_center - orbit_other_down = orbit_x_down - orbit_other_up = orbit_x_up - orbit_other_center = orbit_x_center - - bpm_pos[i_corr, 0] = orbit_main_down[bpm_number] - bpm_pos[i_corr, 1] = orbit_main_up[bpm_number] - info = SC.magnet_settings.controls[quad].info - assert type(info) is IndivControl - assert info.order == 2 - if info.component == 'B': - orbits[i_corr, 0, :] = orbit_main_down - orbit_main_center - orbits[i_corr, 1, :] = orbit_main_up - orbit_main_center - elif info.component == 'A': - orbits[i_corr, 0, :] = orbit_other_down - orbit_other_center - orbits[i_corr, 1, :] = orbit_other_up - orbit_other_center - else: - raise Exception(f'Invalid magnet for BBA: {quad}') - - settings.set(corr, corr_sp0) - settings.set(quad, quad_sp0) - - slopes, slopes_err, center, center_err = get_slopes_center(bpm_pos, orbits, quad_delta) - mask_bpm_outlier = reject_bpm_outlier(orbits) - mask_slopes = reject_slopes(slopes) - mask_center = reject_center_outlier(center) - final_mask = np.logical_and(np.logical_and(mask_bpm_outlier, mask_slopes), mask_center) - - offset, offset_err = get_offset(center, center_err, final_mask) - - return offset, offset_err - -def reject_bpm_outlier(orbits): - n_k1 = orbits.shape[1] - n_bpms = orbits.shape[2] - mask = np.ones(n_bpms, dtype=bool) - for k1_step in range(n_k1): - for bpm in range(n_bpms): - data 
= orbits[:, k1_step, bpm] - if np.any(data - np.mean(data) > BPM_OUTLIER * np.std(data)): - mask[bpm] = False - - # n_rejections = n_bpms - np.sum(mask) - # print(f"Rejected {n_rejections}/{n_bpms} bpms for bpm outliers ( > {BPM_OUTLIER} r.m.s. )") - return mask - -def reject_slopes(slopes): - max_slope = np.nanmax(np.abs(slopes)) - mask = np.abs(slopes) > SLOPE_FACTOR * max_slope - - # n_rejections = len(slopes) - np.sum(mask) - # print(f"Rejected {n_rejections}/{len(slopes)} bpms for small slope ( < {SLOPE_FACTOR} * max(slope) )") - return mask - -def reject_center_outlier(center): - mean = np.nanmean(center) - std = np.nanstd(center) - mask = abs(center - mean) < CENTER_OUTLIER * std - - # n_rejections = len(center) - np.sum(mask) - # print(f"Rejected {n_rejections}/{len(center)} bpms for center away from mean ( > {CENTER_OUTLIER} r.m.s. )") - return mask - -def get_slopes_center(bpm_pos, orbits, dk1): - mag_vec = np.array([dk1, -dk1]) - num_downstream_bpms = orbits.shape[2] - fit_order = 1 - x = np.mean(bpm_pos, axis=1) - x_mask = ~np.isnan(x) - err = np.mean(np.std(bpm_pos[x_mask, :], axis=1)) - x = x[x_mask] - new_tmp_tra = orbits[x_mask, :, :] - - tmp_slope = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) - tmp_slope_err = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) - center = np.full((new_tmp_tra.shape[2]), np.nan) - center_err = np.full((new_tmp_tra.shape[2]), np.nan) - for i in range(new_tmp_tra.shape[0]): - for j in range(new_tmp_tra.shape[2]): - y = new_tmp_tra[i, :, j] - y_mask = ~np.isnan(y) - if np.sum(y_mask) < min(len(mag_vec), 3): - continue - # TODO once the position errors are calculated and propagated, should be used - p, pcov = np.polyfit(mag_vec[y_mask], y[y_mask], 1, w=np.ones(int(np.sum(y_mask))) / err, cov='unscaled') - tmp_slope[i, j], tmp_slope_err[i, j] = p[0], pcov[0, 0] + interface = pySCOrbitInterface(SC=SC, n_turns=2, bba=False, subtract_reference=False) + generator = measure_bba(interface=interface, 
bpm_name=bpm_name, config=config, + shots_per_orbit=shots_per_orbit, n_corr_steps=n_corr_steps, + bipolar=True, skip_save=True, plane=plane, skip_cycle=True) - slopes = np.full((new_tmp_tra.shape[2]), np.nan) - slopes_err = np.full((new_tmp_tra.shape[2]), np.nan) - for j in range(min(new_tmp_tra.shape[2], num_downstream_bpms)): - y = tmp_slope[:, j] - y_err = tmp_slope_err[:, j] - y_mask = ~np.isnan(y) - if np.sum(y_mask) <= fit_order + 1: - continue - # TODO here do odr as the x values have also measurement errors - p, pcov = np.polyfit(x[y_mask], y[y_mask], fit_order, w=1 / y_err[y_mask], cov='unscaled') - if np.abs(p[0]) < 2 * np.sqrt(pcov[0, 0]): - continue - center[j] = -p[1] / (fit_order * p[0]) # zero-crossing if linear, minimum is quadratic - center_err[j] = np.sqrt(center[j] ** 2 * (pcov[0,0]/p[0]**2 + pcov[1,1]/p[1]**2 - 2 * pcov[0, 1] / p[0] / p[1])) - slopes[j] = p[0] - slopes_err[j] = np.sqrt(pcov[0,0]) + for _, measurement in generator: + pass - return slopes, slopes_err, center, center_err + if plane == 'H': + data = measurement.H_data + else: + data = measurement.V_data -def get_offset(center, center_err, mask): - from pySC.utils import stats try: - offset_change = stats.weighted_mean(center[mask], center_err[mask]) - offset_change_error = stats.weighted_error(center[mask]-offset_change, center_err[mask]) / np.sqrt(stats.effective_sample_size(center[mask], stats.weights_from_errors(center_err[mask]))) - except ZeroDivisionError as exc: + analysis_result = BBAAnalysis.analyze(data) + offset = analysis_result.offset + offset_error = analysis_result.offset_error + except Exception as exc: print(exc) - print('Failed to estimate offset!!') - print(f'Debug info: {center=}, {center_err=}, {mask=}') - print(f'Debug info: {center[mask]=}, {center_err[mask]=}') - offset_change = 0 - offset_change_error = np.nan + logger.warning(f'Failed to compute trajectory BBA for BPM {bpm_name}') + offset, offset_error = 0, np.nan - return offset_change, 
offset_change_error + return offset, offset_error diff --git a/pySC/tuning/parallel.py b/pySC/tuning/parallel.py index cac62df..bfe0cd0 100644 --- a/pySC/tuning/parallel.py +++ b/pySC/tuning/parallel.py @@ -6,7 +6,7 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: - from ..core.new_simulated_commissioning import SimulatedCommissioning + from ..core.simulated_commissioning import SimulatedCommissioning def get_listener_and_queue(logger): log_queue = Queue() diff --git a/pySC/tuning/pySC_interface.py b/pySC/tuning/pySC_interface.py new file mode 100644 index 0000000..9df4b15 --- /dev/null +++ b/pySC/tuning/pySC_interface.py @@ -0,0 +1,65 @@ +from pydantic import Field +import numpy as np +from typing import TYPE_CHECKING + +from ..apps.interface import AbstractInterface +if TYPE_CHECKING: + from ..core.simulated_commissioning import SimulatedCommissioning + +class pySCOrbitInterface(AbstractInterface): + SC: "SimulatedCommissioning" = Field(repr=False) + use_design: bool = False + bba: bool = True + subtract_reference: bool = True + + def get_orbit(self) -> tuple[np.ndarray, np.ndarray]: + return self.SC.bpm_system.capture_orbit(use_design=self.use_design, bba=self.bba, + subtract_reference=self.subtract_reference) + + def get_ref_orbit(self) -> tuple[np.ndarray, np.ndarray]: + return self.SC.bpm_system.reference_x, self.SC.bpm_system.reference_y + + def get(self, name: str) -> float: + return self.SC.magnet_settings.get(name, use_design=self.use_design) + + def set(self, name: str, value: float): + self.SC.magnet_settings.set(name, value, use_design=self.use_design) + return + + def get_many(self, names: list) -> dict[str, float]: + return self.SC.magnet_settings.get_many(names, use_design=self.use_design) + + def set_many(self, data: dict[str, float]): + self.SC.magnet_settings.set_many(data, use_design=self.use_design) + return + + def get_rf_main_frequency(self) -> float: + if self.use_design: + rf_settings = self.SC.design_rf_settings + else: + rf_settings 
= self.SC.rf_settings + + return rf_settings.main.frequency + + def set_rf_main_frequency(self, frequency: float): + if self.use_design: + rf_settings = self.SC.design_rf_settings + else: + rf_settings = self.SC.rf_settings + + rf_settings.main.set_frequency(frequency) + return + +class pySCInjectionInterface(pySCOrbitInterface): + SC: "SimulatedCommissioning" = Field(repr=False) + n_turns: int = 1 + + def get_orbit(self) -> tuple[np.ndarray, np.ndarray]: + x,y= self.SC.bpm_system.capture_injection(n_turns=self.n_turns, use_design=self.use_design, + bba=self.bba, subtract_reference=self.subtract_reference) + return x.flatten(order='F'), y.flatten(order='F') + + def get_ref_orbit(self) -> tuple[np.ndarray, np.ndarray]: + x_ref = np.repeat(self.SC.bpm_system.reference_x[:, np.newaxis], self.n_turns, axis=1) + y_ref = np.repeat(self.SC.bpm_system.reference_y[:, np.newaxis], self.n_turns, axis=1) + return x_ref.flatten(order='F'), y_ref.flatten(order='F') diff --git a/pySC/tuning/response_measurements.py b/pySC/tuning/response_measurements.py index f573fee..697cee4 100644 --- a/pySC/tuning/response_measurements.py +++ b/pySC/tuning/response_measurements.py @@ -1,163 +1,86 @@ from typing import Union, Optional, TYPE_CHECKING import numpy as np +import logging -if TYPE_CHECKING: - from ..core.new_simulated_commissioning import SimulatedCommissioning - - -DISABLE_RICH = False - -def no_rich_progress(): - from contextlib import nullcontext - progress = nullcontext() - progress.add_task = lambda *args, **kwargs: 1 - progress.update = lambda *args, **kwargs: None - progress.remove_task = lambda *args, **kwargs: None - return progress - -try: - from rich.progress import Progress, BarColumn, TextColumn, MofNCompleteColumn, TimeRemainingColumn - rich_progress = Progress( - TextColumn("[progress.description]{task.description}"), - BarColumn(), - MofNCompleteColumn(), - TimeRemainingColumn(), - ) -except ModuleNotFoundError: - rich_progress = no_rich_progress() - DISABLE_RICH = 
True +from .pySC_interface import pySCInjectionInterface, pySCOrbitInterface +from ..apps import measure_ORM, measure_dispersion +if TYPE_CHECKING: + from ..core.simulated_commissioning import SimulatedCommissioning -def response_loop(inputs, inputs_delta, get_output, settings, normalize=True, bipolar=False): - n_inputs = len(inputs) +logger = logging.getLogger(__name__) - if DISABLE_RICH: - progress = no_rich_progress() - else: - progress = rich_progress - - with progress: - task_id = progress.add_task("Measuring RM", start=True, total=n_inputs) - progress.update(task_id, total=n_inputs) - - reference = get_output() - if np.any(np.isnan(reference)): - raise ValueError('Initial output is NaN. Aborting. ') - - RM = np.full((len(reference), n_inputs), np.nan) - - for i, control in enumerate(inputs): - ref_setpoint = settings.get(control) - delta = inputs_delta[i] - if bipolar: - step = delta/2 - settings.set(control, ref_setpoint - step) - reference = get_output() - else: - step = delta - settings.set(control, ref_setpoint + step) - output = get_output() - - RM[:, i] = (output - reference) - if normalize: - RM[:, i] /= delta - settings.set(control, ref_setpoint) - yield RM - progress.update(task_id, completed=i+1, description=f'Measuring response of {control}...') - progress.update(task_id, completed=n_inputs, description='Response measured.') - progress.remove_task(task_id) - - yield RM - -def measure_TrajectoryResponseMatrix(SC: "SimulatedCommissioning", n_turns: int = 1, dkick: Union[float, list] = 1e-5, use_design: bool = False, normalize: bool = True, bipolar: bool = False): - print('Calculating response matrix') +def measure_TrajectoryResponseMatrix(SC: "SimulatedCommissioning", n_turns: int = 1, + dkick: Union[float, list] = 1e-5, use_design: bool = False, + normalize: bool = True, bipolar: bool = False) -> np.ndarray: + logger.info(f'Measuring trajectory response matrix with {n_turns=}.') ### set inputs HCORR = SC.tuning.HCORR VCORR = SC.tuning.VCORR - CORR = 
HCORR + VCORR - - n_CORR = len(CORR) - - if type(dkick) is float: - kicks = np.ones(n_CORR, dtype=float) * dkick - else: - assert len(dkick) == n_CORR, f'ERROR: wrong length of dkick array provided. expected {n_CORR}, got {len(dkick)}' - kicks = np.array(dkick) + corrector_names = HCORR + VCORR - ### set function that gathers outputs - def get_orbit(): - x,y = SC.bpm_system.capture_injection(n_turns=n_turns, bba=False, subtract_reference=False, use_design=use_design) - return np.concat((x.flatten(order='F'), y.flatten(order='F'))) + interface = pySCInjectionInterface(SC=SC, n_turns=n_turns) + interface.use_design = use_design - ### specify to use "real" lattice or design - magnet_settings = SC.design_magnet_settings if use_design else SC.magnet_settings + generator = measure_ORM(interface=interface, corrector_names=corrector_names, + delta=dkick, bipolar=bipolar, skip_save=True) - ### measure the response matrix - generator = response_loop(inputs=CORR, inputs_delta=kicks, get_output=get_orbit, settings=magnet_settings, normalize=normalize, bipolar=bipolar) - for RM in generator: + for code, measurement in generator: pass - return RM + data = measurement.response_data + if normalize: + matrix = data.matrix + else: + matrix = data.not_normalized_response_matrix + + return matrix -def measure_OrbitResponseMatrix(SC: "SimulatedCommissioning", HCORR: Optional[list] = None, VCORR: Optional[list] = None, dkick: Union[float, list] = 1e-5, use_design: bool = False, normalize: bool = True, bipolar: bool = True): - print('Calculating response matrix') +def measure_OrbitResponseMatrix(SC: "SimulatedCommissioning", HCORR: Optional[list] = None, + VCORR: Optional[list] = None, dkick: Union[float, list] = 1e-5, + use_design: bool = False, normalize: bool = True, bipolar: bool = True) -> np.ndarray: + logger.info('Measuring orbit response matrix.') ### set inputs if HCORR is None: HCORR = SC.tuning.HCORR if VCORR is None: VCORR = SC.tuning.VCORR - CORR = HCORR + VCORR - - n_CORR = 
len(CORR) - - if type(dkick) is float: - kicks = np.ones(n_CORR, dtype=float) * dkick - else: - assert len(dkick) == n_CORR, f'ERROR: wrong length of dkick array provided. expected {n_CORR}, got {len(dkick)}' - kicks = np.array(dkick) + corrector_names = HCORR + VCORR - ### set function that gathers outputs - def get_orbit(): - x,y = SC.bpm_system.capture_orbit(bba=False, subtract_reference=False, use_design=use_design) - return np.concat((x.flatten(order='F'), y.flatten(order='F'))) + interface = pySCOrbitInterface(SC=SC) + interface.use_design = use_design - ### specify to use "real" lattice or design - magnet_settings = SC.design_magnet_settings if use_design else SC.magnet_settings + generator = measure_ORM(interface=interface, corrector_names=corrector_names, + delta=dkick, bipolar=bipolar, skip_save=True) - ### measure the response matrix - generator = response_loop(inputs=CORR, inputs_delta=kicks, get_output=get_orbit, settings=magnet_settings, normalize=normalize, bipolar=bipolar) - for RM in generator: + for code, measurement in generator: pass - return RM + data = measurement.response_data + if normalize: + matrix = data.matrix + else: + matrix = data.not_normalized_response_matrix + + return matrix -def measure_RFFrequencyOrbitResponse(SC: "SimulatedCommissioning", delta_frf : float = 20, rf_system_name: str = 'main', use_design: bool = False, normalize: bool = True, bipolar: bool = False): +def measure_RFFrequencyOrbitResponse(SC: "SimulatedCommissioning", delta_frf : float = 20, use_design: bool = False, + normalize: bool = True, bipolar: bool = False) -> np.ndarray: + logger.info('Measuring orbit response to RF frequency (dispersion).') + interface = pySCOrbitInterface(SC=SC) + interface.use_design = use_design - rf_settings = SC.design_rf_settings if use_design else SC.rf_settings - rf_system = rf_settings.systems[rf_system_name] + generator = measure_dispersion(interface=interface, delta=delta_frf, bipolar=bipolar, skip_save=True) - ### function 
that gathers outputs - def get_orbit(): - x,y = SC.bpm_system.capture_orbit(bba=False, subtract_reference=False, use_design=use_design) - return np.concat((x.flatten(order='F'), y.flatten(order='F'))) + for code, measurement in generator: + pass - frf = rf_system.frequency - if bipolar: - step = delta_frf / 2 - rf_system.set_frequency(frf - step) - xy0 = get_orbit() - else: - step = delta_frf - xy0 = get_orbit() - rf_system.set_frequency(frf + step) - xy1 = get_orbit() - rf_system.set_frequency(frf) - response = (xy1 - xy0) + data = measurement.dispersion_data if normalize: - response /= delta_frf + response = np.concatenate(data.frequency_response) + else: + response = np.concatenate(data.not_normalized_frequency_response) return response - - diff --git a/pySC/tuning/rf_tuning.py b/pySC/tuning/rf_tuning.py index e52d55d..60a8c60 100644 --- a/pySC/tuning/rf_tuning.py +++ b/pySC/tuning/rf_tuning.py @@ -1,10 +1,8 @@ -from typing import Dict, Optional, Union, TYPE_CHECKING -from pydantic import BaseModel, PrivateAttr, ConfigDict +from typing import Optional, TYPE_CHECKING +from pydantic import BaseModel, PrivateAttr import numpy as np import logging -from ..core.numpy_type import NPARRAY - if TYPE_CHECKING: from .tuning_core import Tuning diff --git a/pySC/tuning/tools.py b/pySC/tuning/tools.py index 59a6a15..c7aa3f7 100644 --- a/pySC/tuning/tools.py +++ b/pySC/tuning/tools.py @@ -1,22 +1,9 @@ -import numpy as np import logging from typing import Callable -from enum import IntEnum - +from ..apps.tools import get_average_orbit as app_get_average_orbit logger = logging.getLogger(__name__) def get_average_orbit(get_orbit: Callable, n_orbits: int = 10): - orbit_x, orbit_y = get_orbit() - all_orbit_x = np.zeros((len(orbit_x), n_orbits)) - all_orbit_y = np.zeros((len(orbit_y), n_orbits)) - - all_orbit_x[:,0] = orbit_x - all_orbit_y[:,0] = orbit_y - for ii in range(1, n_orbits): - all_orbit_x[:, ii], all_orbit_y[:, ii] = get_orbit() - - mean_orbit_x = np.mean(all_orbit_x, 
axis=1) - mean_orbit_y = np.mean(all_orbit_y, axis=1) - std_orbit_x = np.std(all_orbit_x, axis=1) - std_orbit_y = np.std(all_orbit_y, axis=1) - return mean_orbit_x, mean_orbit_y, std_orbit_x, std_orbit_y + logger.warning('Please stop using "get_average_orbit" from pySC.tuning.tools. ' + 'Import it from pySC.apps.tools instead!') + return app_get_average_orbit(get_orbit=get_orbit, n_orbits=n_orbits) \ No newline at end of file diff --git a/pySC/tuning/trajectory_bba.py b/pySC/tuning/trajectory_bba.py index b4178dd..f55c4f6 100644 --- a/pySC/tuning/trajectory_bba.py +++ b/pySC/tuning/trajectory_bba.py @@ -3,16 +3,14 @@ import numpy as np import logging from ..core.control import IndivControl +from ..apps import measure_bba +from ..apps.bba import BBAAnalysis +from .pySC_interface import pySCInjectionInterface logger = logging.getLogger(__name__) if TYPE_CHECKING: - from ..core.new_simulated_commissioning import SimulatedCommissioning - -BPM_OUTLIER = 6 # number of sigma -SLOPE_FACTOR = 0.10 # of max slope -CENTER_OUTLIER = 1 # number of sigma - + from ..core.simulated_commissioning import SimulatedCommissioning def get_mag_s_pos(SC: "SimulatedCommissioning", MAG: list[str]): s_list = [] @@ -37,7 +35,7 @@ def generate_config(cls, SC: "SimulatedCommissioning", max_dx_at_bpm = 1e-3, # max_modulation=600e-6, max_dx_at_bpm=1.5e-3 config = {} - n_turns = 2 + n_turns = 1 RM_name = f'trajectory{n_turns}' SC.tuning.fetch_response_matrix(RM_name, orbit=False, n_turns=n_turns) RM = SC.tuning.response_matrix[RM_name] @@ -45,9 +43,11 @@ def generate_config(cls, SC: "SimulatedCommissioning", max_dx_at_bpm = 1e-3, bba_magnets = SC.tuning.bba_magnets bba_magnets_s = get_mag_s_pos(SC, bba_magnets) - d1, d2 = RM.matrix.shape - HRM = RM.matrix[:d1//2, :d2//2] - VRM = RM.matrix[d1//2:, d2//2:] + #d1, d2 = RM.RM.shape + nh = len(SC.tuning.HCORR) + nbpm = len(SC.bpm_system.indices) + HRM = RM.matrix[:nbpm, :nh] + VRM = RM.matrix[nbpm:, nh:] for bpm_number in 
range(len(SC.bpm_system.indices)): bpm_index = SC.bpm_system.indices[bpm_number] @@ -69,19 +69,22 @@ def generate_config(cls, SC: "SimulatedCommissioning", max_dx_at_bpm = 1e-3, max_H_response = -1 the_HCORR_number = -1 for nn in HCORR_numbers: - response = np.abs(HRM[bpm_number]) - imax = np.argmax(response) - if response[imax] > max_H_response: - max_H_response = float(response[imax]) - the_HCORR_number = int(imax) - hcorr_delta = max_dx_at_bpm/max_H_response + response = np.abs(HRM[bpm_number, nn]) + if response > max_H_response: + max_H_response = float(response) + the_HCORR_number = int(nn) + if max_H_response <= 0: + logger.warning(f'WARNING: zero H response for BPM {SC.bpm_system.names[bpm_number]}!') + hcorr_delta = 0 + else: + hcorr_delta = max_dx_at_bpm/max_H_response if the_bba_magnet.split('/')[-1][0] == 'B': temp_RM = HRM[bpm_number:bpm_number+n_downstream_bpms, the_HCORR_number] else: # it is a skew quadrupole component ## TODO: this is wrong if hcorr and vcorr are not the same magnets!! 
temp_RM = VRM[bpm_number:bpm_number+n_downstream_bpms, the_HCORR_number] - + max_response = float(np.max(np.abs(temp_RM))) if max_response < 1e-10: logger.warning(f'WARNING: very small response for BPM {SC.bpm_system.names[bpm_number]} from magnet {the_bba_magnet} and HCORR {SC.tuning.HCORR[the_HCORR_number]}') @@ -93,12 +96,15 @@ def generate_config(cls, SC: "SimulatedCommissioning", max_dx_at_bpm = 1e-3, max_V_response = -1 the_VCORR_number = -1 for nn in VCORR_numbers: - response = np.abs(VRM[bpm_number]) - imax = np.argmax(response) - if response[imax] > max_V_response: - max_V_response = float(response[imax]) - the_VCORR_number = int(imax) - vcorr_delta = max_dx_at_bpm/max_V_response + response = np.abs(VRM[bpm_number, nn]) + if response > max_V_response: + max_V_response = float(response) + the_VCORR_number = int(nn) + if max_V_response <= 0: + logger.warning(f'WARNING: zero V response for BPM {SC.bpm_system.names[bpm_number]}!') + vcorr_delta = 0 + else: + vcorr_delta = max_dx_at_bpm/max_V_response if the_bba_magnet.split('/')[-1][0] == 'B': temp_RM = VRM[bpm_number:bpm_number+n_downstream_bpms, the_VCORR_number] @@ -143,174 +149,26 @@ def trajectory_bba(SC: "SimulatedCommissioning", bpm_name: str, n_corr_steps: in SC.tuning.generate_trajectory_bba_config() config = SC.tuning.trajectory_bba_config.config[bpm_name] - corr = config[f'{plane}CORR'] - quad = config['QUAD'] - bpm_number = config['number'] - corr_delta_sp = config[f'{plane}CORR_delta'] - quad_delta = config[f'QUAD_dk_{plane}'] - - n1 = bpm_number + 1 - n2 = bpm_number + 1 + n_downstream_bpms - - ## define get_orbit - def get_orbit(): - x, y = SC.bpm_system.capture_injection(n_turns=2, bba=False, subtract_reference=False, use_design=False) - x = x / shots_per_trajectory - y = y / shots_per_trajectory - for i in range(shots_per_trajectory-1): - x_tmp, y_tmp = SC.bpm_system.capture_injection(n_turns=2, bba=False, subtract_reference=False, use_design=False) - x = x + x_tmp / shots_per_trajectory - y = 
y + y_tmp / shots_per_trajectory - - return (x.flatten(order='F'), y.flatten(order='F')) - - - ## define settings to get/set - settings = SC.magnet_settings - - bpm_pos = np.zeros([n_corr_steps, 2]) - orbits = np.zeros([n_corr_steps, 2, n_downstream_bpms]) - - corr_sp0 = settings.get(corr) - quad_sp0 = settings.get(quad) - - corr_sp_array = np.linspace(-corr_delta_sp, corr_delta_sp, n_corr_steps) + corr_sp0 - for i_corr, corr_sp in enumerate(corr_sp_array): - settings.set(corr, corr_sp) - trajectory_x_center, trajectory_y_center = get_orbit() - - settings.set(quad, quad_sp0 + quad_delta) - trajectory_x_up, trajectory_y_up = get_orbit() - - settings.set(quad, quad_sp0 - quad_delta) - trajectory_x_down, trajectory_y_down = get_orbit() - - settings.set(quad, quad_sp0) - - if plane == 'H': - trajectory_main_down = trajectory_x_down - trajectory_main_up = trajectory_x_up - trajectory_main_center = trajectory_x_center - trajectory_other_down = trajectory_y_down - trajectory_other_up = trajectory_y_up - trajectory_other_center = trajectory_y_center - else: - trajectory_main_down = trajectory_y_down - trajectory_main_up = trajectory_y_up - trajectory_main_center = trajectory_y_center - trajectory_other_down = trajectory_x_down - trajectory_other_up = trajectory_x_up - trajectory_other_center = trajectory_x_center - - bpm_pos[i_corr, 0] = trajectory_main_down[bpm_number] - bpm_pos[i_corr, 1] = trajectory_main_up[bpm_number] - if quad.split('/')[-1] == 'B2': - orbits[i_corr, 0, :] = trajectory_main_down[n1:n2] - trajectory_main_center[n1:n2] - orbits[i_corr, 1, :] = trajectory_main_up[n1:n2] - trajectory_main_center[n1:n2] - elif quad.split('/')[-1] == 'A2': ## skew quad - orbits[i_corr, 0, :] = trajectory_other_down[n1:n2] - trajectory_other_center[n1:n2] - orbits[i_corr, 1, :] = trajectory_other_up[n1:n2] - trajectory_other_center[n1:n2] - else: - raise Exception(f'Invalid magnet for BBA: {quad}') - - settings.set(corr, corr_sp0) - settings.set(quad, quad_sp0) - - slopes, 
slopes_err, center, center_err = get_slopes_center(bpm_pos, orbits, quad_delta) - mask_bpm_outlier = reject_bpm_outlier(orbits) - mask_slopes = reject_slopes(slopes) - mask_center = reject_center_outlier(center) - final_mask = np.logical_and(np.logical_and(mask_bpm_outlier, mask_slopes), mask_center) - - offset, offset_err = get_offset(center, center_err, final_mask) - - return offset, offset_err - -def reject_bpm_outlier(orbits): - n_k1 = orbits.shape[1] - n_bpms = orbits.shape[2] - mask = np.ones(n_bpms, dtype=bool) - for k1_step in range(n_k1): - for bpm in range(n_bpms): - data = orbits[:, k1_step, bpm] - if np.any(data - np.mean(data) > BPM_OUTLIER * np.std(data)): - mask[bpm] = False - - # n_rejections = n_bpms - np.sum(mask) - # print(f"Rejected {n_rejections}/{n_bpms} bpms for bpm outliers ( > {BPM_OUTLIER} r.m.s. )") - return mask - -def reject_slopes(slopes): - max_slope = np.nanmax(np.abs(slopes)) - mask = np.abs(slopes) > SLOPE_FACTOR * max_slope - - # n_rejections = len(slopes) - np.sum(mask) - # print(f"Rejected {n_rejections}/{len(slopes)} bpms for small slope ( < {SLOPE_FACTOR} * max(slope) )") - return mask - -def reject_center_outlier(center): - mean = np.nanmean(center) - std = np.nanstd(center) - mask = abs(center - mean) < CENTER_OUTLIER * std - - # n_rejections = len(center) - np.sum(mask) - # print(f"Rejected {n_rejections}/{len(center)} bpms for center away from mean ( > {CENTER_OUTLIER} r.m.s. 
)") - return mask - -def get_slopes_center(bpm_pos, orbits, dk1): - mag_vec = np.array([dk1, -dk1]) - num_downstream_bpms = orbits.shape[2] - fit_order = 1 - x = np.mean(bpm_pos, axis=1) - x_mask = ~np.isnan(x) - err = np.mean(np.std(bpm_pos[x_mask, :], axis=1)) - x = x[x_mask] - new_tmp_tra = orbits[x_mask, :, :] - - tmp_slope = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) - tmp_slope_err = np.full((new_tmp_tra.shape[0], new_tmp_tra.shape[2]), np.nan) - center = np.full((new_tmp_tra.shape[2]), np.nan) - center_err = np.full((new_tmp_tra.shape[2]), np.nan) - for i in range(new_tmp_tra.shape[0]): - for j in range(new_tmp_tra.shape[2]): - y = new_tmp_tra[i, :, j] - y_mask = ~np.isnan(y) - if np.sum(y_mask) < min(len(mag_vec), 3): - continue - # TODO once the position errors are calculated and propagated, should be used - p, pcov = np.polyfit(mag_vec[y_mask], y[y_mask], 1, w=np.ones(int(np.sum(y_mask))) / err, cov='unscaled') - tmp_slope[i, j], tmp_slope_err[i, j] = p[0], pcov[0, 0] + interface = pySCInjectionInterface(SC=SC, n_turns=2, bba=False, subtract_reference=False) + generator = measure_bba(interface=interface, bpm_name=bpm_name, config=config, + shots_per_orbit=shots_per_trajectory, n_corr_steps=n_corr_steps, + bipolar=True, skip_save=True, plane=plane, skip_cycle=True) - slopes = np.full((new_tmp_tra.shape[2]), np.nan) - slopes_err = np.full((new_tmp_tra.shape[2]), np.nan) - for j in range(min(new_tmp_tra.shape[2], num_downstream_bpms)): - y = tmp_slope[:, j] - y_err = tmp_slope_err[:, j] - y_mask = ~np.isnan(y) - if np.sum(y_mask) <= fit_order + 1: - continue - # TODO here do odr as the x values have also measurement errors - p, pcov = np.polyfit(x[y_mask], y[y_mask], fit_order, w=1 / y_err[y_mask], cov='unscaled') - if np.abs(p[0]) < 2 * np.sqrt(pcov[0, 0]): - continue - center[j] = -p[1] / (fit_order * p[0]) # zero-crossing if linear, minimum is quadratic - center_err[j] = np.sqrt(center[j] ** 2 * (pcov[0,0]/p[0]**2 + pcov[1,1]/p[1]**2 - 
2 * pcov[0, 1] / p[0] / p[1])) - slopes[j] = p[0] - slopes_err[j] = np.sqrt(pcov[0,0]) + for _, measurement in generator: + pass - return slopes, slopes_err, center, center_err + if plane == 'H': + data = measurement.H_data + else: + data = measurement.V_data -def get_offset(center, center_err, mask): - from pySC.utils import stats try: - offset_change = stats.weighted_mean(center[mask], center_err[mask]) - offset_change_error = stats.weighted_error(center[mask]-offset_change, center_err[mask]) / np.sqrt(stats.effective_sample_size(center[mask], stats.weights_from_errors(center_err[mask]))) - except ZeroDivisionError as exc: + analysis_result = BBAAnalysis.analyze(data, n_downstream=n_downstream_bpms) + offset = analysis_result.offset + offset_error = analysis_result.offset_error + except Exception as exc: print(exc) - print('Failed to estimate offset!!') - print(f'Debug info: {center=}, {center_err=}, {mask=}') - print(f'Debug info: {center[mask]=}, {center_err[mask]=}') - offset_change = 0 - offset_change_error = np.nan + logger.warning(f'Failed to compute trajectory BBA for BPM {bpm_name}') + offset, offset_error = 0, np.nan - return offset_change, offset_change_error + return offset, offset_error diff --git a/pySC/tuning/tune.py b/pySC/tuning/tune.py index a2196fd..ecc78b9 100644 --- a/pySC/tuning/tune.py +++ b/pySC/tuning/tune.py @@ -1,10 +1,12 @@ -from typing import Dict, Optional, Union, TYPE_CHECKING +from typing import Optional, TYPE_CHECKING from pydantic import BaseModel, PrivateAttr, ConfigDict import numpy as np import logging import scipy.optimize -from ..core.numpy_type import NPARRAY +from ..core.control import KnobControl, KnobData +from ..core.types import NPARRAY +from ..apps.response_matrix import ResponseMatrix if TYPE_CHECKING: from .tuning_core import Tuning @@ -12,22 +14,36 @@ logger = logging.getLogger(__name__) class Tune(BaseModel, extra="forbid"): - tune_quad_controls_1: list[str] = [] - tune_quad_controls_2: list[str] = [] + knob_qx: 
str = 'qx_trim' + knob_qy: str = 'qy_trim' + controls_1: list[str] = [] + controls_2: list[str] = [] tune_response_matrix: Optional[NPARRAY] = None inverse_tune_response_matrix: Optional[NPARRAY] = None _parent: Optional['Tuning'] = PrivateAttr(default=None) model_config = ConfigDict(arbitrary_types_allowed=True) + @property + def controls(self) -> list[str]: + return self.controls_1 + self.controls_2 + @property def design_qx(self): return np.mod(self._parent._parent.lattice.twiss['qx'], 1) + @property + def integer_qx(self): + return np.floor(self._parent._parent.lattice.twiss['qx']) + @property def design_qy(self): return np.mod(self._parent._parent.lattice.twiss['qy'], 1) + @property + def integer_qy(self): + return np.floor(self._parent._parent.lattice.twiss['qy']) + def tune_response(self, quads: list[str], dk: float = 1e-5): SC = self._parent._parent twiss = SC.lattice.get_twiss(use_design=True) @@ -47,34 +63,62 @@ def tune_response(self, quads: list[str], dk: float = 1e-5): return dq1, dq2 def build_tune_response_matrix(self, dk: float = 1e-5) -> None: - if not len(self.tune_quad_controls_1) > 0: + if not len(self.controls_1) > 0: raise Exception('tune_quad_controls_1 is empty. Please set.') - if not len(self.tune_quad_controls_2) > 0: + if not len(self.controls_2) > 0: raise Exception('tune_quad_controls_2 is empty. Please set.') TRM = np.zeros((2,2)) - TRM[:, 0] = self.tune_response(self.tune_quad_controls_1, dk=dk) - TRM[:, 1] = self.tune_response(self.tune_quad_controls_2, dk=dk) - iTRM = np.linalg.inv(TRM) + TRM[:, 0] = self.tune_response(self.controls_1, dk=dk) + TRM[:, 1] = self.tune_response(self.controls_2, dk=dk) + return TRM - self.tune_response_matrix = TRM - self.inverse_tune_response_matrix = iTRM - return + def create_tune_knobs(self, delta: float = 1e-5) -> None: + if not len(self.controls) > 0: + raise Exception('tune.controls_1/tune.controls_2 are empty. 
Please set.') + + matrix = self.build_tune_response_matrix(dk=delta) + + tune_response_matrix = ResponseMatrix(matrix=matrix) + inverse_matrix = tune_response_matrix.build_pseudoinverse().matrix + + dk1_qx, dk2_qx = inverse_matrix[:,0] + dk1_qy, dk2_qy = inverse_matrix[:,1] + + n1 = len(self.controls_1) + n2 = len(self.controls_2) + qx_weights = [float(dk1_qx)] * n1 + [float(dk2_qx)] * n2 + qy_weights = [float(dk1_qy)] * n1 + [float(dk2_qy)] * n2 + + knob_data = KnobData(data={ + self.knob_qx: KnobControl(control_names=self.controls, weights=qx_weights), + self.knob_qy: KnobControl(control_names=self.controls, weights=qy_weights) + }) + + logger.info(f"{self.knob_qx}: sum(|weights|)={np.sum(np.abs(qx_weights)):.2e}") + logger.info(f"{self.knob_qy}: sum(|weights|)={np.sum(np.abs(qy_weights)):.2e}") + return knob_data def trim_tune(self, dqx: float = 0, dqy: float = 0, use_design: bool = False) -> None: + logger.warning('Deprecation: please use .trim instead of .trim_tune.') + return self.trim(dqx=dqx, dqy=dqy, use_design=use_design) + + def trim(self, dqx: float = 0, dqy: float = 0, use_design: bool = False) -> None: SC = self._parent._parent - if self.inverse_tune_response_matrix is None: - logger.info('Did not find inverse tune response matrix. 
Building now.') - self.build_tune_response_matrix() - - dk1, dk2 = np.dot(self.inverse_tune_response_matrix, [dqx, dqy]) - ref_data1 = SC.magnet_settings.get_many(self.tune_quad_controls_1, use_design=use_design) - ref_data2 = SC.magnet_settings.get_many(self.tune_quad_controls_2, use_design=use_design) - data1 = {key: ref_data1[key] + dk1 for key in ref_data1.keys()} - data2 = {key: ref_data2[key] + dk2 for key in ref_data2.keys()} - - SC.magnet_settings.set_many(data1, use_design=use_design) - SC.magnet_settings.set_many(data2, use_design=use_design) + + if use_design: + assert self.knob_qx in SC.design_magnet_settings.controls.keys() + assert self.knob_qy in SC.design_magnet_settings.controls.keys() + else: + assert self.knob_qx in SC.magnet_settings.controls.keys() + assert self.knob_qy in SC.magnet_settings.controls.keys() + + dqx0 = SC.magnet_settings.get(self.knob_qx, use_design=use_design) + dqy0 = SC.magnet_settings.get(self.knob_qy, use_design=use_design) + + SC.magnet_settings.set(self.knob_qx, dqx0 + dqx, use_design=use_design) + SC.magnet_settings.set(self.knob_qy, dqy0 + dqy, use_design=use_design) + return def measure_with_kick(self, kick_px=10e-6, kick_py=10e-6, n_turns=100): @@ -117,18 +161,23 @@ def correct(self, target_qx: Optional[float] = None, target_qy: Optional[float] gain : float, optional Gain for the correction. Default is 1. measurement_method : str, optional - Method to measure the tune. Options are 'kick', 'first_turn', 'orbit', 'cheat', 'cheat4d'. + Method to measure the tune. Options are 'kick', 'first_turn', 'orbit', + 'cheat', 'cheat4d', 'cheat_with_integer'. Default is 'kick'. 
''' - if measurement_method not in ['kick', 'first_turn', 'orbit', 'cheat', 'cheat4d']: + if measurement_method not in ['kick', 'first_turn', 'orbit', 'cheat', 'cheat4d', 'cheat_with_integer']: raise NotImplementedError(f'{measurement_method=} not implemented yet.') if target_qx is None: target_qx = self.design_qx + if measurement_method == 'cheat_with_integer': + target_qx += self.integer_qx if target_qy is None: target_qy = self.design_qy + if measurement_method == 'cheat_with_integer': + target_qy += self.integer_qy for _ in range(n_iter): if measurement_method == 'kick': @@ -141,6 +190,8 @@ def correct(self, target_qx: Optional[float] = None, target_qy: Optional[float] qx, qy = self.cheat() elif measurement_method == 'cheat4d': qx, qy = self.cheat4d() + elif measurement_method == 'cheat_with_integer': + qx, qy = self.cheat_with_integer() else: raise Exception(f'Unknown measurement_method {measurement_method}') if qx is None or qy is None or qx != qx or qy != qy: @@ -150,7 +201,7 @@ def correct(self, target_qx: Optional[float] = None, target_qy: Optional[float] dqx = qx - target_qx dqy = qy - target_qy logger.info(f"Delta tune: delta_q1={dqx:.4f}, delta_q2={dqy:.4f}") - self.trim_tune(dqx=-gain*dqx, dqy=-gain*dqy) + self.trim(dqx=-gain*dqx, dqy=-gain*dqy) return def get_design_corrector_response_injection(self, corr: str, dk0: float = 1e-6): @@ -206,15 +257,15 @@ def get_average_xy(SC, n=10): ### do fit based on knobs def x_chi2(delta): - SC.tuning.tune.trim_tune(delta, 0, use_design=True) + SC.tuning.tune.trim(delta, 0, use_design=True) dx_ideal, _ = self.get_design_corrector_response_injection(hcorr) - SC.tuning.tune.trim_tune(-delta, 0, use_design=True) + SC.tuning.tune.trim(-delta, 0, use_design=True) return np.sum((dx0 - dx_ideal)**2) def y_chi2(delta): - SC.tuning.tune.trim_tune(0, delta, use_design=True) + SC.tuning.tune.trim(0, delta, use_design=True) _, dy_ideal = self.get_design_corrector_response_injection(vcorr) - SC.tuning.tune.trim_tune(0, -delta, 
use_design=True) + SC.tuning.tune.trim(0, -delta, use_design=True) return np.sum((dy0 - dy_ideal)**2) x_res = scipy.optimize.minimize_scalar(x_chi2, (-0.1, 0.1), method='Brent') @@ -287,15 +338,15 @@ def get_average_xy(SC, n=10): ### do fit based on knobs def x_chi2(delta): - SC.tuning.tune.trim_tune(delta, 0, use_design=True) + SC.tuning.tune.trim(delta, 0, use_design=True) dx_ideal, _ = self.get_design_corrector_response_orbit(hcorr, dk0=dk0) - SC.tuning.tune.trim_tune(-delta, 0, use_design=True) + SC.tuning.tune.trim(-delta, 0, use_design=True) return np.sum((dx0 - dx_ideal)**2) def y_chi2(delta): - SC.tuning.tune.trim_tune(0, delta, use_design=True) + SC.tuning.tune.trim(0, delta, use_design=True) _, dy_ideal = self.get_design_corrector_response_orbit(vcorr, dk0=dk0) - SC.tuning.tune.trim_tune(0, -delta, use_design=True) + SC.tuning.tune.trim(0, -delta, use_design=True) return np.sum((dy0 - dy_ideal)**2) x_res = scipy.optimize.minimize_scalar(x_chi2, (-0.1, 0.1), method='Brent') @@ -328,4 +379,16 @@ def cheat(self) -> tuple[float, float]: logger.info(f"Horizontal tune: Qx = {qx:.3f} (Δ = {delta_x:.3f})") logger.info(f"Vertical tune: Qy = {qy:.3f} (Δ = {delta_y:.3f})") + return qx, qy + + def cheat_with_integer(self) -> tuple[float, float]: + SC = self._parent._parent + twiss = SC.lattice.get_twiss() + qx = twiss['qx'] + qy = twiss['qy'] + delta_x = qx - self.design_qx - self.integer_qx + delta_y = qy - self.design_qy - self.integer_qy + logger.info(f"Horizontal tune: Qx = {qx:.3f} (Δ = {delta_x:.3f})") + logger.info(f"Vertical tune: Qy = {qy:.3f} (Δ = {delta_y:.3f})") + return qx, qy \ No newline at end of file diff --git a/pySC/tuning/tuning_core.py b/pySC/tuning/tuning_core.py index ced9f2c..89b6452 100644 --- a/pySC/tuning/tuning_core.py +++ b/pySC/tuning/tuning_core.py @@ -1,22 +1,25 @@ from pydantic import BaseModel, PrivateAttr from typing import Optional, Union, TYPE_CHECKING -from .response_matrix import ResponseMatrix +from ..apps.response_matrix 
import ResponseMatrix from .response_measurements import measure_TrajectoryResponseMatrix, measure_OrbitResponseMatrix, measure_RFFrequencyOrbitResponse from .trajectory_bba import Trajectory_BBA_Configuration, trajectory_bba, get_mag_s_pos from .orbit_bba import Orbit_BBA_Configuration, orbit_bba from .parallel import parallel_tbba_target, parallel_obba_target, get_listener_and_queue from .tune import Tune +from .chromaticity import Chromaticity +from .c_minus import CMinus from .rf_tuning import RF_tuning from ..core.control import IndivControl +from .pySC_interface import pySCInjectionInterface, pySCOrbitInterface +from ..apps import orbit_correction import numpy as np from pathlib import Path -import json import logging from multiprocessing import Process, Queue if TYPE_CHECKING: - from ..core.new_simulated_commissioning import SimulatedCommissioning + from ..core.simulated_commissioning import SimulatedCommissioning logger = logging.getLogger(__name__) @@ -27,6 +30,8 @@ class Tuning(BaseModel, extra="forbid"): multipoles: list[str] = [] tune: Tune = Tune() ## TODO: generate config from yaml file + chromaticity: Chromaticity = Chromaticity() ## TODO: generate config from yaml file + c_minus: CMinus = CMinus() rf: RF_tuning = RF_tuning() ## TODO: generate config from yaml file bba_magnets: list[str] = [] @@ -80,8 +85,8 @@ def get_inputs_plane(self, control_names): def calculate_model_trajectory_response_matrix(self, n_turns=1, dkick=1e-5, save_as: str = None): # assumes all bpms are dual plane - SC = self._parent RM_name = f'trajectory{n_turns}' + SC = self._parent input_names = SC.tuning.CORR output_names = SC.bpm_system.names * n_turns * 2 # two: one per plane and per turn matrix = measure_TrajectoryResponseMatrix(SC, n_turns=n_turns, dkick=dkick, use_design=True) @@ -94,8 +99,8 @@ def calculate_model_trajectory_response_matrix(self, n_turns=1, dkick=1e-5, save return def calculate_model_orbit_response_matrix(self, dkick=1e-5, save_as: str = None): - SC = 
self._parent RM_name = 'orbit' + SC = self._parent input_names = SC.tuning.CORR output_names = SC.bpm_system.names * 2 # two: one per plane matrix = measure_OrbitResponseMatrix(SC, dkick=dkick, use_design=True) @@ -115,68 +120,62 @@ def bad_outputs_from_bad_bpms(self, bad_bpms: list[int], n_turns: int = 1) -> li bad_outputs.append(bpm + turn * n_bpms + plane * n_turns * n_bpms) return bad_outputs + def wiggle_last_corrector(self, max_steps: int = 100, max_sp: float = 500e-6) -> None: + SC = self._parent + def first_turn_transmission(SC): + x, _ = SC.bpm_system.capture_injection() + bad_readings = sum(np.isnan(x)) + good_frac = (len(x) - bad_readings) / len(SC.bpm_system.indices) + last_good_bpm = np.where(~np.isnan(x))[0][-1] + last_good_bpm_index = SC.bpm_system.indices[last_good_bpm] + return good_frac, last_good_bpm_index + + initial_transmission, last_good_bpm_index = first_turn_transmission(SC) + if initial_transmission < 1.: + for corr in SC.tuning.HCORR: + hcor_name = corr.split('/')[0] + hcor_index = SC.magnet_settings.magnets[hcor_name].sim_index + if hcor_index < last_good_bpm_index: + last_hcor = corr + for corr in SC.tuning.VCORR: + vcor_name = corr.split('/')[0] + vcor_index = SC.magnet_settings.magnets[vcor_name].sim_index + if vcor_index < last_good_bpm_index: + last_vcor = corr + + for _ in range(max_steps): + SC.magnet_settings.set(last_hcor, SC.rng.uniform(-max_sp, max_sp)) + SC.magnet_settings.set(last_vcor, SC.rng.uniform(-max_sp, max_sp)) + transmission, _ = first_turn_transmission(SC) + if transmission > initial_transmission: + logger.info(f"Wiggling improved first-turn transmission from {initial_transmission} to {transmission}.") + return + logger.info("Wiggling failed. 
Reached maximum number of steps.") + else: + logger.info("No need to wiggle, full transmission through first-turn.") + + return + def correct_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=100, gain=1, correct_to_first_turn=False, zerosum=False): RM_name = f'trajectory{n_turns}' self.fetch_response_matrix(RM_name, orbit=False, n_turns=n_turns) - RM = self.response_matrix[RM_name] - RM.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms, n_turns=n_turns) - - for _ in range(n_reps): - trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) - trajectory = np.concat((trajectory_x.flatten(order='F'), trajectory_y.flatten(order='F'))) - - if correct_to_first_turn: - reference = np.zeros_like(trajectory) - n_per_turn = len(trajectory) // n_turns - for iturn in range(1, n_turns): - reference[iturn*n_per_turn:(iturn+1)*n_per_turn] = trajectory[0:n_per_turn] - else: - reference = np.zeros_like(trajectory) - - trims = RM.solve(trajectory - reference, method=method, parameter=parameter, zerosum=zerosum) - - settings = self._parent.magnet_settings - for control_name, trim in zip(self.CORR, trims): - setpoint = settings.get(control_name=control_name) - gain * trim - settings.set(control_name=control_name, setpoint=setpoint) - - trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) - trajectory_x = trajectory_x.flatten('F') - trajectory_y = trajectory_y.flatten('F') - rms_x = np.nanstd(trajectory_x) * 1e6 - rms_y = np.nanstd(trajectory_y) * 1e6 - bad_readings = sum(np.isnan(trajectory_x)) - good_turns = (len(trajectory_x) - bad_readings) / len(self._parent.bpm_system.indices) - logger.info(f'Corrected injection: transmission through {good_turns:.2f}/{n_turns} turns, {rms_x=:.1f} um, {rms_y=:.1f} um.') + response_matrix = self.response_matrix[RM_name] + response_matrix.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms, n_turns=n_turns) - return - - def 
correct_pseudo_orbit_at_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=100, gain=1, zerosum=False): - RM_name = 'orbit' - self.fetch_response_matrix(RM_name, orbit=True) - RM = self.response_matrix[RM_name] - RM.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms) + SC = self._parent + interface = pySCInjectionInterface(SC=SC, n_turns=n_turns) for _ in range(n_reps): - trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) - pseudo_orbit_x = np.nanmean(trajectory_x, axis=1) - pseudo_orbit_y = np.nanmean(trajectory_y, axis=1) - pseudo_orbit = np.concat((pseudo_orbit_x, pseudo_orbit_y)) - - trims = RM.solve(pseudo_orbit, method=method, parameter=parameter, zerosum=zerosum) + _ = orbit_correction(interface=interface, response_matrix=response_matrix, reference=None, + method=method, parameter=parameter, gain=gain, apply=True) - settings = self._parent.magnet_settings - for control_name, trim in zip(self.CORR, trims): - setpoint = settings.get(control_name=control_name) - gain * trim - settings.set(control_name=control_name, setpoint=setpoint) - - trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) + trajectory_x, trajectory_y = SC.bpm_system.capture_injection(n_turns=n_turns) trajectory_x = trajectory_x.flatten('F') trajectory_y = trajectory_y.flatten('F') rms_x = np.nanstd(trajectory_x) * 1e6 rms_y = np.nanstd(trajectory_y) * 1e6 bad_readings = sum(np.isnan(trajectory_x)) - good_turns = (len(trajectory_x) - bad_readings) / len(self._parent.bpm_system.indices) + good_turns = (len(trajectory_x) - bad_readings) / len(SC.bpm_system.indices) logger.info(f'Corrected injection: transmission through {good_turns:.2f}/{n_turns} turns, {rms_x=:.1f} um, {rms_y=:.1f} um.') return @@ -184,26 +183,52 @@ def correct_pseudo_orbit_at_injection(self, n_turns=1, n_reps=1, method='tikhono def correct_orbit(self, n_reps=1, method='tikhonov', parameter=100, gain=1, zerosum=False): RM_name = 
'orbit' self.fetch_response_matrix(RM_name, orbit=True) - RM = self.response_matrix[RM_name] - RM.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms) + response_matrix = self.response_matrix[RM_name] + response_matrix.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms) - for _ in range(n_reps): - orbit_x, orbit_y = self._parent.bpm_system.capture_orbit() - orbit = np.concat((orbit_x.flatten(order='F'), orbit_y.flatten(order='F'))) - - trims = RM.solve(orbit, method=method, parameter=parameter, zerosum=zerosum) + SC = self._parent + interface = pySCOrbitInterface(SC=SC) - settings = self._parent.magnet_settings - for control_name, trim in zip(self.CORR, trims): - setpoint = settings.get(control_name=control_name) - gain * trim - settings.set(control_name=control_name, setpoint=setpoint) + for _ in range(n_reps): + _ = orbit_correction(interface=interface, response_matrix=response_matrix, reference=None, + method=method, parameter=parameter, zerosum=zerosum, gain=gain, apply=True) - orbit_x, orbit_y = self._parent.bpm_system.capture_orbit() + orbit_x, orbit_y = SC.bpm_system.capture_orbit() rms_x = np.nanstd(orbit_x) * 1e6 rms_y = np.nanstd(orbit_y) * 1e6 logger.info(f'Corrected orbit: {rms_x=:.1f} um, {rms_y=:.1f} um.') return + # def correct_pseudo_orbit_at_injection(self, n_turns=1, n_reps=1, method='tikhonov', parameter=100, gain=1, zerosum=False): + # RM_name = 'orbit' + # self.fetch_response_matrix(RM_name, orbit=True) + # RM = self.response_matrix[RM_name] + # RM.bad_outputs = self.bad_outputs_from_bad_bpms(self.bad_bpms) + + # for _ in range(n_reps): + # trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) + # pseudo_orbit_x = np.nanmean(trajectory_x, axis=1) + # pseudo_orbit_y = np.nanmean(trajectory_y, axis=1) + # pseudo_orbit = np.concat((pseudo_orbit_x, pseudo_orbit_y)) + + # trims = RM.solve(pseudo_orbit, method=method, parameter=parameter, zerosum=zerosum) + + # settings = self._parent.magnet_settings + 
# for control_name, trim in zip(self.CORR, trims): + # setpoint = settings.get(control_name=control_name) - gain * trim + # settings.set(control_name=control_name, setpoint=setpoint) + + # trajectory_x, trajectory_y = self._parent.bpm_system.capture_injection(n_turns=n_turns) + # trajectory_x = trajectory_x.flatten('F') + # trajectory_y = trajectory_y.flatten('F') + # rms_x = np.nanstd(trajectory_x) * 1e6 + # rms_y = np.nanstd(trajectory_y) * 1e6 + # bad_readings = sum(np.isnan(trajectory_x)) + # good_turns = (len(trajectory_x) - bad_readings) / len(self._parent.bpm_system.indices) + # logger.info(f'Corrected injection: transmission through {good_turns:.2f}/{n_turns} turns, {rms_x=:.1f} um, {rms_y=:.1f} um.') + + # return + def fit_dispersive_orbit(self): SC = self._parent response = measure_RFFrequencyOrbitResponse(SC=SC, use_design=True) @@ -322,7 +347,7 @@ def do_trajectory_bba(self, bpm_names: Optional[list[str]] = None, shots_per_tra SC.bpm_system.bba_offsets_y[bpm_number] = offsets_y[ii] return offsets_x, offsets_y - def do_orbit_bba(self, bpm_names: Optional[list[str]] = None, shots_per_orbit: int = 1, skip_summary: bool = False, n_corr_steps: int = 7): + def do_orbit_bba(self, bpm_names: Optional[list[str]] = None, shots_per_orbit: int = 1, skip_summary: bool = False, n_corr_steps: int = 5): SC = self._parent if bpm_names is None: bpm_names = SC.bpm_system.names @@ -415,7 +440,7 @@ def do_parallel_trajectory_bba(self, bpm_names: Optional[list[str]] = None, shot SC.bpm_system.bba_offsets_y[bpm_number] = offsets_y[ii] return offsets_x, offsets_y - def do_parallel_orbit_bba(self, bpm_names: Optional[list[str]] = None, shots_per_orbit: int = 1, omp_num_threads: int = 2, n_corr_steps: int = 7): + def do_parallel_orbit_bba(self, bpm_names: Optional[list[str]] = None, shots_per_orbit: int = 1, omp_num_threads: int = 2, n_corr_steps: int = 5): SC = self._parent if bpm_names is None: bpm_names = SC.bpm_system.names diff --git a/pySC/utils/rdt.py b/pySC/utils/rdt.py 
new file mode 100644 index 0000000..a28a17a --- /dev/null +++ b/pySC/utils/rdt.py @@ -0,0 +1,247 @@ +import numpy as np +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from pySC import SimulatedCommissioning + +## FACTORIAL[n] = n! +FACTORIAL = np.array([ +1, +1, +2, +6, +24, +120, +720, +5040, +40320, +362880, +3628800, +39916800, +479001600, +6227020800, +87178291200, +1307674368000, +20922789888000, +355687428096000, +6402373705728000, +121645100408832000, +2432902008176640000, +]) + +S4 = np.array([[0., 1., 0., 0.], + [-1., 0., 0., 0.], + [ 0., 0., 0., 1.], + [ 0., 0.,-1., 0.]]) + +def binomial_coeff(n:int, k: int): + return FACTORIAL[n] / (FACTORIAL[k] * FACTORIAL[n-k]) + +def feeddown(AB: np.ndarray[complex], r0: complex, n: int): + maxN = len(AB) + value = 0j + for k in range(n, maxN): + value += AB[k] * binomial_coeff(k, n) * (r0 ** (k - n)) + return value + +def omega(x): + if x%2: + return 0 + else: + return 1 + +def get_integrated_strengths_with_feeddown(SC: "SimulatedCommissioning", use_design: bool = False): + twiss = SC.lattice.get_twiss(use_design=use_design) + if use_design: + magnet_settings = SC.design_magnet_settings + else: + magnet_settings = SC.magnet_settings + + N = len(twiss['s']) + temp_max_order = 0 + integrated_strengths = {'norm': {0: np.zeros(N)}, + 'skew': {0: np.zeros(N)} + } + + for magnet in magnet_settings.magnets.values(): + ii = magnet.sim_index + x_co = twiss['x'][ii] + y_co = twiss['y'][ii] + if not use_design: + dx, dy = SC.support_system.get_total_offset(ii) + roll, _, _ = SC.support_system.get_total_rotation(ii) + else: + dx = 0 + dy = 0 + roll = 0 + + r0 = dx - x_co + 1.j * (dy - y_co) + + while magnet.max_order > temp_max_order: + temp_max_order += 1 + integrated_strengths['norm'][temp_max_order] = np.zeros(N) + integrated_strengths['skew'][temp_max_order] = np.zeros(N) + + AB = (np.array(magnet.B) + 1.j*np.array(magnet.A)) * np.exp(1.j*roll) * magnet.length + for jj in range(magnet.max_order + 1): + 
AB_with_feeddown = feeddown(AB, r0, jj) + integrated_strengths['norm'][jj][ii] = AB_with_feeddown.real * FACTORIAL[jj] + integrated_strengths['skew'][jj][ii] = AB_with_feeddown.imag * FACTORIAL[jj] + + return integrated_strengths + +def calculate_c_minus(SC: Optional["SimulatedCommissioning"] = None, use_design: bool = False): + + M = SC.lattice.one_turn_matrix(use_design=use_design) + W, _, _, q1, q2 = linear_normal_form(M) + + c_r1 = np.sqrt(W[2,0]**2 + W[2,1]**2) / W[0,0] + c_r2 = np.sqrt(W[0,2]**2 + W[0,3]**2) / W[2,2] + c_phi1 = np.arctan2(W[2,1], W[2,0]) + + cmin_amp = (2 * np.sqrt(c_r1*c_r2) * np.abs(q1 - q2) / (1 + c_r1 * c_r2)) + c_minus = cmin_amp * np.exp(1j * c_phi1) + + return c_minus + +def hjklm(SC: Optional["SimulatedCommissioning"] = None, j: int = 0, k: int = 0, l: int = 0, m: int = 0, use_design: bool = False, + integrated_strengths: Optional[dict] = None, twiss: Optional[dict] = None): + n = j+k+l+m + assert n > 0 + + if integrated_strengths is None: + assert SC is not None + integrated_strengths = get_integrated_strengths_with_feeddown(SC, use_design=use_design) + if twiss is None: + assert SC is not None + twiss = SC.lattice.get_twiss(use_design=use_design) + + K = integrated_strengths['norm'][n-1] + J = integrated_strengths['skew'][n-1] + h = - (K * omega(l+m) + 1j * J * omega(l+m+1))/(FACTORIAL[j] * FACTORIAL[k] * FACTORIAL[l] * FACTORIAL[m] * 2**(n)) * (1.j)**(l+m) * twiss['betx']**((j+k)/2) * twiss['bety']**((l+m)/2) + return h + + +def fjklm(SC: Optional["SimulatedCommissioning"] = None, j: int = 0, k: int = 0, l: int = 0, m: int = 0, + use_design: bool = False, integrated_strengths: Optional[dict] = None, twiss: Optional[dict] = None, normalized: bool = True): + + assert j + k + l + m > 0 + + if twiss is None: + assert SC is not None + twiss = SC.lattice.get_twiss(use_design=use_design) + + qx = twiss['qx'] + qy = twiss['qy'] + denom = 1 - np.exp(1.j * 2 * np.pi * ((j-k) * qx + (l-m) * qy)) + h = hjklm(SC=SC, j=j, k=k, l=l, m=m, 
use_design=use_design, integrated_strengths=integrated_strengths, twiss=twiss) + mask = h != 0 + hm = h[mask] + mux = twiss['mux'][mask] + muy = twiss['muy'][mask] + ii = 0 + f = np.zeros_like(twiss['s'], dtype=complex) + for ii in range(len(twiss['s'])): + dphix = 2*np.pi*np.abs(twiss['mux'][ii] - mux) + dphiy = 2*np.pi*np.abs(twiss['muy'][ii] - muy) + expo = np.exp(1.j * ( (j-k) * dphix + (l-m) * dphiy)) + f[ii] = np.sum(hm * expo) + if normalized: + return f / denom + else: + return f + + +def Rot2D(mu): + return np.array([[ np.cos(mu), np.sin(mu)], + [-np.sin(mu), np.cos(mu)]]) + +def linear_normal_form(M): + w0, v0 = np.linalg.eig(M[:4,:4]) + + a0 = np.real(v0) + b0 = np.imag(v0) + + index_list = [0,1,2,3] + + ##### Sort modes in pairs of conjugate modes ##### + + conj_modes = np.zeros([2,2], dtype=int) + + conj_modes[0,0] = index_list[0] + del index_list[0] + + min_index = 0 + min_diff = abs(np.imag(w0[conj_modes[0,0]] + w0[index_list[min_index]])) + for i in range(1,len(index_list)): + diff = abs(np.imag(w0[conj_modes[0,0]] + w0[index_list[i]])) + if min_diff > diff: + min_diff = diff + min_index = i + + conj_modes[0,1] = index_list[min_index] + del index_list[min_index] + + conj_modes[1,0] = index_list[0] + conj_modes[1,1] = index_list[1] + + ################################################## + #### Select mode from pairs with positive (real @ S @ imag) ##### + + modes = np.empty(2, dtype=int) + for ii,ind in enumerate(conj_modes): + if np.matmul(np.matmul(a0[:,ind[0]], S4), b0[:,ind[0]]) > 0: + modes[ii] = ind[0] + else: + modes[ii] = ind[1] + + ################################################## + #### Sort modes such that (1,2) is close to (x,y) #### + + if abs(v0[:,modes[1]])[2] < abs(v0[:,modes[0]])[2]: + modes[0], modes[1] = modes[1], modes[0] + + ################################################## + #### Rotate eigenvectors to the Courant-Snyder parameterization #### + phase0 = np.log(v0[0,modes[0]]).imag + phase1 = np.log(v0[2,modes[1]]).imag + + 
v0[:,modes[0]] *= np.exp(-1.j*phase0) + v0[:,modes[1]] *= np.exp(-1.j*phase1) + + ################################################## + #### Construct W ################################# + + a1 = v0[:,modes[0]].real + a2 = v0[:,modes[1]].real + b1 = v0[:,modes[0]].imag + b2 = v0[:,modes[1]].imag + + n1 = 1./np.sqrt(np.matmul(np.matmul(a1, S4), b1)) + n2 = 1./np.sqrt(np.matmul(np.matmul(a2, S4), b2)) + + a1 *= n1 + a2 *= n2 + + b1 *= n1 + b2 *= n2 + + W = np.array([a1,b1,a2,b2]).T + W[abs(W) < 1.e-14] = 0. # Set very small numbers to zero. + invW = np.matmul(np.matmul(S4.T, W.T), S4) + + ################################################## + #### Get tunes and rotation matrix in the normalized coordinates #### + + mu1 = np.log(w0[modes[0]]).imag + mu2 = np.log(w0[modes[1]]).imag + + q1 = mu1/(2.*np.pi) + q2 = mu2/(2.*np.pi) + + R = np.zeros_like(W) + R[0:2,0:2] = Rot2D(mu1) + R[2:4,2:4] = Rot2D(mu2) + ################################################## + + return W, invW, R, q1, q2 \ No newline at end of file diff --git a/pySC/utils/stats.py b/pySC/utils/stats.py deleted file mode 100644 index 4f3d713..0000000 --- a/pySC/utils/stats.py +++ /dev/null @@ -1,345 +0,0 @@ -""" -Stats ------ - -Helper module providing statistical methods to compute various weighted averages along specified -axis and their errors as well as unbiased error estimator of infinite normal distribution from -finite-sized sample. -""" -import numpy as np -from scipy.special import erf -from scipy.stats import t - -import logging -LOGGER = logging.getLogger(__name__) - -CONFIDENCE_LEVEL = (1 + erf(1 / np.sqrt(2))) / 2 -PI2: float = 2 * np.pi -PI2I: complex = 2j * np.pi - - -def circular_mean(data, period=PI2, errors=None, axis=None): - """ - Computes weighted circular average along the specified axis. 
- - Parameters: - data: array-like - Contains the data to be averaged - period: scalar, optional, default (2 * np.pi) - Periodicity period of data - errors: array-like, optional - Contains errors associated with the values in data, it is used to calculated weights - axis: int or tuple of ints, optional - Axis or axes along which to average data - - Returns: - Returns the weighted circular average along the specified axis. - """ - phases = data * PI2I / period - weights = weights_from_errors(errors, period=period) - - return np.angle(np.average(np.exp(phases), axis=axis, weights=weights)) * period / PI2 - - -def circular_nanmean(data, period=PI2, errors=None, axis=None): - """"Wrapper around circular_mean with added nan handling""" - return circular_mean(data=np.ma.array(data, mask=np.isnan(data)), - period=period, - errors= None if errors is None else np.ma.array(errors, mask=np.isnan(data)), - axis=axis) - - -def circular_error(data, period=PI2, errors=None, axis=None, t_value_corr=True): - """ - Computes error of weighted circular average along the specified axis. - - Parameters: - data: array-like - Contains the data to be averaged - period: scalar, optional - Periodicity period of data, default is (2 * np.pi) - errors: array-like, optional - Contains errors associated with the values in data, it is used to calculated weights - axis: int or tuple of ints, optional - Axis or axes along which to average data - t_value_corr: bool, optional - Species if the error is corrected for small sample size, default True - - Returns: - Returns the error of weighted circular average along the specified axis. 
- """ - phases = data * PI2I / period - weights = weights_from_errors(errors, period=period) - complex_phases = np.exp(phases) - complex_average = np.average(complex_phases, axis=axis, weights=weights) - - (sample_variance, sum_of_weights) = np.average( - np.square(np.abs(complex_phases - complex_average.reshape(_get_shape( - complex_phases.shape, axis)))), weights=weights, axis=axis, returned=True) - if weights is not None: - sample_variance = sample_variance + 1. / sum_of_weights - error_of_complex_average = np.sqrt(sample_variance * unbias_variance(data, weights, axis=axis)) - phase_error = np.nan_to_num(error_of_complex_average / np.abs(complex_average)) - if t_value_corr: - phase_error = phase_error * t_value_correction(effective_sample_size(data, weights, axis=axis)) - return np.where(phase_error > 0.25 * PI2, 0.3 * period, phase_error * period / PI2) - - -def circular_nanerror(data, period=PI2, errors=None, axis=None, t_value_corr=True): - """"Wrapper around circular_error with added nan handling""" - return circular_error(data=np.ma.array(data, mask=np.isnan(data)), - period=period, - errors=None if errors is None else np.ma.array(errors, mask=np.isnan(data)), - axis=axis, - t_value_corr=t_value_corr) - - -def weighted_mean(data, errors=None, axis=None): - """ - Computes weighted average along the specified axis. - - Parameters: - data: array-like - Contains the data to be averaged - errors: array-like, optional - Contains errors associated with the values in data, it is used to calculated weights - axis: int or tuple of ints, optional - Axis or axes along which to average data - - Returns: - Returns the weighted average along the specified axis. 
- """ - weights = weights_from_errors(errors) - return np.average(data, axis=axis, weights=weights) - - -def weighted_nanmean(data, errors=None, axis=None): - """"Wrapper around weighted_mean with added nan handling""" - return weighted_mean(data=np.ma.array(data, mask=np.isnan(data)), - errors=None if errors is None else np.ma.array(errors, mask=np.isnan(data)), - axis=axis) - - -def _get_shape(orig_shape, axis): - new_shape = np.array(orig_shape) - if axis is None: - new_shape[:] = 1 - else: - new_shape[np.array(axis)] = 1 - return tuple(new_shape) - - -def weighted_error(data, errors=None, axis=None, t_value_corr=True): - """ - Computes error of weighted average along the specified axis. - This is similar to calculating the standard deviation on the data, - but with both, the average to which the deviation is calculated, - as well as then the averaging over the deviations weighted by - weights based on the errors. - - In addition, the weighted variance is unbiased by an unbias-factor - n / (n-1), where n is the :meth:`omc3.utils.stats.effective_sample_size` . - Additionally, a (student) t-value correction can be performed (done by default) - which corrects the estimate for small data sets. - - Parameters: - data: array-like - Contains the data on which the weighted error on the average is calculated. - errors: array-like, optional - Contains errors associated with the values in data, it is used to calculated weights - axis: int or tuple of ints, optional - Axis or axes along which to average data - t_value_corr: bool, optional - Species if the error is corrected for small sample size, default True - - Returns: - Returns the error of weighted average along the specified axis. 
- """ - weights = weights_from_errors(errors) - weighted_average = np.average(data, axis=axis, weights=weights) - weighted_average = weighted_average.reshape(_get_shape(data.shape, axis)) - (sample_variance, sum_of_weights) = np.ma.average( - np.square(np.abs(data - weighted_average)), - weights=weights, axis=axis, returned=True - ) - if weights is not None: - sample_variance = sample_variance + 1 / sum_of_weights - error = np.nan_to_num(np.sqrt(sample_variance * unbias_variance(data, weights, axis=axis))) - if t_value_corr: - error = error * t_value_correction(effective_sample_size(data, weights, axis=axis)) - return error - - -def circular_rms(data, period=PI2, axis=None): - """ - Computes the circular root mean square along the specified axis. - - Parameters: - data: array-like - Contains the data to be averaged - period: scalar, optional - Periodicity period of data, default is (2 * np.pi) - axis: int or tuple of ints, optional - Axis or axes along which to average data - - Returns: - Returns the circular root mean square along the specified axis. - """ - return np.sqrt(circular_mean(np.square(data / period), period=1, axis=axis) * period) - - -def rms(data, axis=None): - """ - Computes the root mean square along the specified axis. - - Parameters: - data: array-like - Contains the data to be averaged - axis: int or tuple of ints, optional - Axis or axes along which to average data - - Returns: - Returns the root mean square along the specified axis. - """ - return weighted_rms(data, axis=axis) - - -def weighted_rms(data, errors=None, axis=None): - """ - Computes weighted root mean square along the specified axis. - - Parameters: - data: array-like - Contains the data to be averaged - errors: array-like, optional - Contains errors associated with the values in data, it is used to calculated weights - axis: int or tuple of ints, optional - Axis or axes along which to average data - - Returns: - Returns weighted root mean square along the specified axis. 
- """ - weights = weights_from_errors(errors) - return np.sqrt(np.average(np.square(data), weights=weights, axis=axis)) - - -def weighted_nanrms(data, errors=None, axis=None): - """"Wrapper around weigthed_rms with added nan handling""" - return weighted_rms(data=np.ma.array(data, mask=np.isnan(data)), - errors=None if errors is None else np.ma.array(errors, mask=np.isnan(data)), - axis=axis) - - -def weights_from_errors(errors, period=PI2): - """ - Computes weights from measurement errors, weights are not output if errors contain zeros or nans - - Parameters: - errors: array-like - Contains errors which are used to calculated weights - period: scalar, optional - Periodicity period of data, default is (2 * np.pi) - - Returns: - Returns the error of weighted circular average along the specified axis. - """ - if errors is None: - return None - if np.any(np.isnan(errors)): - LOGGER.warning("NaNs found, weights are not used.") - return None - if np.any(np.logical_not(errors)): - LOGGER.warning("Zeros found, weights are not used.") - return None - return 1 / np.square(errors * PI2 / period) - - -def effective_sample_size(data, weights, axis=None): - r""" - Computes effective sample size of weighted data along specified axis, - the minimum value returned is 2 to avoid non-reasonable error blow-up. - - It is calculated via Kish's approximate formula - from the (not necessarily normalized) weights :math:`w_i` (see wikipedia): - - .. math:: - - n_\mathrm{eff} = \frac{\left(\sum_i w_i\right)^2}{\sum_i w_i^2} - - What it represents: - "In most instances, weighting causes a decrease in the statistical significance of results. - The effective sample size is a measure of the precision of the survey - (e.g., even if you have a sample of 1,000 people, an effective sample size of 100 would indicate - that the weighted sample is no more robust than a well-executed un-weighted - simple random sample of 100 people)." 
- - https://wiki.q-researchsoftware.com/wiki/Weights,_Effective_Sample_Size_and_Design_Effects - - Parameters: - data: array-like - weights: array-like - Contains weights associated with the values in data - axis: int or tuple of ints, optional - Axis or axes along which the effective sample size is computed - - Returns: - Returns the error of weighted circular average along the specified axis. - """ - if weights is None: - sample_size = np.sum(np.ones(data.shape), axis=axis) - else: - sample_size = np.square(np.sum(weights, axis=axis)) / np.sum(np.square(weights), axis=axis) - return np.where(sample_size > 2, sample_size, 2) - - -def unbias_variance(data, weights, axis=None): - r""" - Computes a correction factor to unbias variance of weighted average of data along specified axis, - e.g. transform the standard deviation 1 - - .. math:: - - \sigma^2 = \frac{1}{N} \sum_{i=1}^N (x_i - x_\mathrm{mean})^2 - - into an un-biased estimator - - .. math:: - - \sigma^2 = \frac{1}{N-1} \sum_{i=1}^N (x_i - x_\mathrm{mean})^2 - - Parameters: - data: array-like - weights: array-like - Contains weights associated with the values in data - axis: int or tuple of ints, optional - Axis or axes along which the effective sample size is computed - - Returns: - Returns the error of weighted circular average along the specified axis. - """ - sample_size = effective_sample_size(data, weights, axis=axis) - try: - return sample_size / (sample_size - 1) - except ZeroDivisionError or RuntimeWarning: - return np.zeros(sample_size.shape) - - -def t_value_correction(sample_size): - """ - Calculates the multiplicative correction factor to determine standard deviation of - a normally distributed quantity from standard deviation of its finite-sized sample. - The minimum allowed sample size is 2 to avoid non-reasonable error blow-up - for smaller sample sizes 2 is used instead. 
- - Note (jdilly): In other words, this transforms the area of 1 sigma under - the given student t distribution to the 1 sigma area of a normal distribution - (this transformation is where the ``CONFIDENCE LEVEL`` comes in). - I hope that makes the intention more clear. - - Args: - sample_size: array-like - - Returns: - multiplicative correction factor(s) of same shape as sample_size. - Can contain nans. - """ - return t.ppf(CONFIDENCE_LEVEL, np.where(sample_size > 2, sample_size, 2) - 1)