Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions pyaml/arrays/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,47 @@
"""

import numpy as np
from pydantic import BaseModel
from pydantic import BaseModel,ConfigDict
from ..lattice.element_holder import ElementHolder
from ..control.deviceaccesslist import DeviceAccessList

class ArrayModel(BaseModel):
class ArrayConfigModel(BaseModel):

model_config = ConfigDict(arbitrary_types_allowed=True,extra="forbid")

name: str
"""Family name"""
elements: list[str]
"""List of pyaml element names"""

class Array(object):
aggregator: DeviceAccessList | None = None
"""
Class that implements access to arrays (families)
Aggregator object. If none specified, writings and readings are serialized.
If no device list is specified, it is dynamically constructed.
"""

def __init__(self, cfg: ArrayModel):
class ArrayConfig(object):
"""
Class that implements configuration for access to arrays (families)
"""
def __init__(self, cfg: ArrayConfigModel):
self._cfg = cfg

def fill_array(self,holder:ElementHolder):
raise "Array.fill_array() is not subclassed"




def init_aggregator(self,holder:ElementHolder):
raise "Array.init_aggregator() is not subclassed"

class MagnetArrayConfig(ArrayConfig):

def __init__(self, cfg: ArrayConfigModel):
super().__init__(cfg)

def init_aggregator(self,holder:ElementHolder):
if self._cfg.aggregator is not None and len(self._cfg.aggregator)==0:
# Construct dynamically aggregator for magnets
mag = holder.get_magnets(self._cfg.name)
for m in mag:
devs = m.model.get_devices()
self._cfg.aggregator.add_devices(devs)
mag.set_aggregator(self._cfg.aggregator)
13 changes: 4 additions & 9 deletions pyaml/arrays/hcorrector.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from .array import ArrayModel
from .array import Array
from .array import ArrayConfigModel
from .array import MagnetArrayConfig
from ..lattice.element_holder import ElementHolder,MagnetType

# Define the main class name for this module
PYAMLCLASS = "HCorrector"

class ConfigModel(ArrayModel):...
class ConfigModel(ArrayConfigModel):...

class HCorrector(Array):
"""
Class that implements access to arrays (families)
"""
def __init__(self, cfg: ArrayModel):
super().__init__(cfg)
class HCorrector(MagnetArrayConfig):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(MagnetType.HCORRECTOR,self._cfg.name,self._cfg.elements)
Expand Down
114 changes: 101 additions & 13 deletions pyaml/arrays/magnet_array.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,53 @@
from ..control.abstract import ReadWriteFloatArray
from ..magnet.magnet import Magnet
import numpy as np
from ..control.deviceaccesslist import DeviceAccessList

class RWMagnetStrength(ReadWriteFloatArray):

def __init__(self, magnets:list[Magnet]):
def __init__(self, name:str, magnets:list[Magnet]):
self.__magnets = magnets
self.__name = name
self.aggregator:DeviceAccessList = None

# Gets the values
def get(self) -> np.array:
return np.array([m.strength.get() for m in self.__magnets])
if not self.aggregator:
return np.array([m.strength.get() for m in self.__magnets])
else:
allHardwareValues = self.aggregator.get() # Read all hardware setpoints
allStrength = np.zeros(len(self.__magnets))
mIdx = 0
idx = 0
for m in self.__magnets:
nbDev = len(m.model.get_devices())
allStrength[idx] = m.model.compute_strengths(allHardwareValues[idx:idx+nbDev])[m.strength.index()]
mIdx += 1
idx += nbDev
return allStrength

# Sets the values
def set(self, value:np.array):
for idx,m in enumerate(self.__magnets):
m.strength.set(value[idx])
if not self.aggregator:
for idx,m in enumerate(self.__magnets):
m.strength.set(value[idx])
else:
# TODO: if the array does not contains mappings to combined function
# magnets, the algorithm below can be optimized
allHardwareValues = self.aggregator.get() # Read all hardware setpoints
newHardwareValues = np.zeros(len(self.aggregator))
mIdx = 0
idx = 0
for m in self.__magnets:
# m is a single function magnet or a mapping to a
# combined function magnet (RWMapper)
nbDev = len(m.model.get_devices())
mStrengths = m.model.compute_strengths( allHardwareValues[idx:idx+nbDev] )
mStrengths[m.strength.index()] = value[mIdx]
newHardwareValues[idx:idx+nbDev] = m.model.compute_hardware_values(mStrengths)
mIdx += 1
idx += nbDev
self.aggregator.set(newHardwareValues)

# Sets the values and waits that the read values reach their setpoint
def set_and_wait(self, value:np.array):
Expand All @@ -24,19 +57,38 @@ def set_and_wait(self, value:np.array):
def unit(self) -> list[str]:
return [m.strength.unit() for m in self.__magnets]

# Set the aggregator (Control system only)
def set_aggregator(self,agg:DeviceAccessList):
self.aggregator = agg

class RWMagnetHardware(ReadWriteFloatArray):

def __init__(self, magnets:list[Magnet]):
def __init__(self, name:str, magnets:list[Magnet]):
self.__name = name
self.__magnets = magnets
self.aggregator:DeviceAccessList = None
self.hasHardwareMapping = True

# Gets the values
def get(self) -> np.array:
return np.array([m.hardware.get() for m in self.__magnets])
if not self.aggregator:
return np.array([m.hardware.get() for m in self.__magnets])
else:
if not self.hasHardwareMapping:
raise Exception(f"Array {self.__name} contains elements that that do not support hardware units")
else:
return self.aggregator.get()

# Sets the values
def set(self, value:np.array):
for idx,m in enumerate(self.__magnets):
m.hardware.set(value[idx])
if not self.aggregator:
for idx,m in enumerate(self.__magnets):
m.hardware.set(value[idx])
else:
if not self.hasHardwareMapping:
raise Exception(f"Array {self.__name} contains elements that that do not support hardware units")
else:
self.aggregator.set(value)

# Sets the values and waits that the read values reach their setpoint
def set_and_wait(self, value:np.array):
Expand All @@ -46,21 +98,57 @@ def set_and_wait(self, value:np.array):
def unit(self) -> list[str]:
return [m.hardware.unit() for m in self.__magnets]

# Set the aggregator (Control system only)
def set_aggregator(self,agg:DeviceAccessList):
self.aggregator = agg
for m in self.__magnets:
self.hasHardwareMapping |= m.model.hasHardwareMapping()

class MagnetArray(list[Magnet]):
"""
Class that implements access to a magnet array
"""
def __init__(self,iterable):
super().__init__(i for i in iterable)
self.__rwstrengths = RWMagnetStrength(iterable)
self.__rwhardwares = RWMagnetHardware(iterable)

@property
def __init__(self,arrayName:str,iterable):
"""
Construct a magnet array

Parameters
----------
iterable
Magnet iterator
"""
super().__init__(i for i in iterable)
self.__name = arrayName
self.__rwstrengths = RWMagnetStrength(arrayName,iterable)
self.__rwhardwares = RWMagnetHardware(arrayName,iterable)

def set_aggregator(self,agg:DeviceAccessList):
"""
Set an aggregator for this array.
Aggregator allow fast control system access by parallelizing
call to underlying hardware.

Parameters
----------
agg : DeviceAccessList
List of device access
"""
self.__rwstrengths.set_aggregator(agg)
self.__rwhardwares.set_aggregator(agg)

@property
def strengths(self) -> RWMagnetStrength:
"""
Give access to strength of each magnet of this array
"""
return self.__rwstrengths

@property
def hardwares(self) -> RWMagnetHardware:
"""
Give access to hardware value of each magnet of this array
"""
return self.__rwhardwares


Expand Down
13 changes: 4 additions & 9 deletions pyaml/arrays/octupole.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from .array import ArrayModel
from .array import Array
from .array import ArrayConfigModel
from .array import MagnetArrayConfig
from ..lattice.element_holder import ElementHolder,MagnetType

# Define the main class name for this module
PYAMLCLASS = "Octupole"

class ConfigModel(ArrayModel):...
class ConfigModel(ArrayConfigModel):...

class Octupole(Array):
"""
Class that implements access to arrays (families)
"""
def __init__(self, cfg: ArrayModel):
super().__init__(cfg)
class Octupole(MagnetArrayConfig):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(MagnetType.OCTUPOLE,self._cfg.name,self._cfg.elements)
Expand Down
16 changes: 5 additions & 11 deletions pyaml/arrays/quadrupole.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
from .array import ArrayModel
from .array import Array
from .array import ArrayConfigModel
from .array import MagnetArrayConfig
from ..lattice.element_holder import ElementHolder,MagnetType

# Define the main class name for this module
PYAMLCLASS = "Quadrupole"

class ConfigModel(ArrayModel):...

class Quadrupole(Array):
"""
Class that implements access to arrays (families)
"""
def __init__(self, cfg: ArrayModel):
super().__init__(cfg)
class ConfigModel(ArrayConfigModel):...

class Quadrupole(MagnetArrayConfig):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(MagnetType.QUADRUPOLE,self._cfg.name,self._cfg.elements)

13 changes: 4 additions & 9 deletions pyaml/arrays/sextupole.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from .array import ArrayModel
from .array import Array
from .array import ArrayConfigModel
from .array import MagnetArrayConfig
from ..lattice.element_holder import ElementHolder,MagnetType

# Define the main class name for this module
PYAMLCLASS = "Sextupole"

class ConfigModel(ArrayModel):...
class ConfigModel(ArrayConfigModel):...

class Sextupole(Array):
"""
Class that implements access to arrays (families)
"""
def __init__(self, cfg: ArrayModel):
super().__init__(cfg)
class Sextupole(MagnetArrayConfig):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(MagnetType.SEXTUPOLE,self._cfg.name,self._cfg.elements)
Expand Down
13 changes: 4 additions & 9 deletions pyaml/arrays/skewoctu.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from .array import ArrayModel
from .array import Array
from .array import ArrayConfigModel
from .array import MagnetArrayConfig
from ..lattice.element_holder import ElementHolder,MagnetType

# Define the main class name for this module
PYAMLCLASS = "SkewOctu"

class ConfigModel(ArrayModel):...
class ConfigModel(ArrayConfigModel):...

class SkewOctu(Array):
"""
Class that implements access to arrays (families)
"""
def __init__(self, cfg: ArrayModel):
super().__init__(cfg)
class SkewOctu(MagnetArrayConfig):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(MagnetType.SKEWOCTU,self._cfg.name,self._cfg.elements)
Expand Down
13 changes: 4 additions & 9 deletions pyaml/arrays/skewquad.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from .array import ArrayModel
from .array import Array
from .array import ArrayConfigModel
from .array import MagnetArrayConfig
from ..lattice.element_holder import ElementHolder,MagnetType

# Define the main class name for this module
PYAMLCLASS = "SkewQuad"

class ConfigModel(ArrayModel):...
class ConfigModel(ArrayConfigModel):...

class SkewQuad(Array):
"""
Class that implements access to arrays (families)
"""
def __init__(self, cfg: ArrayModel):
super().__init__(cfg)
class SkewQuad(MagnetArrayConfig):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(MagnetType.SKEWQUAD,self._cfg.name,self._cfg.elements)
Expand Down
13 changes: 4 additions & 9 deletions pyaml/arrays/skewsext.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from .array import ArrayModel
from .array import Array
from .array import ArrayConfigModel
from .array import MagnetArrayConfig
from ..lattice.element_holder import ElementHolder,MagnetType

# Define the main class name for this module
PYAMLCLASS = "SkewSext"

class ConfigModel(ArrayModel):...
class ConfigModel(ArrayConfigModel):...

class SkewSext(Array):
"""
Class that implements access to arrays (families)
"""
def __init__(self, cfg: ArrayModel):
super().__init__(cfg)
class SkewSext(MagnetArrayConfig):

def fill_array(self,holder:ElementHolder):
holder.fill_magnet_array(MagnetType.SKEWSEXT,self._cfg.name,self._cfg.elements)
Expand Down
Loading
Loading