diff --git a/pyaml/accelerator.py b/pyaml/accelerator.py index 09bf35f9..c82c355d 100644 --- a/pyaml/accelerator.py +++ b/pyaml/accelerator.py @@ -8,11 +8,13 @@ from .arrays.array import ArrayConfig from .common.element import Element +from .common.element_holder import ElementHolder from .common.exception import PyAMLConfigException from .configuration.factory import Factory from .configuration.fileloader import load, set_root_folder from .control.controlsystem import ControlSystem from .lattice.simulator import Simulator +from .yellow_pages import YellowPages # Define the main class name for this module PYAMLCLASS = "Accelerator" @@ -66,24 +68,28 @@ def __init__(self, cfg: ConfigModel): self._cfg = cfg __design = None __live = None + self._controls: dict[str, ElementHolder] = {} + self._simulators: dict[str, ElementHolder] = {} if cfg.controls is not None: for c in cfg.controls: if c.name() == "live": self.__live = c else: - # Add as dynacmic attribute + # Add as dynamic attribute setattr(self, c.name(), c) c.fill_device(cfg.devices) + self._controls[c.name()] = c if cfg.simulators is not None: for s in cfg.simulators: if s.name() == "design": self.__design = s else: - # Add as dynacmic attribute + # Add as dynamic attribute setattr(self, s.name(), s) s.fill_device(cfg.devices) + self._simulators[s.name()] = s if cfg.arrays is not None: for a in cfg.arrays: @@ -97,6 +103,8 @@ def __init__(self, cfg: ConfigModel): if cfg.energy is not None: self.set_energy(cfg.energy) + self._yellow_pages = YellowPages(self) + self.post_init() def set_energy(self, E: float): @@ -156,6 +164,25 @@ def design(self) -> Simulator: """ return self.__design + @property + def yellow_pages(self) -> YellowPages: + return self._yellow_pages + + def simulators(self) -> dict[str, "ElementHolder"]: + """Return all registered simulator modes.""" + return self._simulators + + def controls(self) -> dict[str, "ElementHolder"]: + """Return all registered control modes.""" + return self._controls + + def modes(self) -> dict[str, "ElementHolder"]: + """Return all registered control and simulator modes.""" + modes: dict[str, "ElementHolder"] = {} + modes.update(self._simulators) + modes.update(self._controls) + return modes + def __repr__(self): return repr(self._cfg).replace("ConfigModel", self.__class__.__name__) @@ -182,9 +209,7 @@ def from_dict(config_dict: dict, ignore_external=False) -> "Accelerator": return Factory.depth_first_build(config_dict, ignore_external) @staticmethod - def load( - filename: str, use_fast_loader: bool = False, ignore_external=False - ) -> "Accelerator": + def load(filename: str, use_fast_loader: bool = False, ignore_external=False) -> "Accelerator": """ Load an accelerator from a config file. diff --git a/pyaml/apidoc/gen_api.py b/pyaml/apidoc/gen_api.py index 3a4cd81a..cccef14e 100644 --- a/pyaml/apidoc/gen_api.py +++ b/pyaml/apidoc/gen_api.py @@ -82,6 +82,7 @@ "pyaml.tuning_tools.orbit_response_matrix", "pyaml.tuning_tools.response_matrix", "pyaml.tuning_tools.tune", + "pyaml.yellow_pages", ] @@ -111,17 +112,16 @@ def generate_selective_module(m): file.write(f".. automodule:: {p.__name__}\n") if len(all_cls) > 0: # Exclude classes that will be treated by autoclass - file.write( - f" :exclude-members: {','.join([c.__name__ for c in all_cls])}\n\n" - ) + file.write(f" :exclude-members: {','.join([c.__name__ for c in all_cls])}\n\n") for c in all_cls: file.write(f" .. autoclass:: {c.__name__}\n") file.write(" :members:\n") if m in ["pyaml.arrays.element_array"]: # Include special members for operator overloading - file.write( - " :special-members: __add__, __and__, __or__, __sub__ \n" - ) + file.write(" :special-members: __add__, __and__, __or__, __sub__ \n") + if m in ["pyaml.yellow_pages"]: + # Include special members for exploratory overloading + file.write(" :special-members: __dir__, __getattr__, __getitem__, __repr__, __str__ \n") file.write(" :exclude-members: model_config\n") file.write(" :undoc-members:\n") file.write(" :show-inheritance:\n\n") @@ -129,9 +129,7 @@ def generate_selective_module(m): def generate_toctree(filename: str, title: str, level: int, module: str): sub_modules = [m for m in modules if m.startswith(module)] - level_path = sorted( - set([m.split(".")[level + 1 : level + 2][0] for m in sub_modules]) - ) + level_path = sorted(set([m.split(".")[level + 1 : level + 2][0] for m in sub_modules])) paths = [] @@ -166,9 +164,7 @@ def gen_doc(): while len(paths) > 0: npaths = [] for p in paths: - npaths.extend( - generate_toctree(f"api/{'pyaml.' + p}.rst", f"{p}", level, "pyaml." + p) - ) + npaths.extend(generate_toctree(f"api/{'pyaml.' + p}.rst", f"{p}", level, "pyaml." + p)) paths = npaths level += 1 print("done") diff --git a/pyaml/arrays/element_array.py b/pyaml/arrays/element_array.py index d9415729..8a3a4770 100644 --- a/pyaml/arrays/element_array.py +++ b/pyaml/arrays/element_array.py @@ -96,9 +96,7 @@ def __create_array(self, array_name: str, element_type: type, elements: list): elif issubclass(element_type, Element): return ElementArray(array_name, elements, self.__use_aggregator) else: - raise PyAMLException( - f"Unsupported sliced array for type {str(element_type)}" - ) + raise PyAMLException(f"Unsupported sliced array for type {str(element_type)}") def __eval_field(self, attribute_name: str, element: Element) -> str: function_name = "get_" + attribute_name @@ -109,16 +107,14 @@ def __ensure_compatible_operand(self, other: object) -> "ElementArray": """Validate the operand used for set-like operations between arrays.""" if not isinstance(other, ElementArray): raise TypeError( - f"Unsupported operand type(s) for set operation: " - f"'{type(self).__name__}' and '{type(other).__name__}'" + f"Unsupported operand type(s) for set operation: '{type(self).__name__}' and '{type(other).__name__}'" ) if len(self) > 0 and len(other) > 0: if self.get_peer() is not None and other.get_peer() is not None: if self.get_peer() != other.get_peer(): raise PyAMLException( - f"{self.__class__.__name__}: cannot operate on arrays " - "attached to different peers" + f"{self.__class__.__name__}: cannot operate on arrays attached to different peers" ) return other @@ -170,9 +166,7 @@ def __is_bool_mask(self, other: object) -> bool: pass # --- python sequence of bools (but not a string/bytes) --- - if isinstance(other, Sequence) and not isinstance( - other, (str, bytes, bytearray) - ): + if isinstance(other, Sequence) and not isinstance(other, (str, bytes, bytearray)): # Avoid treating ElementArray as a mask if isinstance(other, ElementArray): return False @@ -195,8 +189,7 @@ def __and__(self, other: object): If ``other`` is an ElementArray, the result contains elements whose names are present in both arrays. - Example - ------- + **Example** .. code-block:: python @@ -208,8 +201,8 @@ def __and__(self, other: object): If ``other`` is a boolean mask (list[bool] or numpy.ndarray of bool), elements are kept where the mask is True. - Example - ------- + **Example** + .. code-block:: python >>> mask = cell1.mask_by_type(Magnet) @@ -232,8 +225,7 @@ def __and__(self, other: object): mask = list(other) # works for list/tuple and numpy arrays if len(mask) != len(self): raise ValueError( - f"{self.__class__.__name__}: mask length ({len(mask)}) " - f"does not match array length ({len(self)})" + f"{self.__class__.__name__}: mask length ({len(mask)}) does not match array length ({len(self)})" ) res = [e for e, keep in zip(self, mask, strict=True) if bool(keep)] return self.__auto_array(res) @@ -261,8 +253,7 @@ def __sub__(self, other: object): If ``other`` is an ElementArray, the result contains elements whose names are present in ``self`` but not in ``other``. - Example - ------- + **Example** .. code-block:: python @@ -275,8 +266,7 @@ def __sub__(self, other: object): elements are removed where the mask is True. This is the inverse of ``& mask``. - Example - ------- + **Example** .. code-block:: python @@ -300,8 +290,7 @@ def __sub__(self, other: object): mask = list(other) if len(mask) != len(self): raise ValueError( - f"{self.__class__.__name__}: mask length ({len(mask)}) " - f"does not match array length ({len(self)})" + f"{self.__class__.__name__}: mask length ({len(mask)}) does not match array length ({len(self)})" ) res = [e for e, remove in zip(self, mask, strict=True) if not bool(remove)] return self.__auto_array(res) diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index 5b345ebd..72abd0b2 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -110,13 +110,10 @@ def fill_array( try: m = get_func(n) except Exception as err: - raise PyAMLException( - f"{constructor.__name__} {array_name} : {err} @index {len(a)}" - ) from None + raise PyAMLException(f"{constructor.__name__} {array_name} : {err} @index {len(a)}") from None if m in a: raise PyAMLException( - f"{constructor.__name__} {array_name} : " - f"duplicate name {name} @index {len(a)}" + f"{constructor.__name__} {array_name} : duplicate name {name} @index {len(a)}" ) from None a.append(m) ARR[array_name] = constructor(array_name, a) @@ -124,8 +121,7 @@ def fill_array( def __add(self, array, element: Element): if element.get_name() in self.__ALL: # Ensure name unicity raise PyAMLException( - f"Duplicate element {element.__class__.__name__} " - "name {element.get_name()}" + f"Duplicate element {element.__class__.__name__} name {{element.get_name()}}" ) from None array[element.get_name()] = element self.__ALL[element.get_name()] = element @@ -157,9 +153,7 @@ def get_all_elements(self) -> list[Element]: # Magnets def fill_magnet_array(self, arrayName: str, elementNames: list[str]): - self.fill_array( - arrayName, elementNames, self.get_magnet, MagnetArray, self.__MAGNET_ARRAYS - ) + self.fill_array(arrayName, elementNames, self.get_magnet, MagnetArray, self.__MAGNET_ARRAYS) def get_magnet(self, name: str) -> Magnet: return self.__get("Magnet", name, self.__MAGNETS) @@ -191,9 +185,7 @@ def add_cfm_magnet(self, m: Magnet): self.__add(self.__CFM_MAGNETS, m) def get_cfm_magnets(self, name: str) -> CombinedFunctionMagnetArray: - return self.__get( - "CombinedFunctionMagnet array", name, self.__CFM_MAGNET_ARRAYS - ) + return self.__get("CombinedFunctionMagnet array", name, self.__CFM_MAGNET_ARRAYS) def get_all_cfm_magnets(self) -> list[CombinedFunctionMagnet]: return [value for key, value in self.__CFM_MAGNETS.items()] @@ -216,9 +208,7 @@ def add_serialized_magnet(self, m: Magnet): self.__add(self.__SERIALIZED_MAGNETS, m) def get_serialized_magnets(self, name: str) -> SerializedMagnetsArray: - return self.__get( - "SerializedMagnets array", name, self.__SERIALIZED_MAGNETS_ARRAYS - ) + return self.__get("SerializedMagnets array", name, self.__SERIALIZED_MAGNETS_ARRAYS) def get_all_serialized_magnets(self) -> list[SerializedMagnets]: return [value for key, value in self.__SERIALIZED_MAGNETS.items()] @@ -318,3 +308,63 @@ def add_dispersion_tuning(self, dispersion: Element): @property def dispersion(self) -> "Dispersion": return self.get_dispersion_tuning("DEFAULT_DISPERSION") + + def _get_array(self, name: str): + """ + Generic array resolver used by YellowPages. + + The method returns the array object referenced by 'name', regardless of its + concrete type. + """ + if name in self.__BPM_ARRAYS: + return self.__BPM_ARRAYS[name] + if name in self.__MAGNET_ARRAYS: + return self.__MAGNET_ARRAYS[name] + if name in self.__CFM_MAGNET_ARRAYS: + return self.__CFM_MAGNET_ARRAYS[name] + if name in self.__SERIALIZED_MAGNETS_ARRAYS: + return self.__SERIALIZED_MAGNETS_ARRAYS[name] + if name in self.__ELEMENT_ARRAYS: + return self.__ELEMENT_ARRAYS[name] + + raise PyAMLException(f"Array {name} not defined") + + def _get_tool(self, name: str): + """ + Generic tuning tool resolver used by YellowPages. + """ + if name not in self.__TUNING_TOOLS: + raise PyAMLException(f"Tool {name} not defined") + return self.__TUNING_TOOLS[name] + + def _get_diagnostic(self, name: str): + """ + Generic diagnostic resolver used by YellowPages. + """ + if name not in self.__DIAG: + raise PyAMLException(f"Diagnostic {name} not defined") + return self.__DIAG[name] + + def _list_arrays(self) -> list[str]: + """ + Return all array identifiers available in this holder. + """ + arrays: list[str] = [] + arrays.extend(self.__BPM_ARRAYS.keys()) + arrays.extend(self.__MAGNET_ARRAYS.keys()) + arrays.extend(self.__CFM_MAGNET_ARRAYS.keys()) + arrays.extend(self.__SERIALIZED_MAGNETS_ARRAYS.keys()) + arrays.extend(self.__ELEMENT_ARRAYS.keys()) + return arrays + + def _list_tools(self) -> list[str]: + """ + Return all tuning tool identifiers available in this holder. + """ + return list(self.__TUNING_TOOLS.keys()) + + def _list_diagnostics(self) -> list[str]: + """ + Return all diagnostic identifiers available in this holder. + """ + return list(self.__DIAG.keys()) diff --git a/pyaml/yellow_pages.py b/pyaml/yellow_pages.py new file mode 100644 index 00000000..cc8793aa --- /dev/null +++ b/pyaml/yellow_pages.py @@ -0,0 +1,672 @@ +""" +yellow_pages.py + +Fully dynamic YellowPages service attached to :class:`~pyaml.accelerator.Accelerator`. + +Key points: + - Auto-discovery only: arrays, tools and diagnostics are discovered at runtime + by scanning all modes. + - No caching: every call reflects current runtime state. + - Simple query syntax for identifiers: + - wildcard / fnmatch: + yp["OH4*"] + - regular expression: + yp["re:^SH1A-C0[12]-H$"] + +:class:`~pyaml.accelerator.Accelerator` interface +------------------------------ + - controls() -> dict[str, ElementHolder] + - simulators() -> dict[str, ElementHolder] + - modes() -> dict[str, ElementHolder] +""" + +import fnmatch +import re +from enum import Enum +from typing import TYPE_CHECKING, Any + +from pyaml import PyAMLException + +if TYPE_CHECKING: + from .accelerator import Accelerator + + +class YellowPagesCategory(str, Enum): + ARRAYS = "Arrays" + TOOLS = "Tools" + DIAGNOSTICS = "Diagnostics" + + +class YellowPagesError(PyAMLException): + """YellowPages-specific error with clear user-facing messages.""" + + +class YellowPagesQueryError(YellowPagesError): + """Raised when a YellowPages query string cannot be evaluated.""" + + +_VALID_KEY_RE = re.compile(r"^[A-Z0-9_]+$") + + +class YellowPages: + r""" + Dynamic discovery service for accelerator objects. + + :class:`YellowPages` provides a unified access layer to arrays, + tuning tools and diagnostics available in an + :class:`~pyaml.accelerator.Accelerator`. + + Entries are discovered dynamically by scanning all + :class:`~pyaml.element_holder.ElementHolder` instances + associated with the :class:`~pyaml.accelerator.Accelerator` control and simulation modes. + + Notes + ----- + The :class:`~pyaml.accelerator.Accelerator` must provide: + + - ``controls() -> dict[str, ElementHolder]`` + - ``simulators() -> dict[str, ElementHolder]`` + - ``modes() -> dict[str, ElementHolder]`` + + Examples + -------- + + Print the global overview: + + .. code-block:: python + + >>> print(sr.yellow_pages) + + Resolve an entry across all modes: + + .. code-block:: python + + >>> sr.yellow_pages.get("BPM") + + Resolve in a specific mode: + + .. code-block:: python + + >>> sr.yellow_pages.get("BPM", mode="live") + + Search identifiers using wildcards: + + .. code-block:: python + + >>> sr.yellow_pages["OH4*"] + + Search identifiers using a regular expression: + + .. code-block:: python + + >>> sr.yellow_pages["re:^SH1A-C0[12]-H$"] + """ + + def __init__(self, accelerator: "Accelerator"): + self._acc = accelerator + + def has(self, key: str) -> bool: + r""" + Check whether a YellowPages key exists. + + Parameters + ---------- + key : str + Entry name. + + Returns + ------- + bool + ``True`` if the key exists. + + Examples + -------- + + .. code-block:: python + + >>> sr.yellow_pages.has("BPM") + True + + .. code-block:: python + + >>> sr.yellow_pages.has("UNKNOWN") + False + + """ + return key in self._all_keys() + + def categories(self) -> list[str]: + r""" + Return the list of available categories. + + Only categories that contain entries are returned. + + Returns + ------- + list[str] + + Examples + -------- + + .. code-block:: python + + >>> sr.yellow_pages.categories() + ['Arrays', 'Tools', 'Diagnostics'] + + """ + discovered = self._discover() + present = {cat for cat, keys in discovered.items() if keys} + return [c.value for c in YellowPagesCategory if c in present] + + def keys(self, category: str | YellowPagesCategory | None = None) -> list[str]: + r""" + Return available YellowPages keys. + + Parameters + ---------- + category : str or YellowPagesCategory, optional + Restrict the result to a specific category. + + Returns + ------- + list[str] + + Examples + -------- + + All entries: + + .. code-block:: python + + >>> sr.yellow_pages.keys() + + Only arrays: + + .. code-block:: python + + >>> sr.yellow_pages.keys("Arrays") + + Using enum: + + .. code-block:: python + + >>> sr.yellow_pages.keys(YellowPagesCategory.ARRAYS) + + """ + discovered = self._discover() + + if category is None: + return self._all_keys() + + cat = YellowPagesCategory(category) + return discovered.get(cat, []) + + def __dir__(self): + """ + Extend ``dir()`` with attribute-friendly discovered keys. + """ + default = super().__dir__() + return sorted(set(default) | {k for k in self._all_keys() if _VALID_KEY_RE.match(k)}) + + def __getattr__(self, name): + """ + Allow attribute-style access for valid discovered keys. + + Examples + -------- + + .. code-block:: python + + >>> sr.yellow_pages.BPM + """ + if name in self._all_keys(): + return self._get_object(name) + raise AttributeError(f"'YellowPages' object has no attribute '{name}'") + + def availability(self, key: str) -> set[str]: + r""" + Return the set of modes where a key is available. + + Parameters + ---------- + key : str + Entry name. + + Returns + ------- + set[str] + + Examples + -------- + + .. code-block:: python + + >>> sr.yellow_pages.availability("BPM") + >>> sr.yellow_pages.availability("DEFAULT_ORBIT_CORRECTION") + """ + self._require_key(key) + avail: set[str] = set() + for mode_name, holder in self._acc.modes().items(): + if self._try_resolve_in_holder(key, holder) is not None: + avail.add(mode_name) + return avail + + def _get_object(self, key: str, *, mode: str | None = None): + r""" + Resolve a YellowPages entry. + + Parameters + ---------- + key : str + Entry name. + mode : str, optional + Restrict the resolution to a specific mode. + + Returns + ------- + object or dict[str, object] + + If ``mode`` is specified, returns the resolved object. + + Otherwise returns a dictionary mapping mode names + to resolved objects. + + Raises + ------ + KeyError + If the key does not exist. + YellowPagesError + If the mode is unknown or the key is not available + in the requested mode. + + Examples + -------- + + Resolve across all modes: + + .. code-block:: python + + >>> sr.yellow_pages.get("BPM") + + Resolve in a specific mode: + + .. code-block:: python + + >>> sr.yellow_pages.get("BPM", mode="live") + + Using attribute access: + + .. code-block:: python + + >>> sr.yellow_pages.BPM + + """ + self._require_key(key) + + if mode is not None: + holder = self._acc.modes().get(mode) + if holder is None: + raise YellowPagesError(f"Unknown mode '{mode}'.") + obj = self._try_resolve_in_holder(key, holder) + if obj is None: + raise YellowPagesError(f"YellowPages key '{key}' not available in mode '{mode}'.") + return obj + + out: dict[str, Any] = {} + for mode_name, holder in self._acc.modes().items(): + obj = self._try_resolve_in_holder(key, holder) + if obj is not None: + out[mode_name] = obj + return out + + def __getitem__(self, query: str) -> list[str]: + """ + Alias for :meth:`get`. + """ + return self.get(query) + + def get(self, query: str, mode: str | None = None) -> list[str]: + """ + Search identifiers using a wildcard or regular expression. + + Parameters + ---------- + query : str + Search expression. + mode : str, optional + Restrict the search to a specific :class:`~pyaml.accelerator.Accelerator` mode. + + Returns + ------- + list[str] + + Examples + -------- + + .. code-block:: python + + >>> sr.yellow_pages.get("OH4*") + + >>> sr.yellow_pages.get("OH4*", mode="live") + + >>> sr.yellow_pages.get("re:^SH1A-C0[12]-H$") + + """ + if not query or not query.strip(): + raise YellowPagesQueryError("Empty YellowPages query.") + + query = query.strip() + + if mode is not None: + holder = self._acc.modes().get(mode) + if holder is None: + raise YellowPagesError(f"Unknown mode '{mode}'.") + + if query in holder._list_arrays(): + return self._object_to_ids(holder._get_array(query)) + else: + ids = self._ids_from_holder(holder) + else: + if query in self.keys(YellowPagesCategory.ARRAYS): + if mode is None: + arr_list: list[str] = [] + for a_mode in self._acc.modes(): + try: + obj = self._get_object(query, mode=a_mode) + array_ids = self._object_to_ids(obj) + self._extend_unique(arr_list, array_ids) + except Exception: + continue + return arr_list + + return self._object_to_ids(self._get_object(query, mode=mode)) + ids = self._all_known_ids() + + if query.startswith("re:"): + pattern = query[3:] + try: + rx = re.compile(pattern) + except re.error as ex: + raise YellowPagesQueryError(f"Invalid regex '{pattern}': {ex}") from ex + + return [i for i in ids if rx.search(i)] + + return [i for i in ids if fnmatch.fnmatch(i, query)] + + def __repr__(self) -> str: + r""" + Return a human-readable overview of the YellowPages content. + + The representation lists: + + - controls + - simulators + - discovered arrays, tools and diagnostics + + The displayed type corresponds to the Python module + defining the resolved object. + + Examples + -------- + + .. code-block:: python + + >>> print(sr.yellow_pages) + + Example output: + + .. code-block:: text + + Controls: + live + . + + Simulators: + design + . + + Arrays: + BPM (pyaml.arrays.bpm_array) size=224 + HCORR (pyaml.arrays.magnet_array) size=96 + . + + Tools: + DEFAULT_ORBIT_CORRECTION (pyaml.tuning_tools.orbit) + . + + Diagnostics: + BETATRON_TUNE (pyaml.diagnostics.tune_monitor) + . + """ + lines: list[str] = [] + + lines.append("Controls:") + controls = self._acc.controls() + if controls: + for name in controls.keys(): + lines.append(f" {name}") + lines.append(" .") + lines.append("") + + lines.append("Simulators:") + simulators = self._acc.simulators() + if simulators: + for name in simulators.keys(): + lines.append(f" {name}") + lines.append(" .") + lines.append("") + + discovered = self._discover() + for cat in YellowPagesCategory: + keys = discovered.get(cat, set()) + if not keys: + continue + + lines.append(f"{cat.value}:") + for key in keys: + lines.append(self._format_key(cat, key)) + lines.append(" .") + lines.append("") + + return "\n".join(lines).rstrip() + + def __str__(self) -> str: + return self.__repr__() + + def _discover(self) -> dict[YellowPagesCategory, list[str]]: + arrays: list[str] = [] + tools: list[str] = [] + diags: list[str] = [] + + for _, holder in self._acc.modes().items(): + try: + self._extend_unique(arrays, holder._list_arrays()) + except Exception: + pass + try: + self._extend_unique(tools, holder._list_tools()) + except Exception: + pass + try: + self._extend_unique(diags, holder._list_diagnostics()) + except Exception: + pass + + return { + YellowPagesCategory.ARRAYS: arrays, + YellowPagesCategory.TOOLS: tools, + YellowPagesCategory.DIAGNOSTICS: diags, + } + + def _extend_unique(self, target: list[str], values: list[str]) -> None: + """ + Append values to target while preserving insertion order and uniqueness. + """ + for value in values: + if value not in target: + target.append(value) + + def _all_keys(self) -> list[str]: + discovered = self._discover() + out: list[str] = [] + for keys in discovered.values(): + self._extend_unique(out, keys) + return out + + def _require_key(self, key: str) -> None: + if key not in self._all_keys(): + raise KeyError(self._unknown_key_message(key)) + + def _unknown_key_message(self, key: str) -> str: + available = ", ".join(self._all_keys()) + return f"Unknown YellowPages key '{key}'. Available keys: {available if available else ''}" + + def _try_resolve_in_holder(self, key: str, holder: Any) -> Any | None: + """ + Resolve a discovered key in a holder. + + Resolution order: + - arrays + - tools + - diagnostics + """ + try: + if key in holder._list_arrays(): + return holder._get_array(key) + except Exception: + pass + + try: + if key in holder._list_tools(): + return holder._get_tool(key) + except Exception: + pass + + try: + if key in holder._list_diagnostics(): + return holder._get_diagnostic(key) + except Exception: + pass + + return None + + def _get_type_name_from_resolved(self, resolved: dict[str, Any]) -> str | None: + """ + Return the public type name used in ``__repr__``. + + Only the module path is displayed, not the concrete class name. + + Examples + -------- + + .. code-block:: text + + pyaml.arrays.bpm_array + pyaml.tuning_tools.orbit + pyaml.diagnostics.tune_monitor + """ + for obj in resolved.values(): + if obj is None: + continue + return obj.__class__.__module__ + return None + + def _format_key(self, category: YellowPagesCategory, key: str) -> str: + """ + Format one discovered key for ``__repr__``. + """ + resolved = self._get_object(key) + type_name = self._get_type_name_from_resolved(resolved) + type_part = f" ({type_name})" if type_name else "" + + modes = list(resolved.keys()) + all_modes = list(self._acc.modes().keys()) + + availability_part = "" + if set(modes) != set(all_modes): + missing = [mode for mode in all_modes if mode not in modes] + availability_part = f" modes={modes} missing={missing}" + + if category == YellowPagesCategory.ARRAYS: + sizes: dict[str, int] = {} + for mode_name, obj in resolved.items(): + try: + sizes[mode_name] = len(obj) + except Exception: + sizes[mode_name] = 0 + + if modes == all_modes and sizes and len(set(sizes.values())) == 1: + size_part = f" size={next(iter(sizes.values()))}" + else: + size_part = " size={" + ", ".join(f"{m}:{n}" for m, n in sizes.items()) + "}" + + return f" {key:<21}{type_part:<40}{size_part}{availability_part}" + + return f" {key}{type_part}{availability_part}" + + def _object_to_ids(self, obj: Any) -> list[str]: + """ + Convert a resolved object into a set of identifiers. + """ + if obj is None: + return [] + + if isinstance(obj, (list, tuple, set)) and all(isinstance(x, str) for x in obj): + return list(obj) + + ids: list[str] = list() + try: + for x in obj: + if isinstance(x, str): + if x not in ids: + ids.append(x) + elif hasattr(x, "get_name") and callable(x.get_name): + if x.get_name() not in ids: + ids.append(x.get_name()) + elif hasattr(x, "name") and callable(x.name): + if x.name() not in ids: + ids.append(x.name()) + elif hasattr(x, "name") and isinstance(x.name, str): + if x.name not in ids: + ids.append(x.name) + else: + if str(x) not in ids: + ids.append(str(x)) + return ids + except TypeError: + if isinstance(obj, str): + return [obj] + if hasattr(obj, "get_name") and callable(obj.get_name): + return [obj.get_name()] + return [str(obj)] + + def _ids_for_key_union_all_modes(self, key: str) -> list[str]: + out: list[str] = [] + resolved = self._get_object(key) + for obj in resolved.values(): + self._extend_unique(out, self._object_to_ids(obj)) + return out + + def _all_known_ids(self) -> list[str]: + """ + Collect all identifiers from all discovered arrays across all modes. + """ + all_ids: list[str] = [] + for array_name in self.keys(YellowPagesCategory.ARRAYS): + try: + self._extend_unique(all_ids, self._ids_for_key_union_all_modes(array_name)) + except Exception: + continue + return all_ids + + def _ids_from_holder(self, holder) -> list[str]: + ids: list[str] = [] + + try: + for name in holder._list_arrays(): + arr = holder._get_array(name) + self._extend_unique(ids, self._object_to_ids(arr)) + except Exception: + pass + + return ids diff --git a/tests/test_yellow_pages.py b/tests/test_yellow_pages.py new file mode 100644 index 00000000..9aa31cb1 --- /dev/null +++ b/tests/test_yellow_pages.py @@ -0,0 +1,328 @@ +# tests/test_yellow_pages.py +# +# Self-contained tests for fully dynamic YellowPages. +# Comments are in English. + +import re + +import pytest + +from pyaml.yellow_pages import ( + YellowPages, + YellowPagesCategory, + YellowPagesError, + YellowPagesQueryError, +) + +# --------------------------- +# Minimal test doubles +# --------------------------- + + +class _Array: + """Minimal array-like object exposing names and a stable module path.""" + + __module__ = "pyaml.arrays.test_array" + + def __init__(self, names): + self._names = list(names) + + def __len__(self): + return len(self._names) + + def __iter__(self): + return iter(self._names) + + +class _Tool: + """Minimal tool-like object exposing a stable module path.""" + + __module__ = "pyaml.tuning_tools.test_tool" + + +class _Diagnostic: + """Minimal diagnostic-like object exposing a stable module path.""" + + __module__ = "pyaml.diagnostics.test_diagnostic" + + +class _Holder: + """Minimal ElementHolder double with discovery + resolution.""" + + def __init__(self, arrays=None, tools=None, diagnostics=None): + self._arrays = dict(arrays or {}) + self._tools = dict(tools or {}) + self._diagnostics = dict(diagnostics or {}) + + # Discovery + def _list_arrays(self) -> list[str]: + return list(self._arrays.keys()) + + def _list_tools(self) -> list[str]: + return list(self._tools.keys()) + + def _list_diagnostics(self) -> list[str]: + return list(self._diagnostics.keys()) + + # Resolution + def _get_array(self, name: str): + if name not in self._arrays: + raise KeyError(name) + return self._arrays[name] + + def _get_tool(self, name: str): + if name not in self._tools: + raise KeyError(name) + return self._tools[name] + + def _get_diagnostic(self, name: str): + if name not in self._diagnostics: + raise KeyError(name) + return self._diagnostics[name] + + +class _Accelerator: + """Minimal Accelerator double.""" + + def __init__(self, controls: dict[str, _Holder], simulators: dict[str, _Holder]): + self._controls = dict(controls) + self._simulators = dict(simulators) + self._modes = {**self._controls, **self._simulators} + + def controls(self) -> dict[str, _Holder]: + return dict(self._controls) + + def simulators(self) -> dict[str, _Holder]: + return dict(self._simulators) + + def modes(self) -> dict[str, _Holder]: + return dict(self._modes) + + +# --------------------------- +# Fixtures +# --------------------------- + + +@pytest.fixture() +def accelerator(): + # Controls + live = _Holder( + arrays={ + "BPM": _Array(["BPM_C01-01", "BPM_C01-02", "BPM_C02-01"]), + "HCORR": _Array(["CH_C01-01", "CH_C01-02"]), + }, + tools={"DEFAULT_ORBIT_CORRECTION": _Tool()}, + diagnostics={"DEFAULT_BETATRON_TUNE_MONITOR": _Diagnostic()}, + ) + + tango = _Holder( + arrays={ + "BPM": _Array(["BPM_C01-01", "BPM_C01-02", "BPM_C02-01"]), + # HCORR intentionally missing in tango + }, + tools={}, + diagnostics={}, + ) + + # Simulators + design = _Holder( + arrays={ + "BPM": _Array(["BPM_C01-01", "BPM_C01-02", "BPM_C02-01"]), + "HCORR": _Array(["CH_C01-01"]), + }, + tools={"DEFAULT_ORBIT_CORRECTION": _Tool()}, + diagnostics={}, + ) + + return _Accelerator( + controls={"live": live, "tango": tango}, + simulators={"design": design}, + ) + + +@pytest.fixture() +def yp(accelerator): + return YellowPages(accelerator) + + +# --------------------------- +# Discovery API +# --------------------------- + + +def test_has_and_keys_discovery(yp): + assert yp.has("BPM") + assert yp.has("HCORR") + assert yp.has("DEFAULT_ORBIT_CORRECTION") + assert yp.has("DEFAULT_BETATRON_TUNE_MONITOR") + assert not yp.has("DOES_NOT_EXIST") + + assert yp.keys(YellowPagesCategory.ARRAYS) == ["BPM", "HCORR"] + assert yp.keys(YellowPagesCategory.TOOLS) == ["DEFAULT_ORBIT_CORRECTION"] + assert yp.keys(YellowPagesCategory.DIAGNOSTICS) == ["DEFAULT_BETATRON_TUNE_MONITOR"] + + +def test_categories_returns_only_present_categories(yp): + # All three categories are present in our fixture + assert yp.categories() == ["Arrays", "Tools", "Diagnostics"] + + +def test_dir_includes_attribute_friendly_keys_only(yp): + d = dir(yp) + # Attribute-friendly discovered keys should be included + assert "BPM" in d + assert "HCORR" in d + assert "DEFAULT_ORBIT_CORRECTION" in d + + # Non-existent keys not present + assert "DOES_NOT_EXIST" not in d + + +def test_getattr_returns_multimode_resolution_dict(yp): + bpm = yp.BPM + assert isinstance(bpm, dict) + assert set(bpm.keys()) == {"live", "tango", "design"} + + +# --------------------------- +# Resolution +# --------------------------- + + +def test_get_object_without_mode_returns_only_available_modes(yp): + hcorr = yp._get_object("HCORR") + assert set(hcorr.keys()) == {"live", "design"} + + +def test_get_object_with_mode_returns_object_or_raises(yp): + bpm_live = yp._get_object("BPM", mode="live") + assert len(bpm_live) == 3 + + with pytest.raises(YellowPagesError, match=r"Unknown mode"): + yp._get_object("BPM", mode="does_not_exist") + + with pytest.raises(YellowPagesError, match=r"not available in mode 'tango'"): + yp._get_object("HCORR", mode="tango") + + +def test_availability(yp): + assert yp.availability("BPM") == {"live", "tango", "design"} + assert yp.availability("HCORR") == {"live", "design"} + + +# --------------------------- +# Query API (public get + __getitem__) +# --------------------------- + + +def test_get_and_getitem_are_equivalent_for_wildcards(yp): + assert yp.get("BPM_C01*") == yp["BPM_C01*"] + + +def test_get_and_getitem_are_equivalent_for_regex(yp): + assert yp.get("re:^BPM_C01-0[12]$") == yp["re:^BPM_C01-0[12]$"] + + +def test_unknown_key_errors(yp): + with pytest.raises(KeyError, match=r"Unknown YellowPages key"): + yp._get_object("DOES_NOT_EXIST") + + +# --------------------------- +# __repr__ formatting (controls/simulators + types + sizes + modes) +# --------------------------- + + +def test_repr_has_controls_and_simulators_headers(yp): + s = repr(yp) + + assert "Controls:" in s + assert "Simulators:" in s + + assert re.search(r"^\s+live$", s, re.M) is not None + assert re.search(r"^\s+tango$", s, re.M) is not None + assert re.search(r"^\s+design$", s, re.M) is not None + + +def test_repr_array_size_compaction_when_identical_everywhere(yp): + s = repr(yp) + bpm_line = next(line for line in s.splitlines() if line.strip().startswith("BPM")) + assert "(pyaml.arrays.test_array)" in bpm_line + assert "size=3" in bpm_line + assert "size={" not in bpm_line + + +def test_repr_array_size_dict_when_different_or_not_everywhere(yp): + s = repr(yp) + hcorr_line = next(line for line in s.splitlines() if line.strip().startswith("HCORR")) + assert "(pyaml.arrays.test_array)" in hcorr_line + assert "size={" in hcorr_line + assert "live:2" in hcorr_line + assert "design:1" in hcorr_line + assert "modes=" in hcorr_line + assert "missing=" in hcorr_line + + +def test_repr_tools_show_type_and_modes_missing(yp): + s = repr(yp) + tool_line = next(line for line in s.splitlines() if "DEFAULT_ORBIT_CORRECTION" in line) + assert "(pyaml.tuning_tools.test_tool)" in tool_line + assert "modes=" in tool_line + assert "missing=" in tool_line + + +def test_repr_diagnostics_show_type(yp): + s = repr(yp) + diag_line = next(line for line in s.splitlines() if "DEFAULT_BETATRON_TUNE_MONITOR" in line) + assert "(pyaml.diagnostics.test_diagnostic)" in diag_line + + +# --------------------------- +# Query language (wildcard + regex) +# --------------------------- + + +def test_query_wildcard(yp): + out = yp["BPM_C01*"] + assert out == ["BPM_C01-01", "BPM_C01-02"] + + +def test_query_regex(yp): + out = yp["re:^BPM_C01-0[12]$"] + assert out == ["BPM_C01-01", "BPM_C01-02"] + + +def test_query_regex_alone_filters_all_known_ids(yp): + out = yp["re:^BPM_"] + assert out == ["BPM_C01-01", "BPM_C01-02", "BPM_C02-01"] + + +def test_query_wildcard_on_hcorr(yp): + out = yp["CH_C01*"] + assert out == ["CH_C01-01", "CH_C01-02"] + + +def test_query_empty_raises(yp): + with pytest.raises(YellowPagesQueryError, match=r"Empty YellowPages query"): + _ = yp[""] + + +def test_query_invalid_regex_raise(yp): + with pytest.raises(YellowPagesQueryError, match=r"Invalid regex"): + _ = yp["re:("] + + +def test_query_with_mode_wildcard(yp): + out = yp.get("CH_C01*", mode="live") + assert out == ["CH_C01-01", "CH_C01-02"] + + +def test_query_with_mode_regex(yp): + out = yp.get("re:^BPM_C01", mode="design") + assert out == ["BPM_C01-01", "BPM_C01-02"] + + +def test_query_with_unknown_mode(yp): + with pytest.raises(YellowPagesError, match=r"Unknown mode"): + yp.get("BPM*", mode="invalid") diff --git a/tests/test_yellow_pages_basic.py b/tests/test_yellow_pages_basic.py new file mode 100644 index 00000000..17ae3879 --- /dev/null +++ b/tests/test_yellow_pages_basic.py @@ -0,0 +1,39 @@ +# ruff: noqa: E501 +from pathlib import Path + +from pyaml.accelerator import Accelerator + +ebs_orbit_str_repr = """Controls: + live + . + +Simulators: + design + . + +Arrays: + BPM (pyaml.arrays.bpm_array) size=320 + HCorr (pyaml.arrays.magnet_array) size=288 + VCorr (pyaml.arrays.magnet_array) size=288 + Skews (pyaml.arrays.magnet_array) size=288 + . + +Tools: + DEFAULT_ORBIT_CORRECTION (pyaml.tuning_tools.orbit) + DEFAULT_ORBIT_RESPONSE_MATRIX (pyaml.tuning_tools.orbit_response_matrix) + DEFAULT_DISPERSION (pyaml.tuning_tools.dispersion) + . + +Diagnostics: + BETATRON_TUNE (pyaml.diagnostics.tune_monitor) + .""" + + +def test_load_conf_with_code(): + parent_folder = Path(__file__).parent + config_path = parent_folder.joinpath("config", "EBSOrbit.yaml").resolve() + + sr: Accelerator = Accelerator.load(config_path) + str_repr = str(sr.yellow_pages) + print(str_repr) + assert ebs_orbit_str_repr == str_repr