diff --git a/app/services/auralization_service.py b/app/services/auralization_service.py index 252128a..09e6705 100644 --- a/app/services/auralization_service.py +++ b/app/services/auralization_service.py @@ -288,15 +288,25 @@ def run_auralization(auralizationId: int) -> None: logger.debug("run auralization calculation") - #TODO: fix behavior for DG auralization, DG method output format - # should be changed. We want a single universal auralization method, - # without having to switch logic between them for each simulation method. match simulation.simulationMethod: case "DE": _, _ = auralization_calculation(signal_file_name, pressure_file_name, wav_output_file_name) case "DG": _, _ = auralization_calculation_DG(signal_file_name, pressure_file_name, wav_output_file_name) - + case _: + #TODO: We want a single universal auralization method, + # without having to switch logic between them for each simulation method. + # This will be implemented in the function mono_aural_auralization, which will be a + # general convolution-based auralization method using the RIR. + # This method does not rely on the pressure.csv file, but the wav file directly + pressure_file_name_wav = os.path.join( + DefaultConfig.UPLOAD_FOLDER_NAME, export.name.replace(".xlsx", ".wav") + ) + mono_aural_auralization( + signal_file_name, + pressure_file_name_wav, + wav_output_file_name + ) auralization.status = Status.Completed @@ -310,6 +320,34 @@ def run_auralization(auralizationId: int) -> None: abort(400, "Error running this auralization") +def mono_aural_auralization( + signal_file_name: str, + impulse_response_file_name_wav: str, + wav_output_file_name: str, + ) -> None: + """Create a mono-aural auralization by convolution. + + If the sampling rates do not match, the impulse response is resampled to + match the sampling rate of the dry input signal. + + Parameters + ---------- + signal_file_name : str + The dry input signal file name (wav format). + impulse_response_file_name_wav : str + The impulse response file name (wav format). + wav_output_file_name : str + The convolved output signal file name (wav format). + """ + + import pyfar as pf + dry_signal = pf.io.read_audio(signal_file_name) + rir = pf.io.read_audio(impulse_response_file_name_wav) + rir_resampled = pf.dsp.resample(rir, dry_signal.sampling_rate) + convolved_signal = pf.dsp.convolve(rir_resampled, dry_signal) + pf.io.write_audio(convolved_signal, wav_output_file_name) + + # TODO: too long code, refactor this function def auralization_calculation_DG( signal_file_name: Optional[str], impulse_response: str, wav_output_file_name: Optional[str] = None diff --git a/app/services/executors/local_executor.py b/app/services/executors/local_executor.py index 588642e..5c07f05 100644 --- a/app/services/executors/local_executor.py +++ b/app/services/executors/local_executor.py @@ -11,41 +11,111 @@ logger = logging.getLogger(__name__) +# +def _is_running_in_container() -> bool: + """ + Used to check if we're running in a container or locally for development, + to determine how to resolve paths for Docker mounts. + + Returns: + True when the current process appears to be running inside a container. + """ + + if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"): + return True + + cgroup_paths = ["/proc/self/cgroup", "/proc/1/cgroup"] + keywords = ("docker", "containerd", "kubepods", "podman", "lxc") + + for path in cgroup_paths: + if not os.path.exists(path): + continue + try: + with open(path, "r", encoding="utf-8", errors="ignore") as handle: + contents = handle.read() + except OSError: + continue + if any(keyword in contents for keyword in keywords): + return True + + return False + + +def _get_current_container(client): + """Return the Docker container object for the current container.""" + import socket + + hostname = socket.gethostname() + try: + return client.containers.get(hostname) + except docker.errors.NotFound: + for container in client.containers.list(all=True): + if hostname == container.name or hostname in container.id: + return container + raise + def get_host_path_for_container_path(container_path: str) -> str: """ Resolves the host path corresponding to a given container path by inspecting the current container's mounts using the Docker socket. + For local debugging (not in a container), returns the container_path as-is. + Args: container_path (str): The absolute path inside the container to resolve. Returns: - str: The corresponding absolute path on the host machine. + str: The corresponding absolute path on the host machine (or container_path if local). Raises: - RuntimeError: If no mount is found covering the given container path. - Exception: If there is an error communicating with Docker or resolving the path. + RuntimeError: If no mount is found covering the given container path (in container only). + Exception: If there is an error communicating with Docker or resolving the path (in container only). """ - + + # For local debugging, we assume the container_path is directly accessible on the host + if not _is_running_in_container(): + logger.warning( + f"Running locally (not in container). Returning container_path as-is: {container_path}" + ) + return container_path + try: client = docker.from_env() - import socket - hostname = socket.gethostname() - container = client.containers.get(hostname) - for mount in container.attrs["Mounts"]: - - destination = mount.get("Destination", "") - if destination == container_path: - host_source = mount["Source"] - relative = os.path.relpath(container_path, destination) - return os.path.join(host_source, relative).replace("\\", "/") + container = _get_current_container(client) + container_path = os.path.normpath(container_path) + + best_mount = None + best_destination = None + for mount in container.attrs.get("Mounts", []): + destination = mount.get("Destination") + if not destination: + continue + destination = os.path.normpath(destination) + if ( + container_path == destination + or destination == os.sep + or container_path.startswith(destination + os.sep) + ): + if best_destination is None or len(destination) > len(best_destination): + best_mount = mount + best_destination = destination + + if best_mount is None: + raise RuntimeError( + f"No mount found covering container path: {container_path}" + ) + + host_source = best_mount["Source"] + relative = os.path.relpath(container_path, best_destination) + if relative == ".": + return host_source.replace("\\", "/") + return os.path.join(host_source, relative).replace("\\", "/") + except Exception as e: logger.error(f"Could not resolve host path for {container_path}: {e}") raise - raise RuntimeError(f"No mount found covering container path: {container_path}") - class LocalExecutor(SimulationExecutor): def __init__(self, work_dir=None): diff --git a/app/services/simulation_service.py b/app/services/simulation_service.py index 17ca354..ab1c62d 100644 --- a/app/services/simulation_service.py +++ b/app/services/simulation_service.py @@ -290,6 +290,7 @@ def start_solver_task(simulation_id): "geo_path": geo_path, "results": results_container, "task_id": -1, + "fs_auralization": 44100 }, indent=4, ) @@ -414,66 +415,109 @@ def run_solver(simulation_run_id: int, json_path: str): "task_id": result_container["task_id"] } - logger.info(f"{simulation_method} Simulation_service:...container has been spinned up.") - container = executor.execute(method_config, sim_config) - container.wait() - logger.info(f"{simulation_method} Simulation_service:...container has finished.") + logger.info(f"{simulation_method} Simulation_service:...container has been spun up.") - cancel_flag_path = Path(json_path).parent / f"{result_container['task_id']}.cancel" + container = None + try: + container = executor.execute(method_config, sim_config) + container.wait() + logger.info(f"{simulation_method} Simulation_service:...container has finished.") + except Exception as ex: + logger.error(f"Error during container execution: {ex}") + raise Exception(f"Error during container execution: {ex}") + finally: + remove_method = getattr(container, "remove", None) if container is not None else None + if callable(remove_method): + try: + remove_method() # Clean up local containers after execution + except Exception as cleanup_ex: + # If cancelled, the container is already removed, so this exception will be thrown. + logger.warning(f"Failed to remove execution container: {cleanup_ex}") + # auralization: generate impulse response wav file + # TODO: move the auralization calculation to DE and write that + # to the JSON so that everything can be handled by the current + # default case and we can get rid of the match case. + match simulation_method: + case "DE": + # TODO: This function is not a general auralization function and should be renamed + imp_tot, fs = auralization_calculation( + None, + json_path.replace(".json", "_pressure.csv"), + json_path.replace(".json", ".wav"), + ) + + # this should be the only thing getting executed + case _: + import numpy as np + + with open(json_path, "r") as json_file: + result_container = json.load(json_file) + + imp_tot = np.array(result_container["results"][0]["responses"][0]["receiverResults"]) + + with open(json_path, "r") as json_file: + input_data = json.load(json_file) + if "sampling_rate" in input_data["simulationSettings"]: + fs = input_data["simulationSettings"]["sampling_rate"] + else: + fs = input_data["fs_auralization"] # 44100 by default + + rir_wav_file_name = json_path.replace(".json", ".wav") + + import pyfar as pf + if imp_tot is None or len(imp_tot) == 0: + logger.warning("Impulse response data is empty or missing") + imp_tot = np.zeros(44100) # 1 second of silence at 44.1 kHz + norm_rir = pf.Signal(imp_tot, fs) # don't use the pf.dsp.normalize function on an empty signal, as it returns NaN values. + else: + rir = pf.Signal(imp_tot, fs) + # Normalise the rir. Some methods return pressure values that are too high, which causes issues when writing to wav. + norm_rir = pf.dsp.normalize(rir) + + pf.io.write_audio(norm_rir, rir_wav_file_name) + logger.info(f"Impulse response shape: {imp_tot.shape}, sampling rate: {fs}") + # logs = container.logs().decode("utf-8") # logger.info(f"{simulation_method} container FULL logs:\n{logs}") + cancel_flag_path = Path(json_path).parent / f"{result_container['task_id']}.cancel" + if os.path.exists(cancel_flag_path): logger.info("Cancelled: do not save to xlsx") + # Keep the cancel flag in place so later status checks in this + # function can still detect that the simulation was cancelled. else: - logger.info("Saving to xlsx...") - - # save the simulation result json to xlsx - if not ExportHelper.parse_json_file_to_xlsx_file( - json_path, json_path.replace(".json", ".xlsx") - ): - logger.error("Error saving the result to xlsx") - raise "Error saving the result to xlsx" - - # db - save the xlsx file path - export = Export( - name=Path(json_path).name.replace(".json", ".xlsx"), - simulationId=simulation.id, - ) - session.add(export) - - # auralization: generate impulse response wav file - # TODO: fix DG method such that this auralization works, - # the idea is to have one shared pipeline across all - # methods. - match simulation_method: - case "DG": - imp_tot, fs = auralization_calculation_DG( - None, - json_path.replace(".json", "_pressure.csv"), - json_path.replace(".json", ".wav"), - ) - # this should be the only thing getting executed - case _: - imp_tot, fs = auralization_calculation( - None, - json_path.replace(".json", "_pressure.csv"), - json_path.replace(".json", ".wav"), - ) - - - # auralization: save the impulse response to xlsx - if not ExportHelper.write_data_to_xlsx_file( - json_path.replace(".json", ".xlsx"), - CustomExportParametersConfig.impulse_response, - {f"{fs}Hz": imp_tot}, - ): - logger.error( - "Error saving the impulse response to xlsx" + try: + logger.info("Saving to xlsx...") + + # save the simulation result json to xlsx + if not ExportHelper.parse_json_file_to_xlsx_file( + json_path, json_path.replace(".json", ".xlsx") + ): + logger.error("Error saving the result to xlsx") + raise "Error saving the result to xlsx" + + # db - save the xlsx file path + export = Export( + name=Path(json_path).name.replace(".json", ".xlsx"), + simulationId=simulation.id, ) - raise "Error saving the impulse response to xlsx" - + session.add(export) + + # auralization: save the impulse response to xlsx + if not ExportHelper.write_data_to_xlsx_file( + json_path.replace(".json", ".xlsx"), + CustomExportParametersConfig.impulse_response, + {f"{fs}Hz": imp_tot}, + ): + logger.error( + "Error saving the impulse response to xlsx" + ) + raise "Error saving the impulse response to xlsx" + except Exception as ex: + logger.error(f"Error during saving results: {ex}") + raise Exception(f"Error during saving results: {ex}") result_container = {} if json_path is not None: @@ -492,6 +536,14 @@ def run_solver(simulation_run_id: int, json_path: str): simulation.status = Status.Completed simulation.completedAt = datetime.now() + if os.path.exists(cancel_flag_path): + # Clean up cancel flag after handling cancellation + try: + cancel_flag_path.unlink() + logger.info(f"Removed cancel flag file: {cancel_flag_path}") + except Exception as ex: + logger.warning(f"Failed to remove cancel flag file {cancel_flag_path}: {ex}") + simulation_run.updatedAt = datetime.now() simulation.updatedAt = datetime.now() diff --git a/requirements.txt b/requirements.txt index d1727ab..ff9e433 100644 --- a/requirements.txt +++ b/requirements.txt @@ -156,4 +156,4 @@ paramiko git+https://github.com/Building-acoustics-TU-Eindhoven/acousticDE.git@d32afb2498e27bd996fc7356d57dc4f1ed76aa44#egg=acousticDE # git+https://github.com/dtu-act/deeponet-acoustic-wave-prop.git@3d3fc5ee952756eedcd4fec3c3674ad829825c7e#egg=deeponet-acoustics git+https://github.com/Building-acoustics-TU-Eindhoven/edg-acoustics.git@08cac98da98ed14ba1366741b1c0644001503b82#egg=edg-acoustics - +pyfar \ No newline at end of file