Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions pyaml/arrays/array.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
"""
Magnet array configuration
"""
from ..common.element_holder import ElementHolder

import numpy as np
from pydantic import BaseModel,ConfigDict
from ..lattice.element_holder import ElementHolder
from ..control.deviceaccesslist import DeviceAccessList

class ArrayConfigModel(BaseModel):

Expand All @@ -15,11 +13,6 @@ class ArrayConfigModel(BaseModel):
"""Family name"""
elements: list[str]
"""List of pyaml element names"""
aggregator: DeviceAccessList | None = None
"""
Aggregator object. If none specified, writings and readings are serialized.
If no device list is specified, it is dynamically constructed.
"""

class ArrayConfig(object):
"""
Expand All @@ -30,7 +23,3 @@ def __init__(self, cfg: ArrayConfigModel):

def fill_array(self,holder:ElementHolder):
raise "Array.fill_array() is not subclassed"

def init_aggregator(self,holder:ElementHolder):
raise "Array.init_aggregator() is not subclassed"

15 changes: 15 additions & 0 deletions pyaml/arrays/bpm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from .array import ArrayConfigModel,ArrayConfig
from ..common.element_holder import ElementHolder

# Define the main class name for this module
PYAMLCLASS = "BPM"

class ConfigModel(ArrayConfigModel):...

class BPM(ArrayConfig):

def __init__(self, cfg: ArrayConfigModel):
super().__init__(cfg)

def fill_array(self,holder:ElementHolder):
holder.fill_bpm_array(self._cfg.name,self._cfg.elements)
109 changes: 109 additions & 0 deletions pyaml/arrays/bpm_array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from ..common.abstract import ReadFloatArray
from ..bpm.bpm import BPM
from ..control.deviceaccesslist import DeviceAccessList

import numpy as np

class RWBPMPosition(ReadFloatArray):

def __init__(self, name:str, bpms:list[BPM]):
self.__bpms = bpms
self.__name = name
self.__aggregator:DeviceAccessList = None

# Gets the values
def get(self) -> np.array:
if not self.__aggregator:
return np.array([b.positions.get() for b in self.__bpms])
else:
return self.__aggregator.get().reshape(len(self.__bpms),2)

# Gets the unit of the values
def unit(self) -> list[str]:
return [b.positions.unit() for b in self.__bpms]

# Set the aggregator (Control system only)
def set_aggregator(self,agg:DeviceAccessList):
self.__aggregator = agg


class RWBPMSinglePosition(ReadFloatArray):

def __init__(self, name:str, bpms:list[BPM],idx: int):
self.__bpms = bpms
self.__name = name
self.__idx = idx
self.__aggregator:DeviceAccessList = None

# Gets the values
def get(self) -> np.array:
if not self.__aggregator:
return np.array([b.positions.get()[self.__idx] for b in self.__bpms])
else:
return self.__aggregator.get()

# Gets the unit of the values
def unit(self) -> list[str]:
return [b.positions.unit() for b in self.__bpms]

# Set the aggregator (Control system only)
def set_aggregator(self,agg:DeviceAccessList):
self.__aggregator = agg



class BPMArray(list[BPM]):
"""
Class that implements access to a BPM array
"""

def __init__(self,arrayName:str,bpms:list[BPM],holder = None):
"""
Construct a BPM array

Parameters
----------
arrayName : str
Array name
bpms: list[BPM]
BPM iterator
holder : Element holder
Holder (Simulator or Control System) that contains element of this array used for aggregator
"""
super().__init__(i for i in bpms)
self.__name = arrayName
self.__hvpos = RWBPMPosition(arrayName,bpms)
self.__hpos = RWBPMSinglePosition(arrayName,bpms,0)
self.__vpos = RWBPMSinglePosition(arrayName,bpms,1)

if holder is not None:
aggs = holder.create_bpm_aggregators(bpms)
self.__hvpos.set_aggregator(aggs[0])
self.__hpos.set_aggregator(aggs[1])
self.__vpos.set_aggregator(aggs[2])

@property
def positions(self) -> RWBPMPosition:
"""
Give access to bpm posttions of each bpm of this array
"""
return self.__hvpos

@property
def h(self) -> RWBPMSinglePosition:
"""
Give access to bpm H posttions of each bpm of this array
"""
return self.__hpos

@property
def v(self) -> RWBPMSinglePosition:
"""
Give access to bpm V posttions of each bpm of this array
"""
return self.__vpos





11 changes: 1 addition & 10 deletions pyaml/arrays/magnet.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .array import ArrayConfigModel,ArrayConfig
from ..lattice.element_holder import ElementHolder
from ..common.element_holder import ElementHolder

# Define the main class name for this module
PYAMLCLASS = "Magnet"
Expand All @@ -13,12 +13,3 @@ def __init__(self, cfg: ArrayConfigModel):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(self._cfg.name,self._cfg.elements)

def init_aggregator(self,holder:ElementHolder):
if self._cfg.aggregator is not None and len(self._cfg.aggregator)==0:
# Construct dynamically aggregator for magnets
mag = holder.get_magnets(self._cfg.name)
for m in mag:
devs = m.model.get_devices()
self._cfg.aggregator.add_devices(devs)
mag.set_aggregator(self._cfg.aggregator)
110 changes: 39 additions & 71 deletions pyaml/arrays/magnet_array.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,32 @@
from ..control.abstract import ReadWriteFloatArray
from ..common.abstract import ReadWriteFloatArray
from ..magnet.magnet import Magnet
from ..common.abstract_aggregator import ScalarAggregator

import numpy as np
from ..control.deviceaccesslist import DeviceAccessList

class RWMagnetStrength(ReadWriteFloatArray):

def __init__(self, name:str, magnets:list[Magnet]):
self.__magnets = magnets
self.__name = name
self.aggregator:DeviceAccessList = None
self.__magnets = magnets
self.__nb = len(self.__magnets)
self.__aggregator:ScalarAggregator = None

# Gets the values
def get(self) -> np.array:
if not self.aggregator:
if not self.__aggregator:
return np.array([m.strength.get() for m in self.__magnets])
else:
allHardwareValues = self.aggregator.get() # Read all hardware setpoints
allStrength = np.zeros(len(self.__magnets))
mIdx = 0
idx = 0
for m in self.__magnets:
nbDev = len(m.model.get_devices())
allStrength[idx] = m.model.compute_strengths(allHardwareValues[idx:idx+nbDev])[m.strength.index()]
mIdx += 1
idx += nbDev
return allStrength
return self.__aggregator.get()

# Sets the values
def set(self, value:np.array):
if not self.aggregator:
nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value
if not self.__aggregator:
for idx,m in enumerate(self.__magnets):
m.strength.set(value[idx])
m.strength.set(nvalue[idx])
else:
# TODO: if the array does not contains mappings to combined function
# magnets, the algorithm below can be optimized
allHardwareValues = self.aggregator.get() # Read all hardware setpoints
newHardwareValues = np.zeros(len(self.aggregator))
mIdx = 0
idx = 0
for m in self.__magnets:
# m is a single function magnet or a mapping to a
# combined function magnet (RWMapper)
nbDev = len(m.model.get_devices())
mStrengths = m.model.compute_strengths( allHardwareValues[idx:idx+nbDev] )
mStrengths[m.strength.index()] = value[mIdx]
newHardwareValues[idx:idx+nbDev] = m.model.compute_hardware_values(mStrengths)
mIdx += 1
idx += nbDev
self.aggregator.set(newHardwareValues)
self.__aggregator.set(nvalue)

# Sets the values and waits that the read values reach their setpoint
def set_and_wait(self, value:np.array):
Expand All @@ -58,37 +37,32 @@ def unit(self) -> list[str]:
return [m.strength.unit() for m in self.__magnets]

# Set the aggregator (Control system only)
def set_aggregator(self,agg:DeviceAccessList):
self.aggregator = agg
def set_aggregator(self,agg:ScalarAggregator):
self.__aggregator = agg

class RWMagnetHardware(ReadWriteFloatArray):

def __init__(self, name:str, magnets:list[Magnet]):
self.__name = name
self.__magnets = magnets
self.aggregator:DeviceAccessList = None
self.hasHardwareMapping = True
self.__nb = len(self.__magnets)
self.__aggregator:ScalarAggregator = None

# Gets the values
def get(self) -> np.array:
if not self.aggregator:
if not self.__aggregator:
return np.array([m.hardware.get() for m in self.__magnets])
else:
if not self.hasHardwareMapping:
raise Exception(f"Array {self.__name} contains elements that that do not support hardware units")
else:
return self.aggregator.get()
return self.__aggregator.get()

# Sets the values
def set(self, value:np.array):
if not self.aggregator:
nvalue = np.ones(self.__nb) * value if isinstance(value,float) else value
if not self.__aggregator:
for idx,m in enumerate(self.__magnets):
m.hardware.set(value[idx])
else:
if not self.hasHardwareMapping:
raise Exception(f"Array {self.__name} contains elements that that do not support hardware units")
else:
self.aggregator.set(value)
self.__aggregator.set(value)

# Sets the values and waits that the read values reach their setpoint
def set_and_wait(self, value:np.array):
Expand All @@ -98,44 +72,38 @@ def set_and_wait(self, value:np.array):
def unit(self) -> list[str]:
return [m.hardware.unit() for m in self.__magnets]

# Set the aggregator (Control system only)
def set_aggregator(self,agg:DeviceAccessList):
self.aggregator = agg
for m in self.__magnets:
self.hasHardwareMapping |= m.model.has_hardware()
# Set the aggregator
def set_aggregator(self,agg:ScalarAggregator):
self.__aggregator = agg

class MagnetArray(list[Magnet]):
"""
Class that implements access to a magnet array
"""

def __init__(self,arrayName:str,iterable):
def __init__(self,arrayName:str,magnets:list[Magnet],holder = None):
"""
Construct a magnet array

Parameters
----------
iterable
arrayName : str
Array name
magnets: list[Magnet]
Magnet iterator
holder : Element holder
Holder (Simulator or Control System) that contains element of this array used for aggregator
"""
super().__init__(i for i in iterable)
super().__init__(i for i in magnets)
self.__name = arrayName
self.__rwstrengths = RWMagnetStrength(arrayName,iterable)
self.__rwhardwares = RWMagnetHardware(arrayName,iterable)

def set_aggregator(self,agg:DeviceAccessList):
"""
Set an aggregator for this array.
Aggregator allow fast control system access by parallelizing
call to underlying hardware.

Parameters
----------
agg : DeviceAccessList
List of device access
"""
self.__rwstrengths.set_aggregator(agg)
self.__rwhardwares.set_aggregator(agg)
self.__rwstrengths = RWMagnetStrength(arrayName,magnets)
self.__rwhardwares = RWMagnetHardware(arrayName,magnets)

if holder is not None:
aggs = holder.create_magnet_strength_aggregator(magnets)
aggh = holder.create_magnet_harddware_aggregator(magnets)
self.__rwstrengths.set_aggregator(aggs)
self.__rwhardwares.set_aggregator(aggh)

@property
def strengths(self) -> RWMagnetStrength:
Expand Down
2 changes: 1 addition & 1 deletion pyaml/bpm/bpm.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pyaml.lattice.element import Element, ElementConfigModel
from pyaml.lattice.abstract_impl import RBpmArray, RWBpmOffsetArray, RWBpmTiltScalar
from ..control.deviceaccess import DeviceAccess
from ..control import abstract
from ..common import abstract
from typing import Self
from pyaml.bpm.bpm_model import BPMModel

Expand Down
Loading
Loading