diff --git a/.gitignore b/.gitignore index d97e2b0..743ab08 100644 --- a/.gitignore +++ b/.gitignore @@ -111,5 +111,8 @@ venv.bak/ .idea/ .idea/* +# vscode +.vscode/ + *yacctab.py *lextab.py diff --git a/src/pymodaq_plugins_basler/daq_viewer_plugins/plugins_2D/daq_2Dviewer_BaslerWithLECO.py b/src/pymodaq_plugins_basler/daq_viewer_plugins/plugins_2D/daq_2Dviewer_BaslerWithLECO.py deleted file mode 100755 index c237b9d..0000000 --- a/src/pymodaq_plugins_basler/daq_viewer_plugins/plugins_2D/daq_2Dviewer_BaslerWithLECO.py +++ /dev/null @@ -1,751 +0,0 @@ -import numpy as np -import os -import imageio as iio -import h5py -import json -from uuid6 import uuid7 - -from pymodaq.utils.parameter import Parameter -from pymodaq.utils.data import Axis, DataFromPlugins, DataToExport -from pymodaq.utils.daq_utils import ThreadCommand -from pymodaq.control_modules.viewer_utility_classes import main, DAQ_Viewer_base, comon_parameters, params - -from typing import Optional - - -# Suppress only NumPy RuntimeWarnings (bc of crosshair bug) -import warnings -warnings.filterwarnings("ignore", category=RuntimeWarning, module="numpy") - -from pymodaq_plugins_basler.hardware.basler import BaslerCamera, TemperatureMonitor -from pymodaq_plugins_basler.resources.extended_publisher import ExtendedPublisher -from qtpy import QtWidgets, QtCore - -if not hasattr(QtCore, "pyqtSignal"): - QtCore.pyqtSignal = QtCore.Signal # type: ignore - - -class DAQ_2DViewer_BaslerWithLECO(DAQ_Viewer_base): - """Viewer for Basler cameras - """ - controller: BaslerCamera - live_mode_available = True - - # For Basler, this returns a list of user defined camera names - camera_list = [cam.GetFriendlyName() for cam in BaslerCamera.list_cameras()] - - # Default place to store qsettings for this module - settings_basler = QtCore.QSettings("PyMoDAQ", "Basler") - - # Update the params - params = comon_parameters + [{'title': 'Camera List:', 'name': 'camera_list', 'type': 'list', 'value': '', 'limits': camera_list}, - {"title": "Device Info", "name": "device_info", "type": "group", "children": [ - {"title": "Device Model Name", "name": "DeviceModelName", "type": "str", "value": "", "readonly": True}, - {"title": "Device Serial Number", "name": "DeviceSerialNumber", "type": "str", "value": "", "readonly": True}, - {"title": "Device Version", "name": "DeviceVersion", "type": "str", "value": "", "readonly": True}, - {"title": "Device User ID", "name": "DeviceUserID", "type": "str", "value": ""} - ]}, - {'title': 'ROI', 'name': 'roi', 'type': 'group', 'children': [ - {'title': 'Update ROI', 'name': 'update_roi', 'type': 'bool_push', 'value': False, 'default': False}, - {'title': 'Clear ROI+Bin', 'name': 'clear_roi', 'type': 'bool_push', 'value': False, 'default': False}, - {'title': 'Binning', 'name': 'binning', 'type': 'list', 'limits': [1, 2], 'default': 1}, - {'title': 'Image Width', 'name': 'width', 'type': 'int', 'value': 1280, 'readonly': True}, - {'title': 'Image Height', 'name': 'height', 'type': 'int', 'value': 960, 'readonly': True}, - ]}, - {'title': 'LECO Logging', 'name': 'leco_log', 'type': 'group', 'children': [ - {'title': 'Send Frame Data ?', 'name': 'leco_send', 'type': 'led_push', 'value': False, 'default': False, - 'tip': 'This leads to huge performance drop as of now. Only use for single grabs, not continuous'}, - {'title': 'Publisher Name', 'name': 'publisher_name', 'type': 'str', 'value': ''}, - {'title': 'Proxy Server Address', 'name': 'proxy_address', 'type': 'str', 'value': 'localhost', 'default': 'localhost', - 'tip': 'Either IP or hostname of LECO proxy server'}, - {'title': 'Proxy Server Port', 'name': 'proxy_port', 'type': 'int', 'value': 11100, 'default': 11100}, - {'title': 'Metadata', 'name': 'leco_metadata', 'type': 'str', 'value': '', 'readonly': True}, - {'title': 'Saving Base Path:', 'name': 'leco_basepath', 'type': 'browsepath', 'value': '', 'filetype': False, - 'tip': 'This is the base directory for a file path sent from a remote director in the metadata'}, - ]} - ] - - def ini_attributes(self): - """Initialize attributes""" - - self.controller: None - self.user_id = None - - self.data_shape = None - self.save_frame = False - - # For LECO operation - self.metadata = None - self.data_publisher = None - self.send_frame_leco = False - - def init_controller(self) -> BaslerCamera: - # Init camera - self.user_id = self.settings.param('camera_list').value() - self.emit_status(ThreadCommand('Update_Status', [f"Trying to connect to {self.user_id}", 'log'])) - camera_list = BaslerCamera.list_cameras() - for devInfo in camera_list: - if devInfo.GetFriendlyName() == self.user_id: - return BaslerCamera(info=devInfo, callback=self.emit_data_callback) - self.emit_status(ThreadCommand('Update_Status', ["Camera not found", 'log'])) - raise ValueError(f"Camera with name {self.user_id} not found anymore.") - - def ini_detector(self, controller=None): - """Detector communication initialization - - Parameters - ---------- - controller: (object) - custom object of a PyMoDAQ plugin (Slave case). None if only one actuator/detector by controller - (Master case) - - Returns - ------- - info: str - initialized: bool - False if initialization failed otherwise True - """ - # Initialize camera class - self.ini_detector_init(old_controller=controller, - new_controller=self.init_controller()) - - # Setup continuous acquisition & allow adjustable frame rate - self.controller.setup_acquisition() - - # Connect camera lost event - self.controller.configurationEventHandler.signals.cameraRemoved.connect(self.camera_lost) - - # Update the UI with available and current camera parameters - self.add_attributes_to_settings() - self.update_params_ui() - for param in self.settings.children(): - if param.name() == 'device_info': - continue - param.sigValueChanged.emit(param, param.value()) - if param.hasChildren(): - for child in param.children(): - child.sigValueChanged.emit(child, child.value()) - - # Update image parameters - (x0, xend, y0, yend, xbin, ybin) = self.controller.get_roi() - height = xend - x0 - width = yend - y0 - self.settings.child('roi', 'binning').setValue(xbin) - self.settings.child('roi', 'width').setValue(width) - self.settings.child('roi', 'height').setValue(height) - - # Setup data publisher for LECO if data publisher name is set (ideally it should match the LECO actor name) - publisher_name = self.settings.child('leco_log', 'publisher_name').value() - proxy_address = self.settings.child('leco_log', 'proxy_address').value() - proxy_port = self.settings.child('leco_log', 'proxy_port').value() - if publisher_name == '': - print("Publisher name is not set ! Set this first and then reinitialize for LECO logging.") - self.emit_status(ThreadCommand('Update_Status', ["Publisher name is not set ! Set this first and then reinitialize for LECO logging."])) - else: - self.data_publisher = ExtendedPublisher(full_name=publisher_name, host=proxy_address, port=proxy_port) - print(f"Data publisher {publisher_name} initialized for LECO logging") - self.emit_status(ThreadCommand('Update_Status', [f"Data publisher {publisher_name} initialized for LECO logging"])) - - - try: - base_path = self.settings_basler.value('leco_log/basepath', os.path.join(os.path.expanduser('~'), 'Downloads')) - except Exception as e: - print(f"Error finding LECO base path: {e}") - base_path = '' - self.settings.child('leco_log', 'leco_basepath').setValue(base_path) - - - self._prepare_view() - info = "Initialized camera" - print(f"{self.user_id} camera initialized successfully") - self.emit_status(ThreadCommand('Update_Status', [f"{self.user_id} camera initialized successfully"])) - initialized = True - return info, initialized - - def commit_settings(self, param: Parameter): - """Apply the consequences of a change of value in the detector settings - - Parameters - ---------- - param: Parameter - A given parameter (within detector_settings) whose value has been changed by the user - """ - name = param.name() - value = param.value() - - if name == "camera_list": - if self.controller != None: - self.close() - self.ini_detector() - - if name == "device_state_save": - self.controller.save_device_state() - param = self.settings.child('device_state', 'device_state_save') - param.setValue(False) - param.sigValueChanged.emit(param, False) - return - - if name == "device_state_load": - self.controller.stop_grabbing() - self.controller.load_device_state() - # Reinitialize what is needed - self.controller.setup_acquisition() - # Update the UI with available and current camera parameters - self.add_attributes_to_settings() - self.update_params_ui() - for param in self.settings.children(): - param.sigValueChanged.emit(param, param.value()) - if param.hasChildren(): - for child in param.children(): - child.sigValueChanged.emit(child, child.value()) - self._prepare_view() - self.controller.start_grabbing(self.settings.param('AcquisitionFrameRateAbs').value()) - self.emit_status(ThreadCommand('Update_Status', [f"Device state loaded"])) - return - - if name == 'PixelFormat': - self.controller.stop_grabbing() - self.controller.camera.PixelFormat.SetValue(value) - self._prepare_view() - self.controller.start_grabbing(self.settings.param('AcquisitionFrameRateAbs').value()) - return - - if name == 'TriggerSave': - if not self.settings.child('trigger', 'TriggerMode').value(): - print("Trigger mode is not active ! Start triggering first !") - self.emit_status(ThreadCommand('Update_Status', ["Trigger mode is not active ! Start triggering first !"])) - param = self.settings.child('trigger', 'TriggerSaveOptions', 'TriggerSave') - param.setValue(False) # Turn off save on trigger if triggering is off - param.sigValueChanged.emit(param, False) - return - if value: - self.save_frame = True - return - else: - self.save_frame = False - return - - if name == 'leco_send': - if value: - self.send_frame_leco = True - else: - self.send_frame_leco = False - return - if name == 'leco_basepath': - base_path = value - if not os.path.exists(base_path): - print(f"LECO saving base path {base_path} does not exist !") - self.emit_status(ThreadCommand('Update_Status', [f"LECO saving base path {base_path} does not exist !"])) - else: - try: - self.settings_basler.setValue('leco_log/basepath', base_path) - print(f"LECO saving base path set to {base_path}") - self.emit_status(ThreadCommand('Update_Status', [f"LECO saving base path set to {base_path}"])) - except Exception as e: - print(f"Error setting LECO saving base path: {e}") - self.emit_status(ThreadCommand('Update_Status', [f"Error setting LECO saving base path: {e}"])) - if name == 'leco_metadata': - self.metadata = json.loads(value) - - if name in self.controller.attribute_names: - # Special cases - if 'ExposureTime' in name: - value = int(value * 1e3) - if 'Gain' in name and 'Auto' not in name: - value = int(value) - if name == "DeviceUserID": - self.user_id = value - self.controller.camera.DeviceUserID.SetValue(value) - # Update the camera list to account for name change - camera_list = [cam.GetFriendlyName() for cam in BaslerCamera.list_cameras()] - param = self.settings.param('camera_list') - param.setLimits(camera_list) - param.sigLimitsChanged.emit(param, camera_list) - return - if name == 'TriggerMode': - camera_attr = getattr(self.controller.camera, name) - if value: - camera_attr.SetIntValue(1) - else: - self.save_frame = False - camera_attr.SetIntValue(0) - param = self.settings.child('trigger', 'TriggerSaveOptions', 'TriggerSave') - param.setValue(False) # Turn off save on trigger if we turn off triggering - param.sigValueChanged.emit(param, False) - return - if name == 'GainAuto': - camera_attr = getattr(self.controller.camera, name) - if value: - camera_attr.SetIntValue(1) - else: - camera_attr.SetIntValue(0) - return - if name == 'ExposureAuto': - camera_attr = getattr(self.controller.camera, name) - if value: - camera_attr.SetIntValue(1) - else: - camera_attr.SetIntValue(0) - return - # we only need to reference these, nothing to do with the cam - if name == 'TriggerSaveLocation': - return - if name == 'TriggerSaveIndex': - return - if name == 'Filetype': - return - if name == 'Prefix': - return - if name == 'TemperatureMonitor': - if value: - # Start thread for camera temp. monitoring - self.start_temperature_monitoring() - else: - # Stop background threads - self.stop_temp_monitoring() - return - - # All the rest, just do : - camera_attr = getattr(self.controller.camera, name) - camera_attr.SetValue(value) - - if name == "update_roi": - if value: # Switching on ROI - - # We handle ROI and binning separately for clarity - (old_x, _, old_y, _, xbin, ybin) = self.controller.get_roi() # Get current binning - y0, x0 = self.roi_info.origin.coordinates - height, width = self.roi_info.size.coordinates - - # Values need to be rescaled by binning factor and shifted by current x0,y0 to be correct. - new_x = (old_x + x0) * xbin - new_y = (old_y + y0) * xbin - new_width = width * ybin - new_height = height * ybin - - new_roi = (new_x, new_width, xbin, new_y, new_height, ybin) - self.update_rois(new_roi) - param.setValue(False) - param.sigValueChanged.emit(param, False) - elif name == 'binning': - # We handle ROI and binning separately for clarity - (x0, w, y0, h, *_) = self.controller.get_roi() # Get current ROI - xbin = self.settings.child('roi', 'binning').value() - ybin = self.settings.child('roi', 'binning').value() - new_roi = (x0, w, xbin, y0, h, ybin) - self.update_rois(new_roi) - elif name == "clear_roi": - if value: # Switching on ROI - wdet, hdet = self.controller.get_detector_size() - self.settings.child('roi', 'binning').setValue(1) - - new_roi = (0, wdet, 1, 0, hdet, 1) - self.update_rois(new_roi) - param.setValue(False) - param.sigValueChanged.emit(param, False) - - - def _prepare_view(self): - """Preparing a data viewer by emitting temporary data. Typically, needs to be called whenever the - ROIs are changed""" - - (hstart, hend, vstart, vend, *binning) = self.controller.get_roi() - try: - xbin, ybin = binning - except ValueError: # some Pylablib `get_roi` do return just four values instead of six - xbin = ybin = 1 - height = hend - hstart - width = vend - vstart - - self.settings.child('roi', 'width').setValue(width) - self.settings.child('roi', 'height').setValue(height) - - mock_data = np.zeros((height, width)) - - self.x_axis = Axis(label='Pixels', data=np.linspace(1, width, width), index=1) - - if width != 1 and height != 1: - data_shape = 'Data2D' - self.y_axis = Axis(label='Pixels', data=np.linspace(1, height, height), index=0) - self.axes = [self.y_axis, self.x_axis] - else: - data_shape = 'Data1D' - self.axes = [self.x_axis] - - if data_shape != self.data_shape: - self.data_shape = data_shape - self.dte_signal_temp.emit( - DataToExport(f'{self.user_id}', - data=[DataFromPlugins(name=f'{self.user_id}', - data=[np.squeeze(mock_data)], - dim=self.data_shape, - labels=[f'{self.user_id}_{self.data_shape}'], - axes=self.axes)])) - - QtWidgets.QApplication.processEvents() - - def update_rois(self, new_roi): - (new_x, new_width, new_xbinning, new_y, new_height, new_ybinning) = new_roi - if new_roi != self.controller.get_roi(): - # self.controller.set_attribute_value("ROIs",[new_roi]) - self.controller.set_roi(hstart=new_x, - hend=new_x + new_width, - vstart=new_y, - vend=new_y + new_height, - hbin=new_xbinning, - vbin=new_ybinning) - self.emit_status(ThreadCommand('Update_Status', [f'Changed ROI: {new_roi}'])) - self.controller.clear_acquisition() - self.controller.setup_acquisition() - # Finally, prepare view for displaying the new data - self._prepare_view() - - def grab_data(self, Naverage: int = 1, live: bool = False, **kwargs) -> None: - try: - self._prepare_view() - if "Acquisition Frame Rate" in self.controller.attributes: - frame_rate = self.settings.param('AcquisitionFrameRateAbs').value() - else: - frame_rate = None - if live: - self.controller.start_grabbing(frame_rate) - else: - self.controller.start_grabbing(frame_rate) - while not self.controller.imageEventHandler.frame_ready: - pass # do nothing until a frame is ready - self.controller.stop_grabbing() - except Exception as e: - self.emit_status(ThreadCommand('Update_Status', [str(e), "log"])) - - - def emit_data_callback(self, frame_data: dict) -> None: - frame = frame_data['frame'] - timestamp = frame_data['timestamp'] - shape = frame.shape - # First emit data to the GUI - dte = DataToExport(f'{self.user_id}', data=[DataFromPlugins( - name=f'{self.user_id}', - data=[np.squeeze(frame)], - dim=self.data_shape, - labels=[f'{self.user_id}_{self.data_shape}'], - axes=self.axes)]) - self.dte_signal.emit(dte) - - # Now, handle data saving with filepath given by user in trigger save settings or from metadata set remotely with LECO - if self.save_frame: - self.handle_metadata_and_saving(frame, timestamp, shape) - - # Prepare for next frame - self.metadata = None - self.controller.imageEventHandler.frame_ready = False - - def handle_metadata_and_saving(self, frame, timestamp, shape): - if not self.settings.child('trigger', 'TriggerMode').value(): - return - metadata = self.get_metadata_and_save(frame, timestamp, shape) - if self.send_frame_leco: - self.publish_metadata(metadata, frame) - else: - self.publish_metadata(metadata) - - def stop(self): - self.controller.camera.StopGrabbing() - return '' - - def close(self): - """Terminate the communication protocol""" - self.controller.attributes = None - self.controller.close() - - # Stop any background threads - try: - self.stop_temp_monitoring() - except Exception: - pass # no temp settings - - # Just set these to false if camera disconnected for clean GUI - try: - param = self.settings.child('trigger', 'TriggerMode') - param.setValue(False) # Turn off save on trigger if triggering is off - param.sigValueChanged.emit(param, False) - param = self.settings.child('trigger', 'TriggerSaveOptions', 'TriggerSave') - param.setValue(False) # Turn off save on trigger if triggering is off - param.sigValueChanged.emit(param, False) - except Exception: - pass # no trigger settings - - self.status.initialized = False - self.status.controller = None - self.status.info = "" - print(f"{self.user_id} communication terminated successfully") - self.emit_status(ThreadCommand('Update_Status', [f"{self.user_id} communication terminated successfully"])) - - def get_metadata_and_save(self, frame, timestamp, shape): - if self.save_frame: - index = self.settings.child('trigger', 'TriggerSaveOptions', 'TriggerSaveIndex') - filetype = self.settings.child('trigger', 'TriggerSaveOptions', 'Filetype').value() - if self.metadata is not None: - metadata = self.metadata - filepath = self.metadata['file_metadata']['filepath'] - filename = self.metadata['file_metadata']['filename'] - self.metadata['burst_metadata']['user_id'] = self.user_id - basepath = self.settings.child('leco_log', 'leco_basepath').value() - filepath = os.path.normpath(os.path.join(basepath, filepath.lstrip(os.path.sep))) - else: - filepath = self.settings.child('trigger', 'TriggerSaveOptions', 'TriggerSaveLocation').value() - prefix = self.settings.child('trigger', 'TriggerSaveOptions', 'Prefix').value() - if not filepath: - filepath = os.path.join(os.path.expanduser('~'), 'Downloads') - filename = f"{prefix}{index.value()}.{filetype}" - metadata = {'burst_metadata':{}, 'file_metadata': {}, 'detector_metadata': {}} - metadata['burst_metadata']['uuid'] = str(uuid7()) - metadata['burst_metadata']['user_id'] = self.user_id - metadata['burst_metadata']['timestamp'] = timestamp - metadata['file_metadata']['filepath'] = filepath - metadata['file_metadata']['filename'] = filename - index.setValue(index.value()+1) - index.sigValueChanged.emit(index, index.value()) - - metadata['detector_metadata']['fuzziness'] = 0.1 # Account for some uncertainty in timestamp of frame, assume ~100 us for now - count = 0 - for name in self.controller.attribute_names: - if 'Gain' in name and 'Auto' not in name: - metadata['detector_metadata']['gain'] = self.settings.child('gain', name).value() - count += 1 - if 'Exposure' in name and 'Auto' not in name: - metadata['detector_metadata']['exposure_time'] = self.settings.child('exposure', name).value() - count += 1 - if count == 2: - break - metadata['detector_metadata']['shape'] = shape - if filetype == 'h5': - if not filename.endswith('.h5'): - filename += '.h5' - full_path = os.path.join(filepath, filename) - os.makedirs(os.path.dirname(full_path), exist_ok=True) - with h5py.File(full_path, 'w') as f: - dataset_name = f"frame_{timestamp}" - f.create_dataset(dataset_name, data=frame) - f.attrs['uuid'] = metadata['burst_metadata']['uuid'] - f.attrs['user_id'] = metadata['burst_metadata']['user_id'] - f.attrs['timestamp'] = timestamp - f.attrs['exposure_time'] = metadata['detector_metadata']['exposure_time'] - f.attrs['gain'] = metadata['detector_metadata']['gain'] - f.attrs['shape'] = metadata['detector_metadata']['shape'] - f.attrs['fuzziness'] = metadata['detector_metadata']['fuzziness'] - f.attrs['format_version'] = 'hdf5-v0.1' - else: - if not filename.endswith(f".{filetype}"): - filename += f".{filetype}" - if filetype not in ['png', 'jpg', 'jpeg', 'tiff', 'tif']: - print(f"Unsupported file type {filetype} for saving frame. Supported types are: png, jpg, jpeg, tiff, tif, h5") - self.emit_status(ThreadCommand('Update_Status', [f"Unsupported file type {filetype} for saving frame. Supported types are: png, jpg, jpeg, tiff, tif, h5"])) - return - full_path = os.path.join(filepath, f"{filename}") - os.makedirs(os.path.dirname(full_path), exist_ok=True) - iio.imwrite(full_path, frame) - return metadata - - def publish_metadata(self, metadata, frame: Optional[np.ndarray] = None): - if self.data_publisher is not None and self.save_frame: - if self.send_frame_leco: - self.data_publisher.send_data2({self.settings.child('leco_log', 'publisher_name').value(): - {'frame': frame, 'metadata': metadata, - 'message_type': 'detector', - 'serial_number': self.controller.device_info.GetSerialNumber(), - 'format_version': 'hdf5-v0.1'}}) - else: - self.data_publisher.send_data2({self.settings.child('leco_log', 'publisher_name').value(): - {'metadata': metadata, - 'message_type': 'detector', - 'serial_number': self.controller.device_info.GetSerialNumber(), - 'format_version': 'hdf5-v0.1'}}) - - def roi_select(self, roi_info, ind_viewer): - self.roi_info = roi_info - - def crosshair(self, crosshair_info, ind_viewer=0): - self.crosshair_info = crosshair_info - # Adding a small delay improves performance - QtCore.QTimer.singleShot(200, QtWidgets.QApplication.processEvents) - - def camera_lost(self): - self.close() - print(f"Lost connection to {self.user_id}") - self.emit_status(ThreadCommand('Update_Status', [f"Lost connection to {self.user_id}"])) - - def start_temperature_monitoring(self): - self.temp_thread = QtCore.QThread() - self.temp_worker = TemperatureMonitor(self.controller.camera) - - self.temp_worker.moveToThread(self.temp_thread) - - self.temp_thread.started.connect(self.temp_worker.run) - self.temp_worker.temperature_updated.connect(self.on_temperature_update) - self.temp_worker.finished.connect(self.temp_thread.quit) - self.temp_worker.finished.connect(self.temp_worker.deleteLater) - self.temp_thread.finished.connect(self.temp_thread.deleteLater) - - self.temp_thread.start() - - def stop_temp_monitoring(self): - if hasattr(self, 'temp_worker') and self.temp_worker is not None: - self.temp_worker.stop() - self.temp_worker = None - if hasattr(self, 'temp_thread') and self.temp_thread is not None: - try: - self.temp_thread.quit() - self.temp_thread.wait() - except RuntimeError: - pass # Already deleted - self.temp_thread = None - # Make sure temp. monitoring param is false in GUI - param = self.settings.child('temperature', 'TemperatureMonitor') - param.setValue(False) - param.sigValueChanged.emit(param, param.value()) - - def on_temperature_update(self, temp: float): - param = self.settings.child('temperature', 'TemperatureAbs') - param.setValue(temp) - param.sigValueChanged.emit(param, temp) - # TODO maybe close device here if temperature is too high, and allow user to set this threshold ? - if temp > 60: - self.emit_status(ThreadCommand('Update_Status', [f"WARNING: {self.user_id} camera is hot !!"])) - - - # This will add self.attributes, which is derived from the model config file, to self.settings - def add_attributes_to_settings(self): - existing_group_names = {child.name() for child in self.settings.children()} - - for attr in self.controller.attributes: - attr_name = attr['name'] - if attr.get('type') == 'group': - if attr_name not in existing_group_names: - self.settings.addChild(attr) - else: - group_param = self.settings.child(attr_name) - - existing_children = {child.name(): child for child in group_param.children()} - - expected_children = attr.get('children', []) - for expected in expected_children: - expected_name = expected['name'] - if expected_name not in existing_children: - for old_name, old_child in existing_children.items(): - if old_child.opts.get('title') == expected.get('title') and old_name != expected_name: - self.settings.child(attr_name, old_name).show(False) - break - - group_param.addChild(expected) - else: - if attr_name not in existing_group_names: - self.settings.addChild(attr) - - # This will ensure that the UI shows the current camera parameters values and limits - def update_params_ui(self): - - # Common syntax for any camera model - param = self.settings.child('device_info','DeviceModelName').setValue(self.controller.model_name) - self.settings.child('device_info','DeviceSerialNumber').setValue(self.controller.device_info.GetSerialNumber()) - self.settings.child('device_info','DeviceVersion').setValue(self.controller.device_info.GetDeviceVersion()) - self.settings.child('device_info','DeviceUserID').setValue(self.controller.device_info.GetFriendlyName()) - - - for param in self.controller.attributes: - param_type = param['type'] - param_name = param['name'] - - # Already handled - if param_name == "device_info": - continue - if param_name == "device_state": - continue - if param_name == "temperature": - continue - - if param_type == 'group': - # Recurse over children in groups - for child in param['children']: - child_name = child['name'] - child_type = child['type'] - # Special case: skip these - if child_name == 'TriggerSaveOptions': - continue - - camera_attr = getattr(self.controller.camera, child_name) - - try: - if child_type in ['float', 'slide', 'int', 'str']: - value = camera_attr.GetValue() - elif child_type == 'led_push': - if child_name != 'GammaEnable': - value = bool(camera_attr.GetIntValue()) - else: - value = camera_attr.GetValue() - else: - continue # Unsupported type, skip - - # Special case: if parameter is related to ExposureTime, convert to ms from us - if 'Exposure' in child_name and 'Auto' not in child_name: - value *= 1e-3 - - # Set the value - self.settings.child(param_name, child_name).setValue(value) - - # Set limits if defined - if 'limits' in child and child_type in ['float', 'slide', 'int'] and not child.get('readonly', False): - try: - min_limit = camera_attr.GetMin() - max_limit = camera_attr.GetMax() - - if 'Exposure' in child_name and 'Auto' not in child_name: - min_limit *= 1e-3 - max_limit *= 1e-3 - - self.settings.child(param_name, child_name).setLimits([min_limit, max_limit]) - except Exception: - pass - - except Exception: - pass - else: - camera_attr = getattr(self.controller.camera, param_name) - try: - if param_type in ['float', 'slide', 'int', 'str']: - value = camera_attr.GetValue() - elif param_type == 'led_push': - if param_name != 'GammaEnable': - value = bool(camera_attr.GetIntValue()) - else: - value = camera_attr.GetValue() - else: - continue # Unsupported type, skip - - # Special case: if parameter is related to ExposureTime, convert to ms from us - if 'Exposure' in param_name and 'Auto' not in param_name: - value *= 1e-3 - - # Set the value - self.settings.param(param_name).setValue(value) - - if 'limits' in param and param_type in ['float', 'slide', 'int'] and not param.get('readonly', False): - try: - min_limit = camera_attr.GetMin() - max_limit = camera_attr.GetMax() - - - if 'Exposure' in param_name and 'Auto' not in param_name: - min_limit *= 1e-3 - max_limit *= 1e-3 - - self.settings.param(param_name).setLimits([min_limit, max_limit]) - - except Exception: - pass - - except Exception: - pass - - -if __name__ == '__main__': - main(__file__, init=False) \ No newline at end of file diff --git a/src/pymodaq_plugins_basler/hardware/basler.py b/src/pymodaq_plugins_basler/hardware/basler.py index 186a894..0bdb8c9 100755 --- a/src/pymodaq_plugins_basler/hardware/basler.py +++ b/src/pymodaq_plugins_basler/hardware/basler.py @@ -2,7 +2,6 @@ from typing import Any, Callable, List, Optional, Tuple, Union import platform import traceback -import threading from numpy.typing import NDArray from pypylon import pylon @@ -24,17 +23,26 @@ MB_OK = 0x00 MB_ICONINFORMATION = 0x40 MB_ICONERROR = 0x10 + MB_TOPMOST = 0x00040000 IDYES = 6 IDNO = 7 IDOK = 1 def _win_message_box(title, text, buttons="yesno", icon="info"): - icon_map = {"info": MB_ICONINFORMATION, "question": MB_ICONQUESTION, "error": MB_ICONERROR} + icon_map = { + "info": MB_ICONINFORMATION, + "question": MB_ICONQUESTION, + "error": MB_ICONERROR + } + flags = icon_map.get(icon, MB_ICONINFORMATION) + flags |= MB_TOPMOST + if buttons == "yesno": flags |= MB_YESNO else: flags |= MB_OK + return ctypes.windll.user32.MessageBoxW(0, text, title, flags) @@ -57,7 +65,6 @@ def __init__(self, info: str, callback: Optional[Callable] = None, **kwargs): self.camera = pylon.InstantCamera() self.model_name = info.GetModelName() self.device_info = info - self._msg_opener = None # Default directory for parameter config files if platform.system() == 'Windows': @@ -304,7 +311,6 @@ def create_default_config_if_not_exists(self): if os.path.exists(config_path): return else: - self._msg_opener = DefaultConfigMsg() msg = QtWidgets.QMessageBox() msg.setIcon(QtWidgets.QMessageBox.Question) msg.setWindowTitle("Missing Config File") @@ -397,18 +403,6 @@ def handle_user_choice(self, user_choice, config_path, model_name): def safe_exec_messagebox(self, msgbox: QtWidgets.QMessageBox, buttons: str = "yesno") -> int: result_container = {} - finished_event = threading.Event() - - def show_dialog(): - try: - result_container["choice"] = int(msgbox.exec_()) - except Exception: - result_container["choice"] = int(QtWidgets.QMessageBox.No) - finally: - finished_event.set() - - if self._msg_opener is None: - self._msg_opener = DefaultConfigMsg() # Non-GUI thread (Windows only safe path) if sys.platform.startswith("win"): @@ -436,29 +430,12 @@ def show_dialog(): except Exception: return int(QtWidgets.QMessageBox.No) + # Linux path (just create new config; no pop-up window) else: - QtCore.QMetaObject.invokeMethod( - self._msg_opener, - "run_box", - QtCore.Qt.ConnectionType.AutoConnection, - QtCore.Q_ARG(object, show_dialog) - ) - - if QtCore.QThread.currentThread() != QtWidgets.QApplication.instance().thread(): - finished_event.wait() - QtCore.QTimer.singleShot(0, QtWidgets.QApplication.processEvents) - else: - while not finished_event.is_set(): - QtWidgets.QApplication.processEvents(QtCore.QEventLoop.AllEvents, 50) - - return result_container.get("choice", int(QtWidgets.QMessageBox.No)) - -class DefaultConfigMsg(QtCore.QObject): - def __init__(self): - super().__init__() - @QtCore.Slot(object) - def run_box(self, fn): - fn() + try: + return result_container.get("choice", int(QtWidgets.QMessageBox.Yes)) + except Exception: + return result_container.get("choice", int(QtWidgets.QMessageBox.No)) class ConfigurationHandler(pylon.ConfigurationEventHandler): """Handle the configuration events."""