diff --git a/components/dags/src/pinta_dags/dags/calculate_dem_diff.py b/components/dags/src/pinta_dags/dags/calculate_dem_diff.py index ec660f2..ca0935a 100644 --- a/components/dags/src/pinta_dags/dags/calculate_dem_diff.py +++ b/components/dags/src/pinta_dags/dags/calculate_dem_diff.py @@ -14,6 +14,7 @@ from pinta_dags.config import AirflowVariable from pinta_dags.tasks import ( build_job_connection_uri_task, + find_production_area_tile_geometries, get_database_name, initialize_dem_tables, merge_dem_staging_tables, @@ -75,26 +76,6 @@ def calculate_dem_diff_dag() -> None: # Precondition: the production area must already have its job database # provisioned and database_name set for production area by orchestrator DAG. - @task.docker(**config.PINTA_CONTAINER_TASK_ARGS) - def find_production_area( - connection_uri: str, - production_area_id: str, - ) -> list[str]: - import sqlalchemy - import sqlmodel - from geoalchemy2.shape import to_shape - from pinta_db.primary_db.models.management import ProductionArea - - engine = sqlalchemy.create_engine(connection_uri) - with sqlmodel.Session(engine) as session: - statement = sqlmodel.select(ProductionArea).where( - ProductionArea.id == production_area_id - ) - area_in_db = session.exec(statement).first() - if not area_in_db: - return [] - return [to_shape(tile.geom).wkt for tile in area_in_db.tiles] - @task.docker( **config.PINTA_CONTAINER_TASK_ARGS, max_active_tis_per_dag=_get_max_parallel_pipelines(), @@ -155,7 +136,9 @@ def cluster_diff_polygons(job_connection_uri: str) -> None: database_name=database_name, ), ) - tile_wkt_list = find_production_area(primary_connection_uri, prod_area_id) + tile_wkt_list = find_production_area_tile_geometries.override( + task_id="find_production_area" + )(primary_connection_uri, prod_area_id) init_diff_task = initialize_dem_tables.override( task_id="initialize_diff_tables" diff --git a/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py b/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py index 2c40712..7eabd49 100644 --- a/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py +++ b/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py @@ -13,6 +13,9 @@ from pinta_dags import config +# How often each triggered child DAG is polled for completion. +_TRIGGER_POKE_INTERVAL_SECONDS = 5 + def create_calculate_rasters_for_production_area_dag( # noqa: PLR0915 *, @@ -47,6 +50,11 @@ def create_calculate_rasters_for_production_area_dag( # noqa: PLR0915 type="boolean", description="Cluster difference polygons in the DEM diff DAG", ), + "initialize_dem_preview": Param( + default=True, + type="boolean", + description="Initialize DEM preview table", + ), }, is_paused_upon_creation=False, ) @@ -105,6 +113,7 @@ def should_run(*, requested: bool) -> bool: trigger_dag_id=constants.DAG_ID_CALCULATE_REFERENCE_DEM, conf={"id": "{{ params.id }}"}, wait_for_completion=True, + poke_interval=_TRIGGER_POKE_INTERVAL_SECONDS, ) trigger_calculate_dem_diff = TriggerDagRunOperator( @@ -115,6 +124,15 @@ def should_run(*, requested: bool) -> bool: "cluster": "{{ params.cluster_diff_polygons }}", }, wait_for_completion=True, + poke_interval=_TRIGGER_POKE_INTERVAL_SECONDS, + ) + + trigger_initialize_dem_preview = TriggerDagRunOperator( + task_id="trigger_initialize_dem_preview", + trigger_dag_id=constants.DAG_ID_INITIALIZE_DEM_PREVIEW, + conf={"id": "{{ params.id }}"}, + wait_for_completion=True, + poke_interval=_TRIGGER_POKE_INTERVAL_SECONDS, ) @task.docker( @@ -184,8 +202,17 @@ def set_processing_status_failed( task_id="should_calculate_dem_diff", trigger_rule=TriggerRule.NONE_FAILED, )(requested=cast("bool", "{{ params.calculate_dem_diff }}")) - - triggers = [trigger_calculate_reference_dem, trigger_calculate_dem_diff] + # The DEM preview copy is independent of the reference DEM -> DEM diff + # chain, so it runs in parallel straight off ensure_database. + preview_gate = should_run.override( + task_id="should_initialize_dem_preview", + )(requested=cast("bool", "{{ params.initialize_dem_preview }}")) + + triggers = [ + trigger_calculate_reference_dem, + trigger_calculate_dem_diff, + trigger_initialize_dem_preview, + ] status_completed = set_processing_status_completed( primary_connection_uri, prod_area_id ) @@ -197,6 +224,8 @@ def set_processing_status_failed( # DEM so it can compare against freshly computed reference rasters. ensure_database >> reference_gate >> trigger_calculate_reference_dem trigger_calculate_reference_dem >> diff_gate >> trigger_calculate_dem_diff + # Runs in parallel with the reference DEM -> DEM diff chain. + ensure_database >> preview_gate >> trigger_initialize_dem_preview # Also depend on ensure_database so a failure there (before any trigger # runs) still resolves the processing status instead of leaving the # production area stuck in STARTED. diff --git a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py index 2485a0b..b2a8e36 100644 --- a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py +++ b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py @@ -14,6 +14,7 @@ from pinta_dags.config import AirflowVariable from pinta_dags.tasks import ( build_job_connection_uri_task, + find_production_area_tile_paths, get_database_name, initialize_dem_tables, merge_dem_staging_tables, @@ -65,25 +66,6 @@ def calculate_reference_dem_dag() -> None: # Precondition: the production area must already have its job database # provisioned and database_name set for production area by orchestrator DAG. - @task.docker(**config.PINTA_CONTAINER_TASK_ARGS) - def find_production_area( - connection_uri: str, - production_area_id: str, - ) -> list[str]: - import sqlalchemy - import sqlmodel - from pinta_db.primary_db.models.management import ProductionArea - - engine = sqlalchemy.create_engine(connection_uri) - with sqlmodel.Session(engine) as session: - statement = sqlmodel.select(ProductionArea).where( - ProductionArea.id == production_area_id - ) - area_in_db = session.exec(statement).first() - if not area_in_db: - return [] - return [tile.file_path for tile in area_in_db.tiles] - @task.docker( **config.PINTA_CONTAINER_TASK_ARGS, max_active_tis_per_dag=_get_max_parallel_pipelines(), @@ -133,7 +115,9 @@ def blast2dem( # noqa: PLR0913 database_name = cast( "str", get_database_name(primary_connection_uri, prod_area_id) ) - file_paths = find_production_area(primary_connection_uri, prod_area_id) + file_paths = find_production_area_tile_paths.override( + task_id="find_production_area" + )(primary_connection_uri, prod_area_id) job_db_uri = cast( "str", diff --git a/components/dags/src/pinta_dags/dags/initialize_dem_preview.py b/components/dags/src/pinta_dags/dags/initialize_dem_preview.py new file mode 100644 index 0000000..45fad8f --- /dev/null +++ b/components/dags/src/pinta_dags/dags/initialize_dem_preview.py @@ -0,0 +1,157 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +from typing import cast + +from airflow.sdk import DAG, Param, Variable, dag, task +from pinta_common import constants +from pinta_db.job_db.models.user import DemPreview +from pinta_db.job_db.schema import Schema +from pinta_db.primary_db.models.dem import Dem as PrimaryDem +from pinta_db.primary_db.schema import Schema as PrimarySchema + +from pinta_dags import config +from pinta_dags.config import AirflowVariable +from pinta_dags.tasks import ( + build_job_connection_uri_task, + find_production_area_tile_geometries, + get_database_name, + initialize_dem_tables, + merge_dem_staging_tables, +) + +FROM_DB_SCHEMA = PrimarySchema.DEM.value +FROM_DB_TABLE = PrimaryDem.__tablename__ +TO_DB_SCHEMA = Schema.USER.value +TO_DB_TABLE = DemPreview.__tablename__ + + +def _get_max_parallel_pipelines() -> int: + # Reuses the reference DEM parallelism variable + var = AirflowVariable.CALCULATE_REFERENCE_DEM_MAX_PARALLEL_PIPELINES + max_parallel = int(Variable.get(var, 4)) + if max_parallel < 1: + msg = f"{var} must be at least 1" + raise ValueError(msg) + return max_parallel + + +def _get_staging_tables() -> int: + # Reuses the reference DEM staging tables variable + var = AirflowVariable.CALCULATE_REFERENCE_DEM_STAGING_TABLES + staging_tables = int(Variable.get(var, 1)) + if staging_tables < 0: + msg = f"{var} must be at least 0" + raise ValueError(msg) + return staging_tables + + +def create_initialize_dem_preview_dag( + *, + dag_id: str, +) -> DAG: + @dag( + dag_id=dag_id, + tags=[dag_id], + dag_display_name="Initialize DEM preview", + schedule=None, + params={ + "id": Param( + "", + type="string", + format="uuid", + description=("Production area id as UUID"), + ) + }, + is_paused_upon_creation=False, + ) + def initialize_dem_preview_dag() -> None: + # Precondition: the production area must already have its job database + # provisioned and database_name set for production area by orchestrator DAG. + + @task.docker( + **config.PINTA_CONTAINER_TASK_ARGS, + max_active_tis_per_dag=_get_max_parallel_pipelines(), + ) + def copy_dem_preview( # noqa: PLR0913 + primary_connection_uri: str, + job_connection_uri: str, + tile_wkt: str, + staging_tables: int, + from_schema: str, + from_table: str, + to_schema: str, + to_table: str, + ) -> None: + import sqlalchemy + import sqlmodel + from pinta_processing import pipelines + + with ( + sqlmodel.Session( + sqlalchemy.create_engine(primary_connection_uri) + ) as primary_session, + sqlmodel.Session( + sqlalchemy.create_engine(job_connection_uri) + ) as job_session, + ): + pipeline = pipelines.postgis_to_postgis( + from_session=primary_session, + from_schema=from_schema, + from_table=from_table, + to_session=job_session, + to_schema=to_schema, + to_table=to_table, + tile_wkt=tile_wkt, + staging_tables=staging_tables, + ) + pipeline.execute() + + primary_connection_uri = config.connection_uri_template("pinta_processing_db") + job_connection_uri = config.connection_uri_template("pinta_job_db") + staging_tables = _get_staging_tables() + + prod_area_id = "{{ params.id }}" + database_name = cast( + "str", get_database_name(primary_connection_uri, prod_area_id) + ) + job_db_uri = cast( + "str", + build_job_connection_uri_task( + base_uri=job_connection_uri, + database_name=database_name, + ), + ) + tile_wkt_list = find_production_area_tile_geometries.override( + task_id="find_production_area" + )(primary_connection_uri, prod_area_id) + + initialize_task = initialize_dem_tables( + job_db_uri, TO_DB_SCHEMA, TO_DB_TABLE, staging_tables + ) + copied_tiles = copy_dem_preview.partial( + primary_connection_uri=primary_connection_uri, + job_connection_uri=job_db_uri, + staging_tables=staging_tables, + from_schema=FROM_DB_SCHEMA, + from_table=FROM_DB_TABLE, + to_schema=TO_DB_SCHEMA, + to_table=TO_DB_TABLE, + ).expand(tile_wkt=tile_wkt_list) + ( + tile_wkt_list + >> initialize_task + >> copied_tiles + >> merge_dem_staging_tables( + job_db_uri, TO_DB_SCHEMA, TO_DB_TABLE, staging_tables + ) + ) + + return initialize_dem_preview_dag() + + +DAG_ID = constants.DAG_ID_INITIALIZE_DEM_PREVIEW + +globals()[DAG_ID] = create_initialize_dem_preview_dag(dag_id=DAG_ID) diff --git a/components/dags/src/pinta_dags/tasks.py b/components/dags/src/pinta_dags/tasks.py index 4536758..fd71e3d 100644 --- a/components/dags/src/pinta_dags/tasks.py +++ b/components/dags/src/pinta_dags/tasks.py @@ -33,6 +33,51 @@ def get_database_name( return area_in_db.database_name +@task.docker(**config.PINTA_CONTAINER_TASK_ARGS) +def find_production_area_tile_paths( + connection_uri: str, + production_area_id: str, +) -> list[str]: + """Return the source file paths of the production area's point cloud tiles.""" + import sqlalchemy + import sqlmodel + from pinta_db.primary_db.models.management import ProductionArea + + engine = sqlalchemy.create_engine(connection_uri) + with sqlmodel.Session(engine) as session: + area_in_db = session.exec( + sqlmodel.select(ProductionArea).where( + ProductionArea.id == production_area_id + ) + ).first() + if not area_in_db: + return [] + return [tile.file_path for tile in area_in_db.tiles] + + +@task.docker(**config.PINTA_CONTAINER_TASK_ARGS) +def find_production_area_tile_geometries( + connection_uri: str, + production_area_id: str, +) -> list[str]: + """Return the geometries (as WKT) of the production area's point cloud tiles.""" + import sqlalchemy + import sqlmodel + from geoalchemy2.shape import to_shape + from pinta_db.primary_db.models.management import ProductionArea + + engine = sqlalchemy.create_engine(connection_uri) + with sqlmodel.Session(engine) as session: + area_in_db = session.exec( + sqlmodel.select(ProductionArea).where( + ProductionArea.id == production_area_id + ) + ).first() + if not area_in_db: + return [] + return [to_shape(tile.geom).wkt for tile in area_in_db.tiles] + + @task def build_job_connection_uri_task( base_uri: str, diff --git a/components/dags/test_dags/dags/test_calculate_dem_diff.py b/components/dags/test_dags/dags/test_calculate_dem_diff.py new file mode 100644 index 0000000..e7b4735 --- /dev/null +++ b/components/dags/test_dags/dags/test_calculate_dem_diff.py @@ -0,0 +1,103 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +import uuid +from typing import TYPE_CHECKING + +import pytest +from airflow.models import DagBag, dagbag + +from pinta_dags.dags import calculate_dem_diff + +if TYPE_CHECKING: + from airflow.sdk import DAG + + +def create_dag_to_test() -> "DAG": + dag = calculate_dem_diff.create_calculate_dem_diff_dag( + dag_id=f"some_id_{uuid.uuid4()}" + ) + + assert str(dag.dag_id).startswith("some_id") + + dag_bag = DagBag(include_examples=False) + dag_bag.bag_dag(dag) + dagbag.sync_bag_to_db(dag_bag, "mock-dags", None) + + return dag + + +@pytest.fixture(autouse=True) +def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None: + monkeypatch.setenv("AIRFLOW_CONN_PINTA_PROCESSING_DB", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB_ADMIN", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB", "postgres://mockaddr:123/db") + monkeypatch.setenv( + "AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_MAX_PARALLEL_PIPELINES", "2" + ) + monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_STAGING_TABLES", "3") + + +def test_calculate_dem_diff_all_tasks() -> None: + dag = create_dag_to_test() + + assert set(dag.task_ids) == { + "get_database_name", + "build_job_connection_uri_task", + "find_production_area", + "initialize_diff_tables", + "initialize_diff_lte_threshold_tables", + "calculate_dem_diff", + "merge_diff_tables", + "merge_diff_lte_threshold_tables", + "should_cluster", + "cluster_diff_polygons", + } + + +def test_dependencies() -> None: + dag = create_dag_to_test() + assert dag is not None + + get_database_name = dag.get_task("get_database_name") + build_job_connection_uri_task = dag.get_task("build_job_connection_uri_task") + find_production_area = dag.get_task("find_production_area") + init_diff = dag.get_task("initialize_diff_tables") + init_diff_lte = dag.get_task("initialize_diff_lte_threshold_tables") + calculate = dag.get_task("calculate_dem_diff") + merge_diff = dag.get_task("merge_diff_tables") + merge_diff_lte = dag.get_task("merge_diff_lte_threshold_tables") + should_cluster = dag.get_task("should_cluster") + cluster = dag.get_task("cluster_diff_polygons") + + assert get_database_name.task_id in build_job_connection_uri_task.upstream_task_ids + # The diff tiles come from the production area geometries task. + assert find_production_area.task_id in init_diff.upstream_task_ids + assert find_production_area.task_id in init_diff_lte.upstream_task_ids + assert init_diff.task_id in calculate.upstream_task_ids + assert init_diff_lte.task_id in calculate.upstream_task_ids + assert calculate.task_id in merge_diff.upstream_task_ids + assert calculate.task_id in merge_diff_lte.upstream_task_ids + assert merge_diff.task_id in should_cluster.upstream_task_ids + assert merge_diff_lte.task_id in should_cluster.upstream_task_ids + assert should_cluster.task_id in cluster.upstream_task_ids + + +def test_get_max_parallel_pipelines_rejects_below_one( + monkeypatch: "pytest.MonkeyPatch", +) -> None: + monkeypatch.setenv( + "AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_MAX_PARALLEL_PIPELINES", "0" + ) + with pytest.raises(ValueError, match="must be at least 1"): + calculate_dem_diff._get_max_parallel_pipelines() + + +def test_get_staging_tables_rejects_negative( + monkeypatch: "pytest.MonkeyPatch", +) -> None: + monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_STAGING_TABLES", "-1") + with pytest.raises(ValueError, match="must be at least 0"): + calculate_dem_diff._get_staging_tables() diff --git a/components/dags/test_dags/dags/test_calculate_rasters_for_production_area.py b/components/dags/test_dags/dags/test_calculate_rasters_for_production_area.py new file mode 100644 index 0000000..eebc5d2 --- /dev/null +++ b/components/dags/test_dags/dags/test_calculate_rasters_for_production_area.py @@ -0,0 +1,102 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +import uuid +from typing import TYPE_CHECKING + +import pytest +from airflow.models import DagBag, dagbag + +from pinta_dags.dags import calculate_rasters_for_production_area as rasters_dag + +if TYPE_CHECKING: + from airflow.sdk import DAG + + +def create_dag_to_test() -> "DAG": + dag = rasters_dag.create_calculate_rasters_for_production_area_dag( + dag_id=f"some_id_{uuid.uuid4()}" + ) + + assert str(dag.dag_id).startswith("some_id") + + dag_bag = DagBag(include_examples=False) + dag_bag.bag_dag(dag) + dagbag.sync_bag_to_db(dag_bag, "mock-dags", None) + + return dag + + +@pytest.fixture(autouse=True) +def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None: + monkeypatch.setenv("AIRFLOW_CONN_PINTA_PROCESSING_DB", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB_ADMIN", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB", "postgres://mockaddr:123/db") + + +def test_all_tasks() -> None: + dag = create_dag_to_test() + + assert set(dag.task_ids) == { + "ensure_job_database", + "should_calculate_reference_dem", + "should_calculate_dem_diff", + "should_initialize_dem_preview", + "trigger_calculate_reference_dem", + "trigger_calculate_dem_diff", + "trigger_initialize_dem_preview", + "set_processing_status_completed", + "set_processing_status_failed", + } + + +def test_initialize_dem_preview_param_defaults_true() -> None: + dag = create_dag_to_test() + + assert dag.params["initialize_dem_preview"] is True + + +def test_dem_preview_runs_in_parallel_off_ensure_database() -> None: + # The DEM preview trigger is gated by its own short-circuit and hangs + # directly off ensure_database, independent of the reference DEM -> diff + # chain. + dag = create_dag_to_test() + + ensure_database = dag.get_task("ensure_job_database") + preview_gate = dag.get_task("should_initialize_dem_preview") + trigger_preview = dag.get_task("trigger_initialize_dem_preview") + + assert ensure_database.task_id in preview_gate.upstream_task_ids + assert preview_gate.task_id in trigger_preview.upstream_task_ids + # Not chained behind the reference DEM / diff triggers. + assert "trigger_calculate_reference_dem" not in trigger_preview.upstream_task_ids + assert "trigger_calculate_dem_diff" not in trigger_preview.upstream_task_ids + + +def test_trigger_operators_poll_frequently_for_completion() -> None: + # The waiting triggers must poll well below the 60 s default, otherwise the + # orchestration sits idle for up to a minute after each child DAG finishes + # and the end-to-end run times out. + dag = create_dag_to_test() + + for task_id in ( + "trigger_calculate_reference_dem", + "trigger_calculate_dem_diff", + "trigger_initialize_dem_preview", + ): + trigger = dag.get_task(task_id) + assert trigger.wait_for_completion is True + assert trigger.poke_interval <= 10 + + +def test_status_tasks_wait_for_dem_preview_trigger() -> None: + dag = create_dag_to_test() + + trigger_preview = dag.get_task("trigger_initialize_dem_preview") + status_completed = dag.get_task("set_processing_status_completed") + status_failed = dag.get_task("set_processing_status_failed") + + assert trigger_preview.task_id in status_completed.upstream_task_ids + assert trigger_preview.task_id in status_failed.upstream_task_ids diff --git a/components/dags/test_dags/dags/test_initialize_dem_preview.py b/components/dags/test_dags/dags/test_initialize_dem_preview.py new file mode 100644 index 0000000..f648660 --- /dev/null +++ b/components/dags/test_dags/dags/test_initialize_dem_preview.py @@ -0,0 +1,149 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +import uuid +from typing import TYPE_CHECKING +from unittest.mock import MagicMock + +import pytest +from airflow.models import DagBag, dagbag + +from pinta_dags.dags import initialize_dem_preview + +if TYPE_CHECKING: + from airflow.sdk import DAG + from pytest_mock import MockerFixture + + +def create_dag_to_test() -> "DAG": + dag = initialize_dem_preview.create_initialize_dem_preview_dag( + dag_id=f"some_id_{uuid.uuid4()}" + ) + + assert str(dag.dag_id).startswith("some_id") + + dag_bag = DagBag(include_examples=False) + dag_bag.bag_dag(dag) + dagbag.sync_bag_to_db(dag_bag, "mock-dags", None) + + return dag + + +@pytest.fixture(autouse=True) +def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None: + monkeypatch.setenv("AIRFLOW_CONN_PINTA_PROCESSING_DB", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB_ADMIN", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB", "postgres://mockaddr:123/db") + monkeypatch.setenv( + "AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_MAX_PARALLEL_PIPELINES", "2" + ) + monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_STAGING_TABLES", "3") + + +def test_initialize_dem_preview_all_tasks() -> None: + dag = create_dag_to_test() + + assert set(dag.task_ids) == { + "get_database_name", + "build_job_connection_uri_task", + "find_production_area", + "initialize_dem_tables", + "copy_dem_preview", + "merge_dem_staging_tables", + } + + +def test_dependencies() -> None: + dag = create_dag_to_test() + assert dag is not None + + get_database_name = dag.get_task("get_database_name") + build_job_connection_uri_task = dag.get_task("build_job_connection_uri_task") + find_production_area = dag.get_task("find_production_area") + initialize = dag.get_task("initialize_dem_tables") + copy_dem_preview = dag.get_task("copy_dem_preview") + merge_dem_staging_tables = dag.get_task("merge_dem_staging_tables") + + assert get_database_name.task_id in build_job_connection_uri_task.upstream_task_ids + assert find_production_area.task_id in initialize.upstream_task_ids + assert initialize.task_id in copy_dem_preview.upstream_task_ids + assert copy_dem_preview.task_id in merge_dem_staging_tables.upstream_task_ids + + +def test_copies_from_primary_dem_to_dem_preview() -> None: + # The preview copy reads the primary DEM table and writes the job database + # DEM preview table. + assert initialize_dem_preview.FROM_DB_SCHEMA == "dem" + assert initialize_dem_preview.FROM_DB_TABLE == "dem" + assert initialize_dem_preview.TO_DB_SCHEMA == "user_data" + assert initialize_dem_preview.TO_DB_TABLE == "dem_preview" + + +def test_get_max_parallel_pipelines_reads_variable() -> None: + assert initialize_dem_preview._get_max_parallel_pipelines() == 2 + + +def test_get_max_parallel_pipelines_rejects_below_one( + monkeypatch: "pytest.MonkeyPatch", +) -> None: + monkeypatch.setenv( + "AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_MAX_PARALLEL_PIPELINES", "0" + ) + with pytest.raises(ValueError, match="must be at least 1"): + initialize_dem_preview._get_max_parallel_pipelines() + + +def test_get_staging_tables_reads_variable() -> None: + assert initialize_dem_preview._get_staging_tables() == 3 + + +def test_get_staging_tables_rejects_negative( + monkeypatch: "pytest.MonkeyPatch", +) -> None: + monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_STAGING_TABLES", "-1") + with pytest.raises(ValueError, match="must be at least 0"): + initialize_dem_preview._get_staging_tables() + + +def test_copy_dem_preview_builds_and_executes_pipeline( + mocker: "MockerFixture", +) -> None: + mocker.patch("sqlalchemy.create_engine") + mocker.patch("sqlmodel.Session") + # Inject a mock pipelines module so the task body's ``from pinta_processing + # import pipelines`` resolves to the mock instead of importing the real + # module (which would load heavy DB modules and leak into the sys.modules + # patching other DAG tests rely on). + mock_pipeline = MagicMock() + mock_pipelines_module = MagicMock() + mock_pipelines_module.postgis_to_postgis.return_value = mock_pipeline + mocker.patch.dict( + "sys.modules", + {"pinta_processing.pipelines": mock_pipelines_module}, + ) + + dag = create_dag_to_test() + copy_dem_preview = dag.get_task("copy_dem_preview").python_callable + + copy_dem_preview( + primary_connection_uri="postgres://primary", + job_connection_uri="postgres://job", + tile_wkt="POINT (0 0)", + staging_tables=3, + from_schema="dem", + from_table="dem", + to_schema="user_data", + to_table="dem_preview", + ) + + mock_pipelines_module.postgis_to_postgis.assert_called_once() + kwargs = mock_pipelines_module.postgis_to_postgis.call_args.kwargs + assert kwargs["from_schema"] == "dem" + assert kwargs["from_table"] == "dem" + assert kwargs["to_schema"] == "user_data" + assert kwargs["to_table"] == "dem_preview" + assert kwargs["tile_wkt"] == "POINT (0 0)" + assert kwargs["staging_tables"] == 3 + mock_pipeline.execute.assert_called_once_with() diff --git a/components/dags/test_dags/dags/test_tasks.py b/components/dags/test_dags/dags/test_tasks.py new file mode 100644 index 0000000..fce5977 --- /dev/null +++ b/components/dags/test_dags/dags/test_tasks.py @@ -0,0 +1,110 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +from typing import TYPE_CHECKING +from unittest.mock import MagicMock + +import pytest + +from pinta_dags import tasks + +if TYPE_CHECKING: + from pytest_mock import MockerFixture + + +@pytest.fixture +def mock_session(mocker: "MockerFixture") -> MagicMock: + """Patch the engine/session so task bodies run without a real database.""" + mocker.patch("sqlalchemy.create_engine") + session = MagicMock() + session_ctx = mocker.patch("sqlmodel.Session") + session_ctx.return_value.__enter__.return_value = session + return session + + +def test_get_database_name_returns_database_name(mock_session: MagicMock) -> None: + area = MagicMock(database_name="job_db") + mock_session.exec.return_value.first.return_value = area + + result = tasks.get_database_name.function( + "postgres://mock/db", "some-production-area-id" + ) + + assert result == "job_db" + + +@pytest.mark.parametrize("area", [None, MagicMock(database_name=None)]) +def test_get_database_name_raises_without_database_name( + mock_session: MagicMock, area: object +) -> None: + mock_session.exec.return_value.first.return_value = area + + with pytest.raises(ValueError, match="no database name set"): + tasks.get_database_name.function("postgres://mock/db", "prod-area-id") + + +def test_find_production_area_tile_paths_returns_file_paths( + mock_session: MagicMock, +) -> None: + area = MagicMock( + tiles=[MagicMock(file_path="/a.laz"), MagicMock(file_path="/b.laz")] + ) + mock_session.exec.return_value.first.return_value = area + + result = tasks.find_production_area_tile_paths.function( + "postgres://mock/db", "prod-area-id" + ) + + assert result == ["/a.laz", "/b.laz"] + + +def test_find_production_area_tile_paths_missing_area_returns_empty( + mock_session: MagicMock, +) -> None: + mock_session.exec.return_value.first.return_value = None + + result = tasks.find_production_area_tile_paths.function( + "postgres://mock/db", "prod-area-id" + ) + + assert result == [] + + +def test_find_production_area_tile_geometries_returns_wkt( + mock_session: MagicMock, mocker: "MockerFixture" +) -> None: + mocker.patch( + "geoalchemy2.shape.to_shape", + side_effect=[MagicMock(wkt="POINT (0 0)"), MagicMock(wkt="POINT (1 1)")], + ) + area = MagicMock(tiles=[MagicMock(), MagicMock()]) + mock_session.exec.return_value.first.return_value = area + + result = tasks.find_production_area_tile_geometries.function( + "postgres://mock/db", "prod-area-id" + ) + + assert result == ["POINT (0 0)", "POINT (1 1)"] + + +def test_find_production_area_tile_geometries_missing_area_returns_empty( + mock_session: MagicMock, +) -> None: + mock_session.exec.return_value.first.return_value = None + + result = tasks.find_production_area_tile_geometries.function( + "postgres://mock/db", "prod-area-id" + ) + + assert result == [] + + +def test_build_job_connection_uri_task_replaces_database_name() -> None: + result = tasks.build_job_connection_uri_task.function( + base_uri="postgresql://user:pass@host:1234/template_db", + database_name="job_test", + ) + + assert result == "postgresql://user:pass@host:1234/job_test" diff --git a/components/db/migrations/job/versions/008_add_dem_preview.py b/components/db/migrations/job/versions/008_add_dem_preview.py new file mode 100644 index 0000000..002837a --- /dev/null +++ b/components/db/migrations/job/versions/008_add_dem_preview.py @@ -0,0 +1,56 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +"""Add dem preview tables + +Revision ID: 008 +Revises: 007 +Create Date: 2026-07-02 + +""" + +from collections.abc import Sequence + +from alembic import op + +from migrations import _schema_op +from pinta_common import Settings + +# revision identifiers, used by Alembic. +revision: str = "008" +down_revision: str | Sequence[str] | None = "007" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +_OVERVIEW_FACTORS = (2, 8, 128) +_BASE_TABLE = "dem_preview" + + +def upgrade() -> None: + """Upgrade schema.""" + srid = int(Settings.DB_SRID) + pixel_size = Settings.DB_DEM_PIXEL_SIZE + nodata = Settings.DB_DEM_NODATA + + _schema_op.create_raster_table( + _BASE_TABLE, pixel_size, nodata, srid, schema="user_data" + ) + for factor in _OVERVIEW_FACTORS: + table = f"o_{factor}_{_BASE_TABLE}" + _schema_op.create_raster_table( + table, pixel_size * factor, nodata, srid, schema="user_data" + ) + _schema_op.add_overview_constraints("user_data", table, factor, _BASE_TABLE) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_index(f"idx_{_BASE_TABLE}_rast", table_name=_BASE_TABLE, schema="user_data") + op.drop_geospatial_table(_BASE_TABLE, schema="user_data") + for factor in reversed(_OVERVIEW_FACTORS): + table = f"o_{factor}_{_BASE_TABLE}" + op.drop_index(f"idx_{table}_rast", table_name=table, schema="user_data") + op.drop_geospatial_table(table, schema="user_data") diff --git a/components/db/migrations/job/versions/009_add_worker_user_schema_permissions.py b/components/db/migrations/job/versions/009_add_worker_user_schema_permissions.py new file mode 100644 index 0000000..98d05b2 --- /dev/null +++ b/components/db/migrations/job/versions/009_add_worker_user_schema_permissions.py @@ -0,0 +1,74 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +"""Add permissions for worker in user schema + +Revision ID: 009 +Revises: 008 +Create Date: 2026-07-02 + +""" + +import os +from collections.abc import Sequence + +from alembic import op + +from migrations import _schema_op as schema_op + +# revision identifiers, used by Alembic. +revision: str = "009" +down_revision: str | Sequence[str] | None = "008" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +OWNER = os.environ["DB_JOB_OWNER_ROLE"] +WRITER = os.environ["DB_JOB_WRITER_ROLE"] +READER = os.environ["DB_JOB_READER_ROLE"] +PROCESSING_WORKER = os.environ["DB_JOB_PROCESSING_WORKER_ROLE"] +DB_NAME = os.environ["DB_JOB_TEMPLATE_NAME"] + +REFERENCE_SCHEMA = "reference" +USER_SCHEMA = "user_data" + + +def upgrade() -> None: + """Upgrade schema.""" + schema_op.grant_privileges_on_schema( + schema=USER_SCHEMA, role=PROCESSING_WORKER, privileges=("USAGE", "CREATE") + ) + schema_op.grant_default_privileges_on_tables_in_schema( + schema=USER_SCHEMA, + schema_owner=OWNER, + role=PROCESSING_WORKER, + privileges=("SELECT", "INSERT", "UPDATE", "DELETE", "TRUNCATE"), + ) + schema_op.grant_default_privileges_on_sequences_in_schema( + schema=USER_SCHEMA, + schema_owner=OWNER, + role=PROCESSING_WORKER, + privileges=("SELECT", "USAGE"), + ) + op.execute( + f"GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA user_data" + f" TO {PROCESSING_WORKER}" + ) + + +def downgrade() -> None: + """Downgrade schema.""" + schema_op.revoke_privileges_on_schema( + schema=USER_SCHEMA, role=PROCESSING_WORKER, privileges=("USAGE", "CREATE") + ) + schema_op.revoke_default_privileges_on_tables_in_schema( + schema=USER_SCHEMA, + schema_owner=OWNER, + role=PROCESSING_WORKER, + privileges=("SELECT", "INSERT", "UPDATE", "DELETE", "TRUNCATE"), + ) + op.execute( + f"REVOKE USAGE, SELECT ON ALL SEQUENCES IN SCHEMA user_data" # noqa: S608 + f" FROM {PROCESSING_WORKER}" + ) diff --git a/components/db/src/pinta_common/constants.py b/components/db/src/pinta_common/constants.py index c2028e3..4dc3bfa 100644 --- a/components/db/src/pinta_common/constants.py +++ b/components/db/src/pinta_common/constants.py @@ -9,4 +9,5 @@ DAG_ID_PROCESS_PRODUCTION_AREAS = "process_production_areas" DAG_ID_CALCULATE_REFERENCE_DEM = "calculate_reference_dem" DAG_ID_CALCULATE_DEM_DIFF = "calculate_dem_diff" +DAG_ID_INITIALIZE_DEM_PREVIEW = "initialize_dem_preview" DAG_ID_CALCULATE_RASTERS_FOR_PRODUCTION_AREA = "calculate_rasters_for_production_area" diff --git a/components/db/src/pinta_db/job_db/models/base.py b/components/db/src/pinta_db/job_db/models/base.py index e07afd1..eaff65b 100644 --- a/components/db/src/pinta_db/job_db/models/base.py +++ b/components/db/src/pinta_db/job_db/models/base.py @@ -22,4 +22,8 @@ class UserBase(BaseJobDb): __table_args__ = {"schema": Schema.USER.value} # noqa: RUF012 + +class UserVectorBase(UserBase): + """Base model for tables in user schema.""" + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) diff --git a/components/db/src/pinta_db/job_db/models/user.py b/components/db/src/pinta_db/job_db/models/user.py index e540730..b7afc8f 100644 --- a/components/db/src/pinta_db/job_db/models/user.py +++ b/components/db/src/pinta_db/job_db/models/user.py @@ -10,13 +10,30 @@ from sqlmodel import Field from pinta_common import Settings +from pinta_db.common import base from pinta_db.constants import POLYGON -from pinta_db.job_db.models.base import UserBase +from pinta_db.job_db.models import base as job_base -class UpdateArea(UserBase, table=True): # type: ignore[call-arg] +class UpdateArea(job_base.UserVectorBase, table=True): # type: ignore[call-arg] """Final update area.""" geom: Any = Field( sa_column=Column(Geometry(POLYGON, srid=Settings.DB_SRID, nullable=False)) ) + + +class DemPreview(job_base.UserBase, base.RasterBase, table=True): # type: ignore[call-arg] + """Reference raster.""" + + +class O2DemPreview(job_base.UserBase, base.RasterBase, table=True): # type: ignore[call-arg] + """Overview factor 2.""" + + +class O8DemPreview(job_base.UserBase, base.RasterBase, table=True): # type: ignore[call-arg] + """Overview factor 8.""" + + +class O128DemPreview(job_base.UserBase, base.RasterBase, table=True): # type: ignore[call-arg] + """Overview factor 128.""" diff --git a/components/e2e/pyproject.toml b/components/e2e/pyproject.toml index b403368..5182c3a 100644 --- a/components/e2e/pyproject.toml +++ b/components/e2e/pyproject.toml @@ -62,5 +62,5 @@ testpaths = [ "test_e2e", ] addopts = "-n auto --dist loadgroup --maxprocesses=4 --import-mode=importlib" -timeout = 180 +timeout = 600 timeout_method = "thread" diff --git a/components/e2e/test_e2e/test_dem_workflows.py b/components/e2e/test_e2e/test_dem_workflows.py index 0b59ca3..9a6bb0e 100644 --- a/components/e2e/test_e2e/test_dem_workflows.py +++ b/components/e2e/test_e2e/test_dem_workflows.py @@ -37,16 +37,19 @@ def _get_production_area(db: "Session") -> ProductionArea: return production_area -def _count_diff_rasters(database_name: str) -> int: - """Count the diff raster tiles written to the job database.""" +def _count_rows(database_name: str, *tables: str) -> int: + """Count the total rows across the given ``schema.table`` names in the job db.""" + counts = " + ".join(f"(SELECT count(*) FROM {table})" for table in tables) credentials = db_utils.get_job_admin_credentials(database_name) with engine_utils.get_autocommit_connection(credentials) as connection: - return connection.execute( - sqlmodel.text( - "SELECT (SELECT count(*) FROM reference.diff_gt_threshold) " - "+ (SELECT count(*) FROM reference.diff_lte_threshold)" - ) - ).scalar_one() + return connection.execute(sqlmodel.text(f"SELECT {counts}")).scalar_one() + + +def _count_diff_rasters(database_name: str) -> int: + """Count the diff raster tiles written to the job database.""" + return _count_rows( + database_name, "reference.diff_gt_threshold", "reference.diff_lte_threshold" + ) @pytest.mark.xdist_group("airflow") @@ -110,6 +113,9 @@ def check_state(statuses: list[str]) -> None: assert layers.get_vector_layer_by_model(reference.DiffPolygon) assert _count_diff_rasters(completed_feature["database_name"]) > 0 + # The DEM preview copy runs in parallel and populates user_data.dem_preview. + assert _count_rows(completed_feature["database_name"], "user_data.dem_preview") > 0 + cluster_layer = layers.get_vector_layer_by_model(reference.DiffPolygonCluster) assert cluster_layer.setSubsetString("") assert cluster_layer.featureCount() > 0 @@ -130,6 +136,8 @@ def test_calculate_rasters_reference_dem_only( "id": str(production_area.id), "calculate_reference_dem": True, "calculate_dem_diff": False, + "cluster_diff_polygons": False, + "initialize_dem_preview": False, }, ) state = airflow_client.wait_for_dag_run(run, timeout=WORKFLOW_TIMEOUT_S) diff --git a/components/processing/src/pinta_processing/pipelines.py b/components/processing/src/pinta_processing/pipelines.py index 63c67db..25d02ec 100644 --- a/components/processing/src/pinta_processing/pipelines.py +++ b/components/processing/src/pinta_processing/pipelines.py @@ -172,6 +172,29 @@ def calculate_diff_models( ) +def postgis_to_postgis( # noqa: PLR0913 + from_session: Session, + from_schema: str, + from_table: str, + to_session: Session, + to_schema: str, + to_table: str, + tile_wkt: str, + staging_tables: int = 0, +) -> core.Pipeline: + """Read raster from Postgis, write to Postgis.""" + return ( + reader.PostgisReader( + from_schema, + from_table, + from_session, + tile_wkt, + ) + | _generate_overview_stages(to_schema, to_table, to_session, staging_tables) + | writer.RasterPostgisWriter(to_schema, to_table, to_session, staging_tables) + ) + + def _generate_overview_stages( schema: str, table_name: str, diff --git a/components/processing/test_integration_processing/pipelines/test_postgis_to_postgis_pipeline.py b/components/processing/test_integration_processing/pipelines/test_postgis_to_postgis_pipeline.py new file mode 100644 index 0000000..b33a507 --- /dev/null +++ b/components/processing/test_integration_processing/pipelines/test_postgis_to_postgis_pipeline.py @@ -0,0 +1,138 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +import tempfile +import typing +from pathlib import Path + +import numpy as np +import rasterio +import sqlalchemy as sa +from pinta_db.job_db.models import user +from pinta_db.primary_db.models import dem +from pinta_db_utils import model_utils +from pinta_db_utils.postgis import raster +from pinta_test_utils import pinta_utils + +from pinta_processing import pipelines, reader, writer + +if typing.TYPE_CHECKING: + from sqlmodel import Session + +_SOURCE_SCHEMA, _SOURCE_TABLE = model_utils.schema_and_table(dem.Dem) +_TARGET_SCHEMA, _TARGET_TABLE = model_utils.schema_and_table(user.DemPreview) + + +def test_postgis_to_postgis_copies_source_raster_to_target( + admin_primary_session: "Session", + session: "Session", + processing_worker_session: "Session", +) -> None: + """The pipeline copies the source raster into the (empty) target table.""" + bounds = _populate_source(admin_primary_session) + _init_target(processing_worker_session) + + pipelines.postgis_to_postgis( + from_session=session, + from_schema=_SOURCE_SCHEMA, + from_table=_SOURCE_TABLE, + to_session=processing_worker_session, + to_schema=_TARGET_SCHEMA, + to_table=_TARGET_TABLE, + tile_wkt=_bounds_to_wkt(*bounds), + ).execute() + + source_array = _read_back(session, _SOURCE_SCHEMA, _SOURCE_TABLE) + target_array = _read_back(processing_worker_session, _TARGET_SCHEMA, _TARGET_TABLE) + assert target_array is not None + assert source_array is not None + assert target_array.shape == source_array.shape + assert np.array_equal(target_array, source_array) + + +def test_postgis_to_postgis_writes_overview_tables( + admin_primary_session: "Session", + session: "Session", + processing_worker_session: "Session", +) -> None: + """The pipeline also fills the target overview tables, downsampled per level.""" + bounds = _populate_source(admin_primary_session) + _init_target(processing_worker_session) + + pipelines.postgis_to_postgis( + from_session=session, + from_schema=_SOURCE_SCHEMA, + from_table=_SOURCE_TABLE, + to_session=processing_worker_session, + to_schema=_TARGET_SCHEMA, + to_table=_TARGET_TABLE, + tile_wkt=_bounds_to_wkt(*bounds), + ).execute() + + for level in raster.DEFAULT_OVERVIEW_LEVELS: + overview_table = raster.OVERVIEW_TABLE_NAME.format( + level=level, table_name=_TARGET_TABLE + ) + count = processing_worker_session.exec( # type: ignore[call-overload] + sa.text(f"SELECT COUNT(*) FROM {_TARGET_SCHEMA}.{overview_table}") + ).first()[0] + assert count > 0, f"Expected overview level {level} to have rows" + + +def _populate_source( + admin_primary_session: "Session", +) -> tuple[float, float, float, float]: + """Write the test DEM into the source table and return its bounds.""" + raster.initialize_raster_table(admin_primary_session, _SOURCE_SCHEMA, _SOURCE_TABLE) + file_path = pinta_utils.get_test_data_path("processing/dem.tif") + + ( + reader.RasterioReader(str(file_path)) + | writer.RasterPostgisWriter( + _SOURCE_SCHEMA, _SOURCE_TABLE, admin_primary_session + ) + ).execute() + + with rasterio.open(str(file_path)) as src: + return (src.bounds.left, src.bounds.bottom, src.bounds.right, src.bounds.top) + + +def _init_target(processing_worker_session: "Session") -> None: + """Ensure the target raster table and its overview tables exist and are empty.""" + raster.initialize_raster_table( + processing_worker_session, _TARGET_SCHEMA, _TARGET_TABLE, staging_tables=0 + ) + raster.initialize_overview_tables( + processing_worker_session, _TARGET_SCHEMA, _TARGET_TABLE, staging_tables=0 + ) + + +def _read_back(session: "Session", schema: str, table: str) -> np.ndarray | None: + """Merge all tiles of a raster table back into a single array via GDAL.""" + raster_binary = session.exec( # type: ignore[call-overload] + sa.text( + f"SELECT ST_AsGDALRaster(ST_Union(rast), 'GTiff') FROM {schema}.{table}" + ) + ).first()[0] + if raster_binary is None: + return None + + with tempfile.TemporaryDirectory() as tmp: + output_path = Path(tmp) / "target.tif" + output_path.write_bytes(raster_binary) + with rasterio.open(str(output_path)) as src: + return src.read(1) + + +def _bounds_to_wkt(left: float, bottom: float, right: float, top: float) -> str: + return ( + "POLYGON((" + f"{left} {bottom}, " + f"{right} {bottom}, " + f"{right} {top}, " + f"{left} {top}, " + f"{left} {bottom}" + "))" + ) diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py index 7ff9492..59c40ab 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py @@ -98,6 +98,7 @@ def start_reference_dem_workflow(self, production_area_id: str) -> None: "calculate_reference_dem": True, "calculate_dem_diff": True, "cluster_diff_polygons": True, + "initialize_dem_preview": True, }, production_area_id=production_area_id, ) diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py index 6b02e38..a2d6ff4 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py @@ -48,6 +48,14 @@ style_path=_STYLES_PATH / "raster_diff.qml", visible_initially=False, ), + config.RasterLayerConfig( + schema="user_data", + table_name="dem_preview", + layer_name=tr("DEM preview"), + layer_id="dem_preview", + style_path=_STYLES_PATH / "elevation_model.qml", + visible_initially=False, + ), config.VectorLayerConfig( schema="reference", table_name="diff_polygon", diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm index bf4db89..a6dc05b 100644 Binary files a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm and b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm differ diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts index 5d41b7d..c37d2b4 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts +++ b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts @@ -89,17 +89,17 @@ Erotusmalli, yli 20cm eroavaisuudet - + Polygonized DEM difference Muutospolygonit - + Modification area suggestions Muutosalue-ehdotukset - + Cluster significance Klusterin merkittävyys @@ -179,29 +179,34 @@ Ajon tilan viimeinen muutosaika - + Energy sum Energiasumma - + Relevance score Merkittävyys - + Cluster area Klusterin pinta-ala - + Update area Muutosalue - + Elevation Korkeus + + + DEM preview + Tuloskorkeusmalli + diff --git a/components/qgis_plugin/test_qgis/processing/test_api_client.py b/components/qgis_plugin/test_qgis/processing/test_api_client.py index 771182a..702eb18 100644 --- a/components/qgis_plugin/test_qgis/processing/test_api_client.py +++ b/components/qgis_plugin/test_qgis/processing/test_api_client.py @@ -82,6 +82,7 @@ def test_start_reference_dem_workflow_posts_workflow_payload( "calculate_reference_dem": True, "calculate_dem_diff": True, "cluster_diff_polygons": True, + "initialize_dem_preview": True, }, "production_area_id": "area-1", },