Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 42 additions & 4 deletions app/services/auralization_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,25 @@ def run_auralization(auralizationId: int) -> None:

logger.debug("run auralization calculation")

#TODO: fix behavior for DG auralization, DG method output format
# should be changed. We want a single universal auralization method,
# without having to switch logic between them for each simulation method.
match simulation.simulationMethod:
case "DE":
_, _ = auralization_calculation(signal_file_name, pressure_file_name, wav_output_file_name)
case "DG":
_, _ = auralization_calculation_DG(signal_file_name, pressure_file_name, wav_output_file_name)

case _:
#TODO: We want a single universal auralization method,
# without having to switch logic between them for each simulation method.
# This will be implemented in the function mono_aural_auralization, which will be a
# general convolution-based auralization method using the RIR.
# This method does not rely on the pressure.csv file, but the wav file directly
pressure_file_name_wav = os.path.join(
DefaultConfig.UPLOAD_FOLDER_NAME, export.name.replace(".xlsx", ".wav")
)
mono_aural_auralization(
signal_file_name,
pressure_file_name_wav,
wav_output_file_name
)

auralization.status = Status.Completed

Expand All @@ -310,6 +320,34 @@ def run_auralization(auralizationId: int) -> None:
abort(400, "Error running this auralization")


def mono_aural_auralization(
signal_file_name: str,
impulse_response_file_name_wav: str,
wav_output_file_name: str,
) -> None:
"""Create a mono-aural auralization by convolution.

If the sampling rates do not match, the impulse response is resampled to
match the sampling rate of the dry input signal.

Parameters
----------
signal_file_name : str
The dry input signal file name (wav format).
impulse_response_file_name_wav : str
The impulse response file name (wav format).
wav_output_file_name : str
The convolved output signal file name (wav format).
"""

import pyfar as pf
dry_signal = pf.io.read_audio(signal_file_name)
rir = pf.io.read_audio(impulse_response_file_name_wav)
rir_resampled = pf.dsp.resample(rir, dry_signal.sampling_rate)
convolved_signal = pf.dsp.convolve(rir_resampled, dry_signal)
pf.io.write_audio(convolved_signal, wav_output_file_name)


Comment on lines +343 to +350
# TODO: too long code, refactor this function
def auralization_calculation_DG(
signal_file_name: Optional[str], impulse_response: str, wav_output_file_name: Optional[str] = None
Expand Down
102 changes: 86 additions & 16 deletions app/services/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,111 @@

logger = logging.getLogger(__name__)

#
def _is_running_in_container() -> bool:
"""
Used to check if we're running in a container or locally for development,
to determine how to resolve paths for Docker mounts.

Returns:
True when the current process appears to be running inside a container.
"""

if os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv"):
return True

cgroup_paths = ["/proc/self/cgroup", "/proc/1/cgroup"]
keywords = ("docker", "containerd", "kubepods", "podman", "lxc")

for path in cgroup_paths:
if not os.path.exists(path):
continue
try:
with open(path, "r", encoding="utf-8", errors="ignore") as handle:
contents = handle.read()
except OSError:
continue
if any(keyword in contents for keyword in keywords):
return True

return False


def _get_current_container(client):
"""Return the Docker container object for the current container."""
import socket

hostname = socket.gethostname()
try:
return client.containers.get(hostname)
except docker.errors.NotFound:
for container in client.containers.list(all=True):
if hostname == container.name or hostname in container.id:
return container
raise


def get_host_path_for_container_path(container_path: str) -> str:
"""
Resolves the host path corresponding to a given container path by inspecting
the current container's mounts using the Docker socket.

For local debugging (not in a container), returns the container_path as-is.

Args:
container_path (str): The absolute path inside the container to resolve.

Returns:
str: The corresponding absolute path on the host machine.
str: The corresponding absolute path on the host machine (or container_path if local).

Raises:
RuntimeError: If no mount is found covering the given container path.
Exception: If there is an error communicating with Docker or resolving the path.
RuntimeError: If no mount is found covering the given container path (in container only).
Exception: If there is an error communicating with Docker or resolving the path (in container only).
"""


# For local debugging, we assume the container_path is directly accessible on the host
if not _is_running_in_container():
logger.warning(
f"Running locally (not in container). Returning container_path as-is: {container_path}"
)
return container_path

try:
client = docker.from_env()
import socket
hostname = socket.gethostname()
container = client.containers.get(hostname)
for mount in container.attrs["Mounts"]:

destination = mount.get("Destination", "")
if destination == container_path:
host_source = mount["Source"]
relative = os.path.relpath(container_path, destination)
return os.path.join(host_source, relative).replace("\\", "/")
container = _get_current_container(client)
container_path = os.path.normpath(container_path)

best_mount = None
best_destination = None
for mount in container.attrs.get("Mounts", []):
destination = mount.get("Destination")
if not destination:
continue
destination = os.path.normpath(destination)
if (
container_path == destination
or destination == os.sep
or container_path.startswith(destination + os.sep)
):
if best_destination is None or len(destination) > len(best_destination):
best_mount = mount
best_destination = destination

if best_mount is None:
raise RuntimeError(
f"No mount found covering container path: {container_path}"
)

host_source = best_mount["Source"]
relative = os.path.relpath(container_path, best_destination)
if relative == ".":
return host_source.replace("\\", "/")
return os.path.join(host_source, relative).replace("\\", "/")

except Exception as e:
logger.error(f"Could not resolve host path for {container_path}: {e}")
raise

raise RuntimeError(f"No mount found covering container path: {container_path}")


class LocalExecutor(SimulationExecutor):
def __init__(self, work_dir=None):
Expand Down
154 changes: 103 additions & 51 deletions app/services/simulation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ def start_solver_task(simulation_id):
"geo_path": geo_path,
"results": results_container,
"task_id": -1,
"fs_auralization": 44100
},
indent=4,
)
Expand Down Expand Up @@ -414,66 +415,109 @@ def run_solver(simulation_run_id: int, json_path: str):
"task_id": result_container["task_id"]
}

logger.info(f"{simulation_method} Simulation_service:...container has been spinned up.")
container = executor.execute(method_config, sim_config)
container.wait()
logger.info(f"{simulation_method} Simulation_service:...container has finished.")
logger.info(f"{simulation_method} Simulation_service:...container has been spun up.")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me it seems that all if this can be removed by uncommenting this code line here

So I would propose to revert this part.


cancel_flag_path = Path(json_path).parent / f"{result_container['task_id']}.cancel"
container = None
try:
container = executor.execute(method_config, sim_config)
container.wait()
logger.info(f"{simulation_method} Simulation_service:...container has finished.")
except Exception as ex:
logger.error(f"Error during container execution: {ex}")
raise Exception(f"Error during container execution: {ex}")
Comment thread
SilvinWillemsen marked this conversation as resolved.
Comment thread
SilvinWillemsen marked this conversation as resolved.
finally:
remove_method = getattr(container, "remove", None) if container is not None else None
if callable(remove_method):
try:
remove_method() # Clean up local containers after execution
except Exception as cleanup_ex:
# If cancelled, the container is already removed, so this exception will be thrown.
logger.warning(f"Failed to remove execution container: {cleanup_ex}")
Comment on lines +420 to +435
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me it seems that all if this can be removed by uncommenting this code line here

So I would propose to revert this part.


# auralization: generate impulse response wav file
# TODO: move the auralization calculation to DE and write that
# to the JSON so that everything can be handled by the current
# default case and we can get rid of the match case.
match simulation_method:
case "DE":
# TODO: This function is not a general auralization function and should be renamed
imp_tot, fs = auralization_calculation(
None,
json_path.replace(".json", "_pressure.csv"),
json_path.replace(".json", ".wav"),
)

# this should be the only thing getting executed
case _:
import numpy as np
Comment on lines +441 to +452

with open(json_path, "r") as json_file:
result_container = json.load(json_file)

imp_tot = np.array(result_container["results"][0]["responses"][0]["receiverResults"])

with open(json_path, "r") as json_file:
input_data = json.load(json_file)
if "sampling_rate" in input_data["simulationSettings"]:
fs = input_data["simulationSettings"]["sampling_rate"]
else:
fs = input_data["fs_auralization"] # 44100 by default

rir_wav_file_name = json_path.replace(".json", ".wav")

import pyfar as pf
if imp_tot is None or len(imp_tot) == 0:
logger.warning("Impulse response data is empty or missing")
imp_tot = np.zeros(44100) # 1 second of silence at 44.1 kHz
norm_rir = pf.Signal(imp_tot, fs) # don't use the pf.dsp.normalize function on an empty signal, as it returns NaN values.
else:
rir = pf.Signal(imp_tot, fs)
# Normalise the rir. Some methods return pressure values that are too high, which causes issues when writing to wav.
norm_rir = pf.dsp.normalize(rir)
Comment on lines +469 to +476
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if imp_tot is None or len(imp_tot) == 0:
logger.warning("Impulse response data is empty or missing")
imp_tot = np.zeros(44100) # 1 second of silence at 44.1 kHz
norm_rir = pf.Signal(imp_tot, fs)
else:
rir = pf.Signal(imp_tot, fs)
norm_rir = pf.dsp.normalize(rir)
if imp_tot is None or len(imp_tot) == 0:
logger.warning("Impulse response data is empty or missing")
imp_tot = np.zeros(44100) # 1 second of silence at 44.1 kHz
rir = pf.Signal(imp_tot, fs)
norm_rir = pf.dsp.normalize(rir)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a conceptual comment here:
Why does the RIR get normalized by default?
This removes for example any distance information in the auralization and source strength differences. Not sure if that's something that makes a lot of sense as default.

Maybe we can implement something like a normalize checkbox for the auralization in the future instead?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. we need the normalisation because DG, for instance, returns crazy-large pressure values. This is a good comment though. We probably want to have a pressure to signal function (does pyfar have that?).

Regarding the changes you suggest, wouldn't the pf.dsp.normalize function return NaN if imp_tot is None because rir consists of zeros?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Ah, I see. Let's keep it at always normalize then.
  2. The normalize function by default normalizes to the maximum. So in case the data consists of only zeros it will contain nans afterwards, indeed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some comments, but will leave the functionality as is, alright? :)


pf.io.write_audio(norm_rir, rir_wav_file_name)
logger.info(f"Impulse response shape: {imp_tot.shape}, sampling rate: {fs}")

# logs = container.logs().decode("utf-8")
# logger.info(f"{simulation_method} container FULL logs:\n{logs}")

cancel_flag_path = Path(json_path).parent / f"{result_container['task_id']}.cancel"

Comment on lines +484 to +485
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be reverted when using the remove functionality of the executor class

So I would propose to revert this part.

if os.path.exists(cancel_flag_path):
logger.info("Cancelled: do not save to xlsx")
# Keep the cancel flag in place so later status checks in this
# function can still detect that the simulation was cancelled.
else:
logger.info("Saving to xlsx...")

# save the simulation result json to xlsx
if not ExportHelper.parse_json_file_to_xlsx_file(
json_path, json_path.replace(".json", ".xlsx")
):
logger.error("Error saving the result to xlsx")
raise "Error saving the result to xlsx"

# db - save the xlsx file path
export = Export(
name=Path(json_path).name.replace(".json", ".xlsx"),
simulationId=simulation.id,
)
session.add(export)

# auralization: generate impulse response wav file
# TODO: fix DG method such that this auralization works,
# the idea is to have one shared pipeline across all
# methods.
match simulation_method:
case "DG":
imp_tot, fs = auralization_calculation_DG(
None,
json_path.replace(".json", "_pressure.csv"),
json_path.replace(".json", ".wav"),
)
# this should be the only thing getting executed
case _:
imp_tot, fs = auralization_calculation(
None,
json_path.replace(".json", "_pressure.csv"),
json_path.replace(".json", ".wav"),
)


# auralization: save the impulse response to xlsx
if not ExportHelper.write_data_to_xlsx_file(
json_path.replace(".json", ".xlsx"),
CustomExportParametersConfig.impulse_response,
{f"{fs}Hz": imp_tot},
):
logger.error(
"Error saving the impulse response to xlsx"
try:
logger.info("Saving to xlsx...")

# save the simulation result json to xlsx
if not ExportHelper.parse_json_file_to_xlsx_file(
json_path, json_path.replace(".json", ".xlsx")
):
logger.error("Error saving the result to xlsx")
raise "Error saving the result to xlsx"

# db - save the xlsx file path
export = Export(
name=Path(json_path).name.replace(".json", ".xlsx"),
simulationId=simulation.id,
)
raise "Error saving the impulse response to xlsx"

session.add(export)

# auralization: save the impulse response to xlsx
if not ExportHelper.write_data_to_xlsx_file(
json_path.replace(".json", ".xlsx"),
CustomExportParametersConfig.impulse_response,
{f"{fs}Hz": imp_tot},
):
logger.error(
"Error saving the impulse response to xlsx"
)
raise "Error saving the impulse response to xlsx"
except Exception as ex:
logger.error(f"Error during saving results: {ex}")
raise Exception(f"Error during saving results: {ex}")

result_container = {}
if json_path is not None:
Expand All @@ -492,6 +536,14 @@ def run_solver(simulation_run_id: int, json_path: str):
simulation.status = Status.Completed
simulation.completedAt = datetime.now()

if os.path.exists(cancel_flag_path):
# Clean up cancel flag after handling cancellation
try:
cancel_flag_path.unlink()
logger.info(f"Removed cancel flag file: {cancel_flag_path}")
except Exception as ex:
logger.warning(f"Failed to remove cancel flag file {cancel_flag_path}: {ex}")

Comment on lines +539 to +546
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this can be reverted as well when using the remove option in the executor

simulation_run.updatedAt = datetime.now()
simulation.updatedAt = datetime.now()

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,4 @@ paramiko
git+https://github.com/Building-acoustics-TU-Eindhoven/acousticDE.git@d32afb2498e27bd996fc7356d57dc4f1ed76aa44#egg=acousticDE
# git+https://github.com/dtu-act/deeponet-acoustic-wave-prop.git@3d3fc5ee952756eedcd4fec3c3674ad829825c7e#egg=deeponet-acoustics
git+https://github.com/Building-acoustics-TU-Eindhoven/edg-acoustics.git@08cac98da98ed14ba1366741b1c0644001503b82#egg=edg-acoustics

pyfar
Loading