Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 60 additions & 16 deletions pyaml/common/element_holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Module handling element references for simulators and control system
"""

import fnmatch
import re
from typing import TYPE_CHECKING

from ..arrays.bpm_array import BPMArray
Expand Down Expand Up @@ -35,15 +37,24 @@ class ElementHolder(object):

def __init__(self):
# Device handle
self.__MAGNETS: dict = {}
self.__CFM_MAGNETS: dict = {}
self.__SERIALIZED_MAGNETS: dict = {}
self.__BPMS: dict = {}
self.__RFPLANT: dict = {}
self.__RFTRANSMITTER: dict = {}
self.__DIAG: dict = {}
self.__TUNING_TOOLS = {}
self.__ALL: dict = {}
self.__MAGNETS: dict[str, Magnet] = {}
self.__CFM_MAGNETS: dict[str, CombinedFunctionMagnet] = {}
self.__SERIALIZED_MAGNETS: dict[str, SerializedMagnets] = {}
self.__BPMS: dict[str, BPM] = {}
self.__RFPLANT: dict[str, RFPlant] = {}
self.__RFTRANSMITTER: dict[str, RFTransmitter] = {}
self.__DIAG: dict[str, Element] = {}
self.__TUNING_TOOLS: dict[str, Element] = {}
self.__ALL: dict[str, Element] = {}

self.__by_class_elements: dict[type, dict] = {
Magnet: self.__MAGNETS,
CombinedFunctionMagnet: self.__CFM_MAGNETS,
SerializedMagnets: self.__SERIALIZED_MAGNETS,
BPM: self.__BPMS,
RFPlant: self.__RFPLANT,
RFTransmitter: self.__RFTRANSMITTER,
}

# Array handle
self.__MAGNET_ARRAYS: dict = {}
Expand All @@ -62,24 +73,53 @@ def post_init(self):
def fill_device(self, elements: list[Element]):
raise PyAMLException("ElementHolder.fill_device() is not subclassed")

def find_elements(self, filter: str) -> list[str]:
if filter.startswith("re:"):
pattern = re.compile(rf"{filter[3:]}")
elements = [k for k in self.__ALL.keys() if pattern.fullmatch(k)]
elif "*" in filter or "?" in filter:
elements = [k for k in self.__ALL.keys() if fnmatch.fnmatch(k, filter)]
else:
elements = [filter]

return elements

def fill_array(
self, arrayName: str, elementNames: list[str], get_func, constructor, ARR: dict
self,
array_name: str,
element_names: list[str],
get_func,
constructor,
ARR: dict,
):
# Handle wildcard, regexp and exclusion pattern
all_names: list[str] = []
excluded_names: list[str] = []
for name in element_names:
if name.startswith("~"):
names = self.find_elements(name[1:])
excluded_names.extend(names)
else:
names = self.find_elements(name)
all_names.extend(names)

[all_names.remove(name) for name in excluded_names]

a = []
for name in elementNames:
for n in all_names:
try:
m = get_func(name)
m = get_func(n)
except Exception as err:
raise PyAMLException(
f"{constructor.__name__} {arrayName} : {err} @index {len(a)}"
f"{constructor.__name__} {array_name} : {err} @index {len(a)}"
) from None
if m in a:
raise PyAMLException(
f"{constructor.__name__} {arrayName} : "
f"{constructor.__name__} {array_name} : "
f"duplicate name {name} @index {len(a)}"
) from None
a.append(m)
ARR[arrayName] = constructor(arrayName, a)
ARR[array_name] = constructor(array_name, a)

def __add(self, array, element: Element):
if element.get_name() in self.__ALL: # Ensure name unicity
Expand Down Expand Up @@ -187,7 +227,11 @@ def get_all_serialized_magnets(self) -> list[SerializedMagnets]:

def fill_bpm_array(self, arrayName: str, elementNames: list[str]):
self.fill_array(
arrayName, elementNames, self.get_bpm, BPMArray, self.__BPM_ARRAYS
arrayName,
elementNames,
self.get_bpm,
BPMArray,
self.__BPM_ARRAYS,
)

def get_bpm(self, name: str) -> Element:
Expand Down
Loading
Loading