diff --git a/src/cditools/eiger_async.py b/src/cditools/eiger_async.py index f8b38b0..e1989cd 100644 --- a/src/cditools/eiger_async.py +++ b/src/cditools/eiger_async.py @@ -3,103 +3,54 @@ """ from __future__ import annotations - -# import asyncio -# from collections.abc import AsyncGenerator, AsyncIterator, Iterator, Sequence +import asyncio +import functools +import os +from collections.abc import AsyncGenerator, AsyncIterator, Sequence +from urllib.parse import urlunparse +from pathlib import Path from logging import getLogger -# from pathlib import Path from typing import Annotated as A -# from typing import Any, cast -# from urllib.parse import urlunparse - -import numpy as np # type: ignore[import-not-found] -# from bluesky.protocols import StreamAsset -# from event_model import ( # type: ignore[import-untyped] -# ComposeStreamResource, -# ComposeStreamResourceBundle, -# DataKey, # type: ignore[import-untyped] -# StreamDatum, -# StreamRange, -# StreamResource, -# ) +import numpy as np + +from ophyd_async.epics.adcore import ( + ADBaseIO, + NDFileIO, + ADImageMode, + AreaDetector, + NDPluginBaseIO, + trigger_info_from_num_images, +) +from ophyd_async.epics.core import PvSuffix, stop_busy_record from ophyd_async.core import ( - # DetectorTrigger, - # PathInfo, - # PathProvider, - # SignalDatatypeT, SignalR, SignalRW, StrictEnum, SubsetEnum, - # TriggerInfo, - # observe_value, + AsyncStatus, + DetectorArmLogic, + DetectorDataLogic, + DetectorTriggerLogic, + PathInfo, + PathProvider, + SignalDatatypeT, + StreamResourceDataProvider, + StreamResourceInfo, + TriggerInfo, + observe_value, + set_and_wait_for_other_value, ) -from ophyd_async.epics.adcore import ( - #ADBaseController, - #ADBaseDatasetDescriber, - ADBaseIO, - #ADImageMode, - #ADWriter, - # AreaDetector, - NDFileIO, - # NDPluginBaseIO, +from ophyd_async.core._data_providers import StreamableDataProvider +from ophyd_async.core._status import WatchableAsyncStatus +from ophyd_async.core._utils import ( + DEFAULT_TIMEOUT, + WatcherUpdate, + error_if_none, ) -from ophyd_async.epics.signal import PvSuffix logger = getLogger(__name__) -# class EigerDocumentComposer: -# def __init__( -# self, -# full_file_name: Path, -# datasets: list[Any], -# last_emitted_index: int = 0, -# hostname: str = "localhost", -# ) -> None: -# self._last_emitted = last_emitted_index -# self._hostname = hostname -# uri = urlunparse( -# ( -# "file", -# self._hostname, -# str(full_file_name.absolute()), -# "", -# "", -# None, -# ) -# ) -# bundler_composer = ComposeStreamResource() -# self._bundles: list[ComposeStreamResourceBundle] = [ -# bundler_composer( -# mimetype="application/x-hdf5", -# uri=uri, -# data_key=ds.data_key, -# parameters={ -# "dataset": ds.dataset, -# "chunk_shape": ds.chunk_shape, -# }, -# uid=None, -# validate=True, -# ) -# for ds in datasets -# ] - -# def stream_resources(self) -> Iterator[StreamResource]: -# for bundle in self._bundles: -# yield bundle.stream_resource_doc - -# def stream_data(self, indices_written: int) -> Iterator[StreamDatum]: -# if indices_written > self._last_emitted: -# indices: StreamRange = { -# "start": self._last_emitted, -# "stop": indices_written, -# } -# self._last_emitted = indices_written -# for bundle in self._bundles: -# yield bundle.compose_stream_datum(indices) - - # TODO - add extra options in eiger2 and revert to StrictEnum class EigerTriggerMode(SubsetEnum): """Trigger modes for the Eiger detector. @@ -172,6 +123,8 @@ class EigerStreamVersion(StrictEnum): See https://areadetector.github.io/areaDetector/ADEiger/eiger.html#stream-interface """ + # TODO - Stream or Stream1? + # STREAM1 = "Stream" STREAM1 = "Stream1" STREAM2 = "Stream2" @@ -309,8 +262,12 @@ class Eiger2DriverIO(EigerDriverIO): hv_state: A[SignalR[str], PvSuffix("HVState_RBV")] # Acquisition Setup + # TODO - ThresholdEnergy or Threshold? + # threshold: A[SignalRW[float], PvSuffix.rbv("ThresholdEnergy")] threshold: A[SignalRW[float], PvSuffix.rbv("Threshold")] threshold1_enable: A[SignalRW[bool], PvSuffix.rbv("Threshold1Enable")] + # TODO - Threshold2Energy or Threshold? + # threshold2: A[SignalRW[float], PvSuffix.rbv("Threshold2Energy")] threshold2: A[SignalRW[float], PvSuffix.rbv("Threshold2")] threshold2_enable: A[SignalRW[bool], PvSuffix.rbv("Threshold2Enable")] threshold_diff_enable: A[SignalRW[bool], PvSuffix.rbv("ThresholdDiffEnable")] @@ -325,344 +282,356 @@ class Eiger2DriverIO(EigerDriverIO): # Stream Interface stream_version: A[SignalRW[EigerStreamVersion], PvSuffix.rbv("StreamVersion")] + # TODO - which one? + # stream_hdr_appendix: None stream_hdr_appendix: A[SignalRW[str], PvSuffix.rbv("StreamHdrAppendix")] + # stream_img_appendix: None stream_img_appendix: A[SignalRW[str], PvSuffix.rbv("StreamImgAppendix")] # FileWriter Interface fw_hdf5_format: A[SignalRW[EigerHDF5Format], PvSuffix.rbv("FWHDF5Format")] -# class EigerWriter(ADWriter[EigerDriverIO]): # type: ignore[reportInvalidTypeArguments] -# """Eiger-specific file writer using the built-in FileWriter interface.""" - -# default_suffix: str = "cam1:" -# # Forced minimum number of images per file to force a single HDF5 file -# _min_num_images_per_file: int = 1_000_000_000 - -# def __init__( -# self, -# fileio: EigerDriverIO, -# path_provider: PathProvider, -# dataset_describer: ADBaseDatasetDescriber, -# plugins: dict[str, NDPluginBaseIO] | None = None, -# ): -# super().__init__( -# fileio, -# path_provider, -# dataset_describer, -# file_extension=".h5", -# mimetype="application/x-hdf5", -# plugins=plugins, -# ) - -# self._file_info: PathInfo | None = None -# self._datasets: list[Any] = [] -# self._master_file_path_cache: list[Path] = [] - -# async def open(self, name: str, exposures_per_event: int = 1) -> dict[str, DataKey]: -# """Setup file writing for acquisition.""" -# # Get file path info from path provider -# self._file_info = self._path_provider() -# self._master_file_path_cache.clear() - -# # Cache for use later -# self._exposures_per_event = exposures_per_event - -# # Set the name pattern with $id replacement similar to original -# name_pattern = f"{self._file_info.filename}_$id" - -# # Configure the Eiger FileWriter -# await asyncio.gather( -# self.fileio.file_path.set(self._file_info.directory_path.as_posix()), -# self.fileio.create_directory.set(self._file_info.create_dir_depth), -# self.fileio.fw_name_pattern.set(name_pattern), -# self.fileio.fw_enable.set(True), -# self.fileio.save_files.set(True), -# self.fileio.data_source.set(EigerDataSource.FILE_WRITER), -# self.fileio.num_capture.set(0), -# # Use array_counter to track the total number of images written -# self.fileio.array_counter.set(0), -# ) - -# if not await self.fileio.file_path_exists.get_value(): -# msg = f"File path {self._file_info.directory_path} does not exist" -# raise FileNotFoundError(msg) - -# if isinstance(self.fileio, Eiger2DriverIO): -# await self.fileio.fw_hdf5_format.set(EigerHDF5Format.LEGACY) - -# # Force the number of images per file to a large number to simplify the logic -# num_images_per_file = await self.fileio.fw_nimgs_per_file.get_value() -# if num_images_per_file < self._min_num_images_per_file: -# await self.fileio.fw_nimgs_per_file.set(self._min_num_images_per_file) -# logger.warning( -# "Setting fw_nimgs_per_file to %d to force writing to a single HDF5 file", -# self._min_num_images_per_file, -# ) - -# detector_shape = await self._dataset_describer.shape() - -# # TODO: Add these when empty shape datasets are supported by tiled -# # Add the master file datasets -# master_datasets = [] -# # master_datasets = [ -# # HDFDatasetDescription2( -# # data_key=f"{name}_y_pixel_size", -# # dataset="entry/instrument/detector/y_pixel_size", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_x_pixel_size", -# # dataset="entry/instrument/detector/x_pixel_size", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_detector_distance", -# # dataset="entry/instrument/detector/detector_distance", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_incident_wavelength", -# # dataset="entry/instrument/detector/incident_wavelength", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_frame_time", -# # dataset="entry/instrument/detector/frame_time", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_beam_center_x", -# # dataset="entry/instrument/detector/beam_center_x", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_beam_center_y", -# # dataset="entry/instrument/detector/beam_center_y", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_count_time", -# # dataset="entry/instrument/detector/count_time", -# # shape=(), -# # dtype_numpy=np.dtype(np.float32).str, -# # chunk_shape=(), -# # join_method="stack", -# # ), -# # HDFDatasetDescription2( -# # data_key=f"{name}_pixel_mask", -# # dataset="entry/instrument/detector/detectorSpecific/pixel_mask", -# # shape=detector_shape, -# # dtype_numpy=np.dtype(np.uint32).str, -# # chunk_shape=detector_shape, -# # join_method="stack", -# # ), -# # ] - -# if any(s is None for s in detector_shape): -# chunk_shape = (1,) -# else: -# chunk_shape = cast(tuple[int, ...], (1, *detector_shape)) -# # frame_datasets = [ -# # HDFDatasetDescription( -# # data_key=f"{name}_image", -# # dataset=f"entry/data/data_{1:06d}", -# # shape=(exposures_per_event, *detector_shape), -# # # Always write as uint32 -# # dtype_numpy=np.dtype(np.uint32).str, -# # chunk_shape=chunk_shape, -# # ) -# # ] - -# # Cache descriptions for later use -# self._datasets = master_datasets + frame_datasets - -# return { -# ds.data_key: DataKey( -# source="ADEiger FileWriter", -# shape=list(ds.shape), -# dtype="array" -# if exposures_per_event > 1 or len(ds.shape) > 1 -# else "number", -# dtype_numpy=ds.dtype_numpy, -# external="STREAM:", -# ) -# for ds in self._datasets -# } - -# @property -# async def _master_file_path(self) -> Path | None: -# if self._file_info is None: -# logger.warning( -# "No master file path found for file info %s", -# self._file_info, -# ) -# return None -# sequence_id = await self.fileio.sequence_id.get_value() -# return Path( -# self._file_info.directory_path -# / f"{self._file_info.filename}_{sequence_id}_master.h5" -# ) - -# async def collect_stream_docs( -# self, name: str, indices_written: int -# ) -> AsyncIterator[StreamAsset]: -# """Generate stream documents for the written HDF5 files.""" -# if indices_written: -# master_file_path = await self._master_file_path -# if master_file_path is None: -# msg = f"Master file path is not set for {name}: {self._file_info}" -# raise ValueError(msg) - -# # Eiger generates a new master file for each trigger -# # so we need to create a new composer with a new -# # master file path -# composer = EigerDocumentComposer( -# master_file_path, -# self._datasets, -# last_emitted_index=indices_written - 1, -# ) - -# # For later validation -# self._master_file_path_cache.append(master_file_path) - -# for doc in composer.stream_resources(): -# yield "stream_resource", doc - -# for doc in composer.stream_data(indices_written): -# yield "stream_datum", doc - -# async def observe_indices_written( -# self, timeout: float -# ) -> AsyncGenerator[int, None]: -# async for num_captured in observe_value(self.fileio.array_counter, timeout): -# yield num_captured // self._exposures_per_event - -# async def get_indices_written(self) -> int: -# return await self.fileio.array_counter.get_value() // self._exposures_per_event - -# async def close(self) -> None: -# """Clean up file writing after acquisition and validate files exist.""" - -# # Check that the master files were written -# for master_file_path in self._master_file_path_cache: -# if not master_file_path.exists(): -# logger.warning("Master file was not written: %s", master_file_path) - -# self._file_info = None - - -# class EigerController(ADBaseController[EigerDriverIO]): -# """Controller for Eiger detector, handling trigger modes and acquisition setup.""" - -# def __init__( -# self, driver: EigerDriverIO, *args: Any, **kwargs: dict[str, Any] -# ) -> None: -# super().__init__(driver, *args, **kwargs) - -# def get_deadtime(self, exposure: float | None) -> float: -# """Get detector deadtime for the given exposure.""" -# default_deadtime = 0.000001 -# if exposure is not None: -# logger.warning( -# "Ignoring exposure to calculate deadtime: %s, defaulting to %s", -# exposure, -# default_deadtime, -# ) -# return default_deadtime - -# async def prepare(self, trigger_info: TriggerInfo) -> None: -# """Prepare the detector for acquisition.""" -# if (exposure := trigger_info.livetime) is not None: -# await self.driver.acquire_time.set(exposure) - -# # Configure trigger mode based on TriggerInfo -# if trigger_info.trigger == DetectorTrigger.INTERNAL: -# await self.driver.trigger_mode.set(EigerTriggerMode.INTERNAL_SERIES) -# elif trigger_info.trigger == DetectorTrigger.EDGE_TRIGGER: -# await self.driver.trigger_mode.set(EigerTriggerMode.EXTERNAL_SERIES) -# else: -# msg = f"Trigger mode {trigger_info.trigger} not supported" -# raise NotImplementedError(msg) - -# if trigger_info.total_number_of_exposures == 0: -# image_mode = ADImageMode.CONTINUOUS -# else: -# image_mode = ADImageMode.MULTIPLE - -# if isinstance(trigger_info.number_of_events, list): -# logger.warning( -# "Got a list for number of events, expected to be set up externally: %s", -# trigger_info.number_of_events, -# ) -# else: -# await self.driver.num_triggers.set(trigger_info.number_of_events) - -# await asyncio.gather( -# self.driver.num_images.set(trigger_info.exposures_per_event), -# self.driver.image_mode.set(image_mode), -# ) - - -# class EigerDetector(AreaDetector[EigerController]): -# """Eiger detector implementation using the AreaDetector pattern.""" - -# def __init__( -# self, -# prefix: str, -# path_provider: PathProvider, -# driver_suffix: str = "cam1:", -# writer_cls: type[ADWriter] = EigerWriter, # type: ignore[reportUnknownParameterType] -# fileio_suffix: str | None = None, -# name: str = "", -# config_sigs: Sequence[SignalR[SignalDatatypeT]] = (), -# plugins: dict[str, NDPluginBaseIO] | None = None, -# ): -# driver = EigerDriverIO(prefix + driver_suffix) -# controller = EigerController(driver) -# if issubclass(writer_cls, EigerWriter): -# dataset_describer = ADBaseDatasetDescriber(driver) -# # EigerWriter takes the driver as the fileio, since it relies on driver PVs -# writer = writer_cls( -# driver, -# path_provider, -# dataset_describer=dataset_describer, -# plugins=plugins, -# ) -# else: -# writer = writer_cls.with_io( -# prefix, -# path_provider, -# dataset_source=driver, -# fileio_suffix=fileio_suffix, -# plugins=plugins, -# ) - -# super().__init__( -# controller=controller, -# writer=writer, -# plugins=plugins, -# name=name, -# config_sigs=config_sigs, -# ) +class EigerController(DetectorTriggerLogic): + """Controller for Eiger detector, handling trigger modes and acquisition setup.""" + + def __init__(self, driver: EigerDriverIO) -> None: + self.driver = driver + + def get_deadtime(self, exposure: float | None) -> float: + """Get detector deadtime for the given exposure.""" + default_deadtime = 0.000001 + if exposure is not None: + logger.warning( + "Ignoring exposure to calculate deadtime: %s, defaulting to %s", + exposure, + default_deadtime, + ) + return default_deadtime + + async def prepare_internal(self, num: int, livetime: float, deadtime: float): + """Prepare the detector for acquisition. + + On Internal Series, num sets the number of images to take per trigger: + https://areadetector.github.io/areaDetector/ADEiger/eiger.html#implementation-of-standard-driver-parameters + """ + # TODO - should we do something with deadtime? + # TODO - put other awaits into the gather + + if livetime > 0: + await self.driver.acquire_time.set(livetime) + + await self.driver.trigger_mode.set(EigerTriggerMode.INTERNAL_SERIES) + + if num == 0: + image_mode = ADImageMode.CONTINUOUS + else: + image_mode = ADImageMode.MULTIPLE + + await asyncio.gather( + self.driver.num_triggers.set(num), + self.driver.image_mode.set(image_mode), + ) + + # TODO should num_triggers or num_images be set? + # TODO - put other awaits into the gather + async def prepare_edge(self, num: int, livetime: float): + """Prepare the detector to take external edge triggered exposures. + + :param num: the number of exposures to take + :param livetime: how long the exposure should be, 0 means what is currently set + """ + + await self.driver.acquire_time.set(livetime) + await self.driver.num_triggers.set(num) + if num == 0: + image_mode = ADImageMode.CONTINUOUS + else: + image_mode = ADImageMode.MULTIPLE + + await self.driver.trigger_mode.set(EigerTriggerMode.EXTERNAL_SERIES) + await asyncio.gather( + self.driver.image_mode.set(image_mode), + ) + + async def default_trigger_info(self): + return await trigger_info_from_num_images(self.driver) + + +class EigerDataLogic(DetectorDataLogic): + """Eiger-specific file writer using the built-in FileWriter interface.""" + + default_suffix: str = "cam1:" + # Forced minimum number of images per file to force a single HDF5 file + _min_num_images_per_file: int = 1_000_000_000 + + def __init__( + self, + fileio: EigerDriverIO, + path_provider: PathProvider, + ): + self.fileio = fileio + self._path_provider = path_provider + + self._file_info: PathInfo | None = None + self._datasets: list[StreamResourceDataProvider] = [] + self._master_file_path_cache: list[Path] = [] + + async def prepare_unbounded(self, datakey_name: str) -> StreamableDataProvider: + """Provider can work for an unbounded number of collections.""" + # Get file path info from path provider + # TODO: should probably just pass datakey_name + self._file_info = self._path_provider("eiger2-1") + self._master_file_path_cache.clear() + + # Set the name pattern with $id replacement similar to original + name_pattern = f"{self._file_info.filename}_$id" + + # Configure the Eiger FileWriter + await asyncio.gather( + self.fileio.file_path.set(self._file_info.directory_path.as_posix()), + self.fileio.create_directory.set(self._file_info.create_dir_depth), + self.fileio.fw_name_pattern.set(name_pattern), + self.fileio.fw_enable.set(True), + self.fileio.save_files.set(True), + self.fileio.num_capture.set(0), + # Use array_counter to track the total number of images written + self.fileio.array_counter.set(0), + self.fileio.manual_trigger.set(True), + # TODO sort out how to get this from the plan + self.fileio.num_triggers.set(5000), + ) + + await set_and_wait_for_other_value( + set_signal=self.fileio.acquire, + set_value=True, + match_signal=self.fileio.armed, + match_value=True, + wait_for_set_completion=False, + timeout=DEFAULT_TIMEOUT, + ) + + if not await self.fileio.file_path_exists.get_value(): + msg = f"File path {self._file_info.directory_path} does not exist" + raise FileNotFoundError(msg) + + if isinstance(self.fileio, Eiger2DriverIO): + await self.fileio.fw_hdf5_format.set(EigerHDF5Format.LEGACY) + + # Force the number of images per file to a large number to simplify the logic + # TODO: allow multiple files + num_images_per_file = await self.fileio.fw_nimgs_per_file.get_value() + if num_images_per_file < self._min_num_images_per_file: + await self.fileio.fw_nimgs_per_file.set(self._min_num_images_per_file) + logger.warning( + "Setting fw_nimgs_per_file to %d to force writing to a single HDF5 file", + self._min_num_images_per_file, + ) + driver = self.fileio + + shape = await asyncio.gather( + *[sig.get_value() for sig in [driver.array_size_y, driver.array_size_x]] + ) + datatype = "uint32" + # Remove entries in shape that are zero + shape = [x for x in shape if x > 0] + + mfp = await self._master_file_path + # TODO sort out how to get from parent + name = "eiger" + exposures_per_event = await self.fileio.num_images.get_value() + + # TODO sort out how to tell tiled about the additional data files. + return StreamResourceDataProvider( + uri=urlunparse(("file", "localhost", str(mfp), "", "", None)), + resources=[ + StreamResourceInfo( + data_key=f"{name}_image", + shape=(exposures_per_event, *shape), + # TODO sort out how to set this and mirror here + chunk_shape=(1, *shape), + dtype_numpy=np.dtype(datatype.lower()).str, + parameters={ + "dataset": f"entry/data/data_{1:06d}", + }, + # TODO put in better value + source="EIGER2_FILE_WRITER", + ) + ], + mimetype="application/x-hdf5", + collections_written_signal=self.fileio.array_counter, + ) + + @property + async def _master_file_path(self) -> Path | None: + if self._file_info is None: + logger.warning( + "No master file path found for file info %s", + self._file_info, + ) + return None + sequence_id = await self.fileio.sequence_id.get_value() + return Path( + self._file_info.directory_path + / f"{self._file_info.filename}_{sequence_id}_master.h5" + ) + + async def observe_indices_written( + self, timeout: float + ) -> AsyncGenerator[int, None]: + async for num_captured in observe_value(self.fileio.array_counter, timeout): + yield num_captured + + async def get_indices_written(self) -> int: + return await self.fileio.array_counter.get_value() + + async def stop(self) -> None: + """Clean up file writing after acquisition and validate files exist.""" + + # Check that the master files were written + # for master_file_path in self._master_file_path_cache: + # if not master_file_path.exists(): + # ... + + self._file_info = None + await self.fileio.fw_enable.set(False) + + +# TODO sort out if ths is the right name of things +class EigerArmLogic(DetectorArmLogic): + def __init__( + self, driver: Eiger2DriverIO, driver_armed_signal: SignalR[bool] | None = None + ): + self.driver = driver + # TODO - remove? driver_armed_signal doesn't seem to be a thing anywhere else + if driver_armed_signal is not None: + self.driver_armed_signal = driver_armed_signal + else: + self.driver_armed_signal = driver.acquire + self.acquire_status: AsyncStatus | None = None + self._rolling_image_counter = 0 + + async def arm(self): + self._rolling_image_counter = await self.driver.num_images_counter.get_value() + ret = await self.driver.trigger.set(1) + return ret + + async def wait_for_idle(self): + target_num_images, frame_acquire_period = await asyncio.gather(self.driver.num_images.get_value(), + self.driver.acquire_period.get_value()) + frame_timeout = frame_acquire_period + DEFAULT_TIMEOUT + done_timeout = frame_timeout * target_num_images + target_num_images += self._rolling_image_counter + async for images_complete in observe_value(self.driver.num_images_counter, timeout=frame_timeout, done_timeout=done_timeout): + if images_complete == target_num_images: + break + + async def disarm(self): + self._rolling_image_counter = 0 + await stop_busy_record(self.driver.acquire) + + await asyncio.gather( + self.driver.manual_trigger.set(False), + self.driver.num_triggers.set(1), + ) + + +class EigerDetector(AreaDetector[Eiger2DriverIO]): + """Eiger detector implementation using the AreaDetector pattern.""" + + def __init__( + self, + prefix: str, + path_provider: PathProvider, + driver_suffix: str = "cam1:", + name: str = "", + config_sigs: Sequence[SignalR[SignalDatatypeT]] = (), + plugins: dict[str, NDPluginBaseIO] | None = None, + ): + driver = Eiger2DriverIO(prefix + driver_suffix) + controller = EigerController(driver) + arm_logic = EigerArmLogic(driver) + super().__init__( + prefix=prefix, + driver=driver, + trigger_logic=controller, + writer_type=None, + name=name, + config_sigs=config_sigs, + plugins=plugins, + arm_logic=arm_logic, + ) + self.data_logic = EigerDataLogic(fileio=driver, path_provider=path_provider) + self.add_detector_logics(self.data_logic) + + # TODO remove this as it should be identical to upstream. + @WatchableAsyncStatus.wrap + async def trigger(self) -> AsyncIterator[WatcherUpdate[int]]: + """Trigger a single exposure. + + If [`prepare()`](#StandardDetector.prepare) has not been called since + the last [`stage()`](#StandardDetector.stage), an implicit prepare is + performed. When [](#OPHYD_ASYNC_PRESERVE_DETECTOR_STATE) is `YES` + [](#DetectorTriggerLogic.default_trigger_info) is called to read current + hardware state; otherwise a bare [`TriggerInfo()`](#TriggerInfo) is + used. + """ + if self._prepare_ctx is None: + # Opt-in: set OPHYD_ASYNC_PRESERVE_DETECTOR_STATE=YES to have + # trigger() read back current hardware state (e.g. num_images) via + # default_trigger_info() instead of always falling back to TriggerInfo(). + # See ADR 0013 for rationale. + # TODO: flip default to YES and remove this guard in a future PR once + # downstream code has had time to implement default_trigger_info(). + preserve_state = ( + os.environ.get("OPHYD_ASYNC_PRESERVE_DETECTOR_STATE", "NO").upper() + == "YES" + ) + if preserve_state and self._trigger_logic is not None: + + def _logic_supported(base_class, method) -> bool: + # If the function that is bound in a subclass is the same as the function + # attached to the superclass, then the subclass has not overridden it, so + # this method is not supported by the subclass. + return method.__func__ is not getattr(base_class, method.__name__) + + _trigger_logic_supported = functools.partial( + _logic_supported, DetectorTriggerLogic + ) + if not _trigger_logic_supported( + self._trigger_logic.default_trigger_info + ): + raise RuntimeError( + f"OPHYD_ASYNC_PRESERVE_DETECTOR_STATE=YES is set but " + f"'{self.name}' has no default_trigger_info() - implement " + "default_trigger_info() on your DetectorTriggerLogic subclass " + "or unset the environment variable." + ) + trigger_info = await self._trigger_logic.default_trigger_info() + else: + trigger_info = TriggerInfo() + await self.prepare(trigger_info) + else: + # Check the one that was provided is suitable for triggering + trigger_info = self._prepare_ctx.trigger_info + if trigger_info.number_of_events != 1: + msg = ( + "trigger() is not supported for multiple events, the detector was " + f"prepared with number_of_events={trigger_info.number_of_events}." + ) + raise ValueError(msg) + # Ensure the data provider is still usable + await self._update_prepare_context(trigger_info) + ctx = error_if_none(self._prepare_ctx, "Prepare should have been run") + # Arm the detector and wait for it to finish. + if self._arm_logic: + await self._arm_logic.arm() + + async for update in self._wait_for_index( + data_providers=ctx.streamable_data_providers, + trigger_info=ctx.trigger_info, + initial_collections_written=ctx.collections_written, + collections_requested=1, + wait_for_idle=True, + ): + yield update diff --git a/tests/test_eiger_async.py b/tests/test_eiger_async.py index da60bd8..a65cbe6 100644 --- a/tests/test_eiger_async.py +++ b/tests/test_eiger_async.py @@ -6,7 +6,7 @@ import asyncio import shutil -from collections.abc import Generator +from collections.abc import AsyncGenerator, Generator from pathlib import Path import bluesky.plans as bp @@ -24,7 +24,7 @@ TriggerInfo, init_devices, ) -from ophyd_async.epics.adcore import ADBaseDatasetDescriber, ADBaseDataType, ADImageMode +from ophyd_async.epics.adcore import ADBaseDataType, ADImageMode from ophyd_async.testing import ( callback_on_mock_put, set_mock_value, @@ -33,11 +33,11 @@ from cditools.eiger_async import ( EigerController, + EigerDataLogic, EigerDataSource, EigerDetector, EigerDriverIO, EigerTriggerMode, - EigerWriter, ) EIGER_DATA_PATH = Path("/tmp/pytest/eiger_data/") @@ -101,10 +101,19 @@ def mock_eiger_detector(RE: RunEngine) -> Generator[EigerDetector, None, None]: ) with init_devices(mock=True): detector = EigerDetector("MOCK:EIGER:", path_provider, name="test_eiger") - set_mock_value(detector.fileio.file_path_exists, True) + + set_mock_value(detector.driver.file_path_exists, True) set_mock_value(detector.driver.array_size_x, 2048) set_mock_value(detector.driver.array_size_y, 2048) set_mock_value(detector.driver.data_type, "UInt16") + set_mock_value(detector.driver.acquire, False) + set_mock_value(detector.data_logic.fileio.armed, False) + + # Sync acquire with armed when acquire is set + async def sync_fileio_armed(value: bool): + set_mock_value(detector.data_logic.fileio.armed, value) + + callback_on_mock_put(detector.driver.acquire, sync_fileio_armed) yield detector @@ -136,16 +145,24 @@ def mock_path_provider() -> PathProvider: @pytest.fixture -def eiger_writer( +async def eiger_writer( mock_eiger_driver: EigerDriverIO, mock_path_provider: PathProvider, -) -> Generator[EigerWriter, None, None]: +) -> AsyncGenerator[EigerDataLogic, None]: """Create an EigerWriter instance for testing.""" if not EIGER_DATA_PATH.exists(): EIGER_DATA_PATH.mkdir(parents=True) assert EIGER_DATA_PATH.exists() - dataset_describer = ADBaseDatasetDescriber(mock_eiger_driver) - yield EigerWriter(mock_eiger_driver, mock_path_provider, dataset_describer) + + with init_devices(mock=True): + datalogic = EigerDataLogic(mock_eiger_driver, mock_path_provider) + + async def sync_fileio_armed(value: bool): + set_mock_value(datalogic.fileio.armed, value) + + callback_on_mock_put(datalogic.fileio.acquire, sync_fileio_armed) + # yield EigerDataLogic(mock_eiger_driver, mock_path_provider) + yield datalogic if EIGER_DATA_PATH.exists(): shutil.rmtree(EIGER_DATA_PATH) @@ -157,11 +174,11 @@ def eiger_controller(mock_eiger_driver: EigerDriverIO) -> EigerController: @pytest.mark.asyncio async def test_eiger_writer_initialization( - eiger_writer: EigerWriter, + eiger_writer: EigerDataLogic, mock_eiger_driver: EigerDriverIO, mock_path_provider: PathProvider, ): - """Test that EigerWriter initializes correctly.""" + """Test that EigerDataLogic initializes correctly.""" assert eiger_writer.fileio is mock_eiger_driver assert eiger_writer._path_provider is mock_path_provider # type: ignore[reportPrivateUsage] assert eiger_writer._dataset_describer is not None # type: ignore[reportPrivateUsage] @@ -170,7 +187,7 @@ async def test_eiger_writer_initialization( @pytest.mark.asyncio async def test_eiger_writer_open( - eiger_writer: EigerWriter, + eiger_writer: EigerDataLogic, mock_eiger_driver: EigerDriverIO, ) -> None: """Test the open method configures the detector correctly.""" @@ -234,7 +251,7 @@ async def test_eiger_writer_open( @pytest.mark.asyncio async def test_eiger_writer_get_indices_written( - eiger_writer: EigerWriter, + eiger_writer: EigerDataLogic, mock_eiger_driver: EigerDriverIO, ): """Test getting the number of indices written.""" @@ -286,7 +303,7 @@ async def test_eiger_writer_get_indices_written( @pytest.mark.asyncio async def test_eiger_writer_observe_indices_written( - eiger_writer: EigerWriter, + eiger_writer: EigerDataLogic, mock_eiger_driver: EigerDriverIO, ) -> None: """Test observing indices as they are written.""" @@ -360,7 +377,7 @@ async def _complete(): @pytest.mark.asyncio async def test_eiger_writer_collect_stream_docs( - eiger_writer: EigerWriter, + eiger_writer: EigerDataLogic, mock_eiger_driver: EigerDriverIO, ) -> None: """Test collecting stream documents.""" @@ -417,7 +434,7 @@ async def collect_docs( @pytest.mark.asyncio async def test_eiger_writer_close( - eiger_writer: EigerWriter, + eiger_writer: EigerDataLogic, mock_eiger_driver: EigerDriverIO, ) -> None: """Test closing the writer.""" @@ -433,43 +450,77 @@ async def test_eiger_writer_close( await eiger_writer.close() assert eiger_writer._file_info is None # type: ignore[reportPrivateUsage] - @pytest.mark.asyncio -async def test_eiger_controller_prepare(eiger_controller: EigerController) -> None: +async def test_eiger_prepare(mock_eiger_detector: EigerDetector) -> None: trigger_info = TriggerInfo( - number_of_events=1, + trigger=DetectorTrigger.INTERNAL, livetime=0.01, deadtime=0.001, - trigger=DetectorTrigger.INTERNAL, + exposures_per_collection=1, + collections_per_event=1, + number_of_events=1, exposure_timeout=1.0, - exposures_per_event=1, ) - await eiger_controller.prepare(trigger_info) - assert await eiger_controller.driver.acquire_time.get_value() == 0.01 + await mock_eiger_detector.prepare(trigger_info) + assert await mock_eiger_detector.driver.acquire_time.get_value() == 0.01 assert ( - await eiger_controller.driver.trigger_mode.get_value() + await mock_eiger_detector.driver.trigger_mode.get_value() == EigerTriggerMode.INTERNAL_SERIES ) - assert await eiger_controller.driver.num_images.get_value() == 1 - assert await eiger_controller.driver.image_mode.get_value() == ADImageMode.MULTIPLE + assert await mock_eiger_detector.driver.num_images.get_value() == 1 + assert await mock_eiger_detector.driver.image_mode.get_value() == ADImageMode.MULTIPLE + # Implement tests for these other trigger_infos trigger_info = TriggerInfo( - number_of_events=10, + trigger=DetectorTrigger.EXTERNAL_EDGE, livetime=0.0, deadtime=0.0, - trigger=DetectorTrigger.EDGE_TRIGGER, + exposures_per_collection=5, + collections_per_event=1, + number_of_events=10, exposure_timeout=10.0, - exposures_per_event=5, ) - await eiger_controller.prepare(trigger_info) + +@pytest.mark.asyncio +async def test_eiger_data_logic_prepare_unbounded(eiger_writer: EigerDataLogic) -> None: + trigger_info = TriggerInfo( + trigger=DetectorTrigger.INTERNAL, + livetime=0.01, + deadtime=0.001, + exposures_per_collection=1, + collections_per_event=1, + number_of_events=1, + exposure_timeout=1.0, + ) + stream_resource = await eiger_writer.prepare_unbounded("test_eiger") + print(stream_resource) + +@pytest.mark.asyncio +async def test_eiger_controller_prepare_internal(eiger_controller: EigerController) -> None: + await eiger_controller.prepare_internal(num=1, livetime=0.01, deadtime=0.001) + assert await eiger_controller.driver.acquire_time.get_value() == 0.01 + assert ( + await eiger_controller.driver.trigger_mode.get_value() + == EigerTriggerMode.INTERNAL_SERIES + ) + assert await eiger_controller.driver.num_triggers.get_value() == 1 + assert await eiger_controller.driver.image_mode.get_value() == ADImageMode.MULTIPLE + +@pytest.mark.asyncio +async def test_eiger_controller_prepare_edge(eiger_controller: EigerController) -> None: + await eiger_controller.prepare_edge(num=5, livetime=0.0) assert await eiger_controller.driver.acquire_time.get_value() == 0.0 assert ( await eiger_controller.driver.trigger_mode.get_value() == EigerTriggerMode.EXTERNAL_SERIES ) - assert await eiger_controller.driver.num_images.get_value() == 5 + assert await eiger_controller.driver.num_triggers.get_value() == 5 assert await eiger_controller.driver.image_mode.get_value() == ADImageMode.MULTIPLE + +@pytest.mark.skip("Does this test reflect any kind of desired behavior?") +@pytest.mark.asyncio +async def test_eiger_controller_prepare_edge2(eiger_controller: EigerController) -> None: trigger_info = TriggerInfo( number_of_events=0, livetime=None, @@ -478,7 +529,7 @@ async def test_eiger_controller_prepare(eiger_controller: EigerController) -> No exposure_timeout=10.0, exposures_per_event=1, ) - await eiger_controller.prepare(trigger_info) + await eiger_controller.prepare_edge(num=0, livetime=None) assert await eiger_controller.driver.acquire_time.get_value() == 0.0 assert ( await eiger_controller.driver.trigger_mode.get_value() @@ -494,40 +545,62 @@ async def test_eiger_controller_prepare(eiger_controller: EigerController) -> No async def test_eiger_detector(mock_eiger_detector: EigerDetector) -> None: set_mock_value(mock_eiger_detector.driver.num_images, 1) set_mock_value(mock_eiger_detector.driver.acquire_period, 0.001) - set_mock_value(mock_eiger_detector.fileio.array_counter, 0) + set_mock_value(mock_eiger_detector.data_logic.fileio.array_counter, 0) + set_mock_value(mock_eiger_detector.driver.num_images_counter, 0) - async def _simulate_one_trigger(value: bool, wait: bool) -> None: + async def _simulate_one_trigger(value: bool) -> None: await asyncio.sleep(await mock_eiger_detector.driver.acquire_period.get_value()) - array_counter = await mock_eiger_detector.fileio.array_counter.get_value() - set_mock_value(mock_eiger_detector.fileio.array_counter, array_counter + 1) + array_counter = await mock_eiger_detector.data_logic.fileio.array_counter.get_value() + set_mock_value(mock_eiger_detector.data_logic.fileio.array_counter, array_counter + 1) + num_images_counter = await mock_eiger_detector.driver.num_images_counter.get_value() + set_mock_value(mock_eiger_detector.driver.num_images_counter, num_images_counter + 1) - callback_on_mock_put(mock_eiger_detector.driver.acquire, _simulate_one_trigger) + callback_on_mock_put(mock_eiger_detector.driver.trigger, _simulate_one_trigger) # Standalone methods + await mock_eiger_detector.prepare( + TriggerInfo( + trigger=DetectorTrigger.INTERNAL, + livetime=0.01, + deadtime=0.001, + exposures_per_collection=1, + collections_per_event=1, + number_of_events=1, + exposure_timeout=10.0, + ) + ) await mock_eiger_detector.describe() # Case 1 - Step Scan: stage, trigger, read, trigger, read, unstage await mock_eiger_detector.stage() await mock_eiger_detector.trigger() + print(2) assert ( await mock_eiger_detector.driver.data_source.get_value() == EigerDataSource.FILE_WRITER ) + print(3) await mock_eiger_detector.read() + print(4) await mock_eiger_detector.trigger() + print(5) await mock_eiger_detector.read() + print(6) await mock_eiger_detector.unstage() - set_mock_value(mock_eiger_detector.fileio.array_counter, 0) + print(7) + set_mock_value(mock_eiger_detector.data_logic.fileio.array_counter, 0) + set_mock_value(mock_eiger_detector.driver.num_images_counter, 0) # Case 2 - Fly Scan: prepare, kickoff, complete await mock_eiger_detector.prepare( TriggerInfo( - number_of_events=1, + trigger=DetectorTrigger.INTERNAL, livetime=0.01, deadtime=0.001, - trigger=DetectorTrigger.INTERNAL, + exposures_per_collection=1, + collections_per_event=1, + number_of_events=1, exposure_timeout=10.0, - exposures_per_event=1, ) ) await mock_eiger_detector.kickoff()