diff --git a/components/dags/src/pinta_dags/dags/calculate_dem_diff.py b/components/dags/src/pinta_dags/dags/calculate_dem_diff.py
index ec660f2..ca0935a 100644
--- a/components/dags/src/pinta_dags/dags/calculate_dem_diff.py
+++ b/components/dags/src/pinta_dags/dags/calculate_dem_diff.py
@@ -14,6 +14,7 @@
from pinta_dags.config import AirflowVariable
from pinta_dags.tasks import (
build_job_connection_uri_task,
+ find_production_area_tile_geometries,
get_database_name,
initialize_dem_tables,
merge_dem_staging_tables,
@@ -75,26 +76,6 @@ def calculate_dem_diff_dag() -> None:
# Precondition: the production area must already have its job database
# provisioned and database_name set for production area by orchestrator DAG.
- @task.docker(**config.PINTA_CONTAINER_TASK_ARGS)
- def find_production_area(
- connection_uri: str,
- production_area_id: str,
- ) -> list[str]:
- import sqlalchemy
- import sqlmodel
- from geoalchemy2.shape import to_shape
- from pinta_db.primary_db.models.management import ProductionArea
-
- engine = sqlalchemy.create_engine(connection_uri)
- with sqlmodel.Session(engine) as session:
- statement = sqlmodel.select(ProductionArea).where(
- ProductionArea.id == production_area_id
- )
- area_in_db = session.exec(statement).first()
- if not area_in_db:
- return []
- return [to_shape(tile.geom).wkt for tile in area_in_db.tiles]
-
@task.docker(
**config.PINTA_CONTAINER_TASK_ARGS,
max_active_tis_per_dag=_get_max_parallel_pipelines(),
@@ -155,7 +136,9 @@ def cluster_diff_polygons(job_connection_uri: str) -> None:
database_name=database_name,
),
)
- tile_wkt_list = find_production_area(primary_connection_uri, prod_area_id)
+ tile_wkt_list = find_production_area_tile_geometries.override(
+ task_id="find_production_area"
+ )(primary_connection_uri, prod_area_id)
init_diff_task = initialize_dem_tables.override(
task_id="initialize_diff_tables"
diff --git a/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py b/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py
index 2c40712..7eabd49 100644
--- a/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py
+++ b/components/dags/src/pinta_dags/dags/calculate_rasters_for_production_area.py
@@ -13,6 +13,9 @@
from pinta_dags import config
+# How often each triggered child DAG is polled for completion.
+_TRIGGER_POKE_INTERVAL_SECONDS = 5
+
def create_calculate_rasters_for_production_area_dag( # noqa: PLR0915
*,
@@ -47,6 +50,11 @@ def create_calculate_rasters_for_production_area_dag( # noqa: PLR0915
type="boolean",
description="Cluster difference polygons in the DEM diff DAG",
),
+ "initialize_dem_preview": Param(
+ default=True,
+ type="boolean",
+ description="Initialize DEM preview table",
+ ),
},
is_paused_upon_creation=False,
)
@@ -105,6 +113,7 @@ def should_run(*, requested: bool) -> bool:
trigger_dag_id=constants.DAG_ID_CALCULATE_REFERENCE_DEM,
conf={"id": "{{ params.id }}"},
wait_for_completion=True,
+ poke_interval=_TRIGGER_POKE_INTERVAL_SECONDS,
)
trigger_calculate_dem_diff = TriggerDagRunOperator(
@@ -115,6 +124,15 @@ def should_run(*, requested: bool) -> bool:
"cluster": "{{ params.cluster_diff_polygons }}",
},
wait_for_completion=True,
+ poke_interval=_TRIGGER_POKE_INTERVAL_SECONDS,
+ )
+
+ trigger_initialize_dem_preview = TriggerDagRunOperator(
+ task_id="trigger_initialize_dem_preview",
+ trigger_dag_id=constants.DAG_ID_INITIALIZE_DEM_PREVIEW,
+ conf={"id": "{{ params.id }}"},
+ wait_for_completion=True,
+ poke_interval=_TRIGGER_POKE_INTERVAL_SECONDS,
)
@task.docker(
@@ -184,8 +202,17 @@ def set_processing_status_failed(
task_id="should_calculate_dem_diff",
trigger_rule=TriggerRule.NONE_FAILED,
)(requested=cast("bool", "{{ params.calculate_dem_diff }}"))
-
- triggers = [trigger_calculate_reference_dem, trigger_calculate_dem_diff]
+ # The DEM preview copy is independent of the reference DEM -> DEM diff
+ # chain, so it runs in parallel straight off ensure_database.
+ preview_gate = should_run.override(
+ task_id="should_initialize_dem_preview",
+ )(requested=cast("bool", "{{ params.initialize_dem_preview }}"))
+
+ triggers = [
+ trigger_calculate_reference_dem,
+ trigger_calculate_dem_diff,
+ trigger_initialize_dem_preview,
+ ]
status_completed = set_processing_status_completed(
primary_connection_uri, prod_area_id
)
@@ -197,6 +224,8 @@ def set_processing_status_failed(
# DEM so it can compare against freshly computed reference rasters.
ensure_database >> reference_gate >> trigger_calculate_reference_dem
trigger_calculate_reference_dem >> diff_gate >> trigger_calculate_dem_diff
+ # Runs in parallel with the reference DEM -> DEM diff chain.
+ ensure_database >> preview_gate >> trigger_initialize_dem_preview
# Also depend on ensure_database so a failure there (before any trigger
# runs) still resolves the processing status instead of leaving the
# production area stuck in STARTED.
diff --git a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py
index 2485a0b..b2a8e36 100644
--- a/components/dags/src/pinta_dags/dags/calculate_reference_dem.py
+++ b/components/dags/src/pinta_dags/dags/calculate_reference_dem.py
@@ -14,6 +14,7 @@
from pinta_dags.config import AirflowVariable
from pinta_dags.tasks import (
build_job_connection_uri_task,
+ find_production_area_tile_paths,
get_database_name,
initialize_dem_tables,
merge_dem_staging_tables,
@@ -65,25 +66,6 @@ def calculate_reference_dem_dag() -> None:
# Precondition: the production area must already have its job database
# provisioned and database_name set for production area by orchestrator DAG.
- @task.docker(**config.PINTA_CONTAINER_TASK_ARGS)
- def find_production_area(
- connection_uri: str,
- production_area_id: str,
- ) -> list[str]:
- import sqlalchemy
- import sqlmodel
- from pinta_db.primary_db.models.management import ProductionArea
-
- engine = sqlalchemy.create_engine(connection_uri)
- with sqlmodel.Session(engine) as session:
- statement = sqlmodel.select(ProductionArea).where(
- ProductionArea.id == production_area_id
- )
- area_in_db = session.exec(statement).first()
- if not area_in_db:
- return []
- return [tile.file_path for tile in area_in_db.tiles]
-
@task.docker(
**config.PINTA_CONTAINER_TASK_ARGS,
max_active_tis_per_dag=_get_max_parallel_pipelines(),
@@ -133,7 +115,9 @@ def blast2dem( # noqa: PLR0913
database_name = cast(
"str", get_database_name(primary_connection_uri, prod_area_id)
)
- file_paths = find_production_area(primary_connection_uri, prod_area_id)
+ file_paths = find_production_area_tile_paths.override(
+ task_id="find_production_area"
+ )(primary_connection_uri, prod_area_id)
job_db_uri = cast(
"str",
diff --git a/components/dags/src/pinta_dags/dags/initialize_dem_preview.py b/components/dags/src/pinta_dags/dags/initialize_dem_preview.py
new file mode 100644
index 0000000..45fad8f
--- /dev/null
+++ b/components/dags/src/pinta_dags/dags/initialize_dem_preview.py
@@ -0,0 +1,157 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+from typing import cast
+
+from airflow.sdk import DAG, Param, Variable, dag, task
+from pinta_common import constants
+from pinta_db.job_db.models.user import DemPreview
+from pinta_db.job_db.schema import Schema
+from pinta_db.primary_db.models.dem import Dem as PrimaryDem
+from pinta_db.primary_db.schema import Schema as PrimarySchema
+
+from pinta_dags import config
+from pinta_dags.config import AirflowVariable
+from pinta_dags.tasks import (
+ build_job_connection_uri_task,
+ find_production_area_tile_geometries,
+ get_database_name,
+ initialize_dem_tables,
+ merge_dem_staging_tables,
+)
+
+FROM_DB_SCHEMA = PrimarySchema.DEM.value
+FROM_DB_TABLE = PrimaryDem.__tablename__
+TO_DB_SCHEMA = Schema.USER.value
+TO_DB_TABLE = DemPreview.__tablename__
+
+
+def _get_max_parallel_pipelines() -> int:
+    """Return the parallel pipeline limit (shares the reference DEM variable)."""
+    variable = AirflowVariable.CALCULATE_REFERENCE_DEM_MAX_PARALLEL_PIPELINES
+    limit = int(Variable.get(variable, 4))
+    if limit >= 1:
+        return limit
+    msg = f"{variable} must be at least 1"
+    raise ValueError(msg)
+
+
+def _get_staging_tables() -> int:
+    """Return the staging table count (shares the reference DEM variable)."""
+    variable = AirflowVariable.CALCULATE_REFERENCE_DEM_STAGING_TABLES
+    count = int(Variable.get(variable, 1))
+    if count >= 0:
+        return count
+    msg = f"{variable} must be at least 0"
+    raise ValueError(msg)
+
+
+def create_initialize_dem_preview_dag(
+ *,
+ dag_id: str,
+) -> DAG:
+ @dag(
+ dag_id=dag_id,
+ tags=[dag_id],
+ dag_display_name="Initialize DEM preview",
+ schedule=None,
+ params={
+ "id": Param(
+ "",
+ type="string",
+ format="uuid",
+ description=("Production area id as UUID"),
+ )
+ },
+ is_paused_upon_creation=False,
+ )
+ def initialize_dem_preview_dag() -> None:
+ # Precondition: the production area must already have its job database
+ # provisioned and database_name set for production area by orchestrator DAG.
+
+        @task.docker(
+            **config.PINTA_CONTAINER_TASK_ARGS,
+            max_active_tis_per_dag=_get_max_parallel_pipelines(),
+        )
+        def copy_dem_preview(  # noqa: PLR0913
+            primary_connection_uri: str,
+            job_connection_uri: str,
+            tile_wkt: str,
+            staging_tables: int,
+            from_schema: str,
+            from_table: str,
+            to_schema: str,
+            to_table: str,
+        ) -> None:
+            """Build and execute the postgis_to_postgis pipeline for one tile WKT."""
+            import sqlalchemy
+            import sqlmodel
+            from pinta_processing import pipelines
+
+            with (
+                sqlmodel.Session(
+                    sqlalchemy.create_engine(primary_connection_uri)
+                ) as primary_session,
+                sqlmodel.Session(
+                    sqlalchemy.create_engine(job_connection_uri)
+                ) as job_session,
+            ):
+                pipelines.postgis_to_postgis(
+                    from_session=primary_session,
+                    from_schema=from_schema,
+                    from_table=from_table,
+                    to_session=job_session,
+                    to_schema=to_schema,
+                    to_table=to_table,
+                    tile_wkt=tile_wkt,
+                    staging_tables=staging_tables,
+                ).execute()
+
+ primary_connection_uri = config.connection_uri_template("pinta_processing_db")
+ job_connection_uri = config.connection_uri_template("pinta_job_db")
+ staging_tables = _get_staging_tables()
+
+ prod_area_id = "{{ params.id }}"
+ database_name = cast(
+ "str", get_database_name(primary_connection_uri, prod_area_id)
+ )
+ job_db_uri = cast(
+ "str",
+ build_job_connection_uri_task(
+ base_uri=job_connection_uri,
+ database_name=database_name,
+ ),
+ )
+ tile_wkt_list = find_production_area_tile_geometries.override(
+ task_id="find_production_area"
+ )(primary_connection_uri, prod_area_id)
+
+ initialize_task = initialize_dem_tables(
+ job_db_uri, TO_DB_SCHEMA, TO_DB_TABLE, staging_tables
+ )
+ copied_tiles = copy_dem_preview.partial(
+ primary_connection_uri=primary_connection_uri,
+ job_connection_uri=job_db_uri,
+ staging_tables=staging_tables,
+ from_schema=FROM_DB_SCHEMA,
+ from_table=FROM_DB_TABLE,
+ to_schema=TO_DB_SCHEMA,
+ to_table=TO_DB_TABLE,
+ ).expand(tile_wkt=tile_wkt_list)
+ (
+ tile_wkt_list
+ >> initialize_task
+ >> copied_tiles
+ >> merge_dem_staging_tables(
+ job_db_uri, TO_DB_SCHEMA, TO_DB_TABLE, staging_tables
+ )
+ )
+
+ return initialize_dem_preview_dag()
+
+
+DAG_ID = constants.DAG_ID_INITIALIZE_DEM_PREVIEW
+
+globals()[DAG_ID] = create_initialize_dem_preview_dag(dag_id=DAG_ID)
diff --git a/components/dags/src/pinta_dags/tasks.py b/components/dags/src/pinta_dags/tasks.py
index 4536758..fd71e3d 100644
--- a/components/dags/src/pinta_dags/tasks.py
+++ b/components/dags/src/pinta_dags/tasks.py
@@ -33,6 +33,51 @@ def get_database_name(
return area_in_db.database_name
+@task.docker(**config.PINTA_CONTAINER_TASK_ARGS)
+def find_production_area_tile_paths(
+    connection_uri: str,
+    production_area_id: str,
+) -> list[str]:
+    """Return tile file paths for the production area ([] if it is missing)."""
+    import sqlalchemy
+    import sqlmodel
+    from pinta_db.primary_db.models.management import ProductionArea
+
+    by_id = sqlmodel.select(ProductionArea).where(
+        ProductionArea.id == production_area_id
+    )
+    with sqlmodel.Session(sqlalchemy.create_engine(connection_uri)) as session:
+        production_area = session.exec(by_id).first()
+        return (
+            [tile.file_path for tile in production_area.tiles]
+            if production_area
+            else []
+        )
+
+
+@task.docker(**config.PINTA_CONTAINER_TASK_ARGS)
+def find_production_area_tile_geometries(
+    connection_uri: str,
+    production_area_id: str,
+) -> list[str]:
+    """Return tile geometries as WKT for the production area ([] if missing)."""
+    import sqlalchemy
+    import sqlmodel
+    from geoalchemy2.shape import to_shape
+    from pinta_db.primary_db.models.management import ProductionArea
+
+    by_id = sqlmodel.select(ProductionArea).where(
+        ProductionArea.id == production_area_id
+    )
+    with sqlmodel.Session(sqlalchemy.create_engine(connection_uri)) as session:
+        production_area = session.exec(by_id).first()
+        return (
+            [to_shape(tile.geom).wkt for tile in production_area.tiles]
+            if production_area
+            else []
+        )
+
+
@task
def build_job_connection_uri_task(
base_uri: str,
diff --git a/components/dags/test_dags/dags/test_calculate_dem_diff.py b/components/dags/test_dags/dags/test_calculate_dem_diff.py
new file mode 100644
index 0000000..e7b4735
--- /dev/null
+++ b/components/dags/test_dags/dags/test_calculate_dem_diff.py
@@ -0,0 +1,103 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import uuid
+from typing import TYPE_CHECKING
+
+import pytest
+from airflow.models import DagBag, dagbag
+
+from pinta_dags.dags import calculate_dem_diff
+
+if TYPE_CHECKING:
+ from airflow.sdk import DAG
+
+
+def create_dag_to_test() -> "DAG":
+ dag = calculate_dem_diff.create_calculate_dem_diff_dag(
+ dag_id=f"some_id_{uuid.uuid4()}"
+ )
+
+ assert str(dag.dag_id).startswith("some_id")
+
+ dag_bag = DagBag(include_examples=False)
+ dag_bag.bag_dag(dag)
+ dagbag.sync_bag_to_db(dag_bag, "mock-dags", None)
+
+ return dag
+
+
+@pytest.fixture(autouse=True)
+def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None:
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_PROCESSING_DB", "postgres://mockaddr:123/db")
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB_ADMIN", "postgres://mockaddr:123/db")
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB", "postgres://mockaddr:123/db")
+ monkeypatch.setenv(
+ "AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_MAX_PARALLEL_PIPELINES", "2"
+ )
+ monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_STAGING_TABLES", "3")
+
+
+def test_calculate_dem_diff_all_tasks() -> None:
+ dag = create_dag_to_test()
+
+ assert set(dag.task_ids) == {
+ "get_database_name",
+ "build_job_connection_uri_task",
+ "find_production_area",
+ "initialize_diff_tables",
+ "initialize_diff_lte_threshold_tables",
+ "calculate_dem_diff",
+ "merge_diff_tables",
+ "merge_diff_lte_threshold_tables",
+ "should_cluster",
+ "cluster_diff_polygons",
+ }
+
+
+def test_dependencies() -> None:
+ dag = create_dag_to_test()
+ assert dag is not None
+
+ get_database_name = dag.get_task("get_database_name")
+ build_job_connection_uri_task = dag.get_task("build_job_connection_uri_task")
+ find_production_area = dag.get_task("find_production_area")
+ init_diff = dag.get_task("initialize_diff_tables")
+ init_diff_lte = dag.get_task("initialize_diff_lte_threshold_tables")
+ calculate = dag.get_task("calculate_dem_diff")
+ merge_diff = dag.get_task("merge_diff_tables")
+ merge_diff_lte = dag.get_task("merge_diff_lte_threshold_tables")
+ should_cluster = dag.get_task("should_cluster")
+ cluster = dag.get_task("cluster_diff_polygons")
+
+ assert get_database_name.task_id in build_job_connection_uri_task.upstream_task_ids
+ # The diff tiles come from the production area geometries task.
+ assert find_production_area.task_id in init_diff.upstream_task_ids
+ assert find_production_area.task_id in init_diff_lte.upstream_task_ids
+ assert init_diff.task_id in calculate.upstream_task_ids
+ assert init_diff_lte.task_id in calculate.upstream_task_ids
+ assert calculate.task_id in merge_diff.upstream_task_ids
+ assert calculate.task_id in merge_diff_lte.upstream_task_ids
+ assert merge_diff.task_id in should_cluster.upstream_task_ids
+ assert merge_diff_lte.task_id in should_cluster.upstream_task_ids
+ assert should_cluster.task_id in cluster.upstream_task_ids
+
+
+def test_get_max_parallel_pipelines_rejects_below_one(
+ monkeypatch: "pytest.MonkeyPatch",
+) -> None:
+ monkeypatch.setenv(
+ "AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_MAX_PARALLEL_PIPELINES", "0"
+ )
+ with pytest.raises(ValueError, match="must be at least 1"):
+ calculate_dem_diff._get_max_parallel_pipelines()
+
+
+def test_get_staging_tables_rejects_negative(
+ monkeypatch: "pytest.MonkeyPatch",
+) -> None:
+ monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_DEM_DIFF_STAGING_TABLES", "-1")
+ with pytest.raises(ValueError, match="must be at least 0"):
+ calculate_dem_diff._get_staging_tables()
diff --git a/components/dags/test_dags/dags/test_calculate_rasters_for_production_area.py b/components/dags/test_dags/dags/test_calculate_rasters_for_production_area.py
new file mode 100644
index 0000000..eebc5d2
--- /dev/null
+++ b/components/dags/test_dags/dags/test_calculate_rasters_for_production_area.py
@@ -0,0 +1,102 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import uuid
+from typing import TYPE_CHECKING
+
+import pytest
+from airflow.models import DagBag, dagbag
+
+from pinta_dags.dags import calculate_rasters_for_production_area as rasters_dag
+
+if TYPE_CHECKING:
+ from airflow.sdk import DAG
+
+
+def create_dag_to_test() -> "DAG":
+ dag = rasters_dag.create_calculate_rasters_for_production_area_dag(
+ dag_id=f"some_id_{uuid.uuid4()}"
+ )
+
+ assert str(dag.dag_id).startswith("some_id")
+
+ dag_bag = DagBag(include_examples=False)
+ dag_bag.bag_dag(dag)
+ dagbag.sync_bag_to_db(dag_bag, "mock-dags", None)
+
+ return dag
+
+
+@pytest.fixture(autouse=True)
+def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None:
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_PROCESSING_DB", "postgres://mockaddr:123/db")
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB_ADMIN", "postgres://mockaddr:123/db")
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB", "postgres://mockaddr:123/db")
+
+
+def test_all_tasks() -> None:
+ dag = create_dag_to_test()
+
+ assert set(dag.task_ids) == {
+ "ensure_job_database",
+ "should_calculate_reference_dem",
+ "should_calculate_dem_diff",
+ "should_initialize_dem_preview",
+ "trigger_calculate_reference_dem",
+ "trigger_calculate_dem_diff",
+ "trigger_initialize_dem_preview",
+ "set_processing_status_completed",
+ "set_processing_status_failed",
+ }
+
+
+def test_initialize_dem_preview_param_defaults_true() -> None:
+ dag = create_dag_to_test()
+
+ assert dag.params["initialize_dem_preview"] is True
+
+
+def test_dem_preview_runs_in_parallel_off_ensure_database() -> None:
+ # The DEM preview trigger is gated by its own short-circuit and hangs
+ # directly off ensure_database, independent of the reference DEM -> diff
+ # chain.
+ dag = create_dag_to_test()
+
+ ensure_database = dag.get_task("ensure_job_database")
+ preview_gate = dag.get_task("should_initialize_dem_preview")
+ trigger_preview = dag.get_task("trigger_initialize_dem_preview")
+
+ assert ensure_database.task_id in preview_gate.upstream_task_ids
+ assert preview_gate.task_id in trigger_preview.upstream_task_ids
+ # Not chained behind the reference DEM / diff triggers.
+ assert "trigger_calculate_reference_dem" not in trigger_preview.upstream_task_ids
+ assert "trigger_calculate_dem_diff" not in trigger_preview.upstream_task_ids
+
+
+def test_trigger_operators_poll_frequently_for_completion() -> None:
+    # With the default 60 s poll the orchestration can sit idle for up to a
+    # minute after every child DAG finishes and the end-to-end run times out,
+    # so each waiting trigger has to poll at a much shorter interval.
+    trigger_task_ids = [
+        "trigger_calculate_reference_dem",
+        "trigger_calculate_dem_diff",
+        "trigger_initialize_dem_preview",
+    ]
+    dag = create_dag_to_test()
+
+    for trigger in map(dag.get_task, trigger_task_ids):
+        assert trigger.wait_for_completion is True
+        assert trigger.poke_interval <= 10
+
+
+def test_status_tasks_wait_for_dem_preview_trigger() -> None:
+    """Both status tasks have the DEM preview trigger as a direct upstream."""
+    dag = create_dag_to_test()
+    preview_trigger_id = dag.get_task("trigger_initialize_dem_preview").task_id
+
+    completed = dag.get_task("set_processing_status_completed")
+    failed = dag.get_task("set_processing_status_failed")
+    assert preview_trigger_id in completed.upstream_task_ids
+    assert preview_trigger_id in failed.upstream_task_ids
diff --git a/components/dags/test_dags/dags/test_initialize_dem_preview.py b/components/dags/test_dags/dags/test_initialize_dem_preview.py
new file mode 100644
index 0000000..f648660
--- /dev/null
+++ b/components/dags/test_dags/dags/test_initialize_dem_preview.py
@@ -0,0 +1,149 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import uuid
+from typing import TYPE_CHECKING
+from unittest.mock import MagicMock
+
+import pytest
+from airflow.models import DagBag, dagbag
+
+from pinta_dags.dags import initialize_dem_preview
+
+if TYPE_CHECKING:
+ from airflow.sdk import DAG
+ from pytest_mock import MockerFixture
+
+
+def create_dag_to_test() -> "DAG":
+ dag = initialize_dem_preview.create_initialize_dem_preview_dag(
+ dag_id=f"some_id_{uuid.uuid4()}"
+ )
+
+ assert str(dag.dag_id).startswith("some_id")
+
+ dag_bag = DagBag(include_examples=False)
+ dag_bag.bag_dag(dag)
+ dagbag.sync_bag_to_db(dag_bag, "mock-dags", None)
+
+ return dag
+
+
+@pytest.fixture(autouse=True)
+def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None:
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_PROCESSING_DB", "postgres://mockaddr:123/db")
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB_ADMIN", "postgres://mockaddr:123/db")
+ monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB", "postgres://mockaddr:123/db")
+ monkeypatch.setenv(
+ "AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_MAX_PARALLEL_PIPELINES", "2"
+ )
+ monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_STAGING_TABLES", "3")
+
+
+def test_initialize_dem_preview_all_tasks() -> None:
+ dag = create_dag_to_test()
+
+ assert set(dag.task_ids) == {
+ "get_database_name",
+ "build_job_connection_uri_task",
+ "find_production_area",
+ "initialize_dem_tables",
+ "copy_dem_preview",
+ "merge_dem_staging_tables",
+ }
+
+
+def test_dependencies() -> None:
+ dag = create_dag_to_test()
+ assert dag is not None
+
+ get_database_name = dag.get_task("get_database_name")
+ build_job_connection_uri_task = dag.get_task("build_job_connection_uri_task")
+ find_production_area = dag.get_task("find_production_area")
+ initialize = dag.get_task("initialize_dem_tables")
+ copy_dem_preview = dag.get_task("copy_dem_preview")
+ merge_dem_staging_tables = dag.get_task("merge_dem_staging_tables")
+
+ assert get_database_name.task_id in build_job_connection_uri_task.upstream_task_ids
+ assert find_production_area.task_id in initialize.upstream_task_ids
+ assert initialize.task_id in copy_dem_preview.upstream_task_ids
+ assert copy_dem_preview.task_id in merge_dem_staging_tables.upstream_task_ids
+
+
+def test_copies_from_primary_dem_to_dem_preview() -> None:
+ # The preview copy reads the primary DEM table and writes the job database
+ # DEM preview table.
+ assert initialize_dem_preview.FROM_DB_SCHEMA == "dem"
+ assert initialize_dem_preview.FROM_DB_TABLE == "dem"
+ assert initialize_dem_preview.TO_DB_SCHEMA == "user_data"
+ assert initialize_dem_preview.TO_DB_TABLE == "dem_preview"
+
+
+def test_get_max_parallel_pipelines_reads_variable() -> None:
+ assert initialize_dem_preview._get_max_parallel_pipelines() == 2
+
+
+def test_get_max_parallel_pipelines_rejects_below_one(
+ monkeypatch: "pytest.MonkeyPatch",
+) -> None:
+ monkeypatch.setenv(
+ "AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_MAX_PARALLEL_PIPELINES", "0"
+ )
+ with pytest.raises(ValueError, match="must be at least 1"):
+ initialize_dem_preview._get_max_parallel_pipelines()
+
+
+def test_get_staging_tables_reads_variable() -> None:
+ assert initialize_dem_preview._get_staging_tables() == 3
+
+
+def test_get_staging_tables_rejects_negative(
+ monkeypatch: "pytest.MonkeyPatch",
+) -> None:
+ monkeypatch.setenv("AIRFLOW_VAR_PINTA_CALCULATE_REFERENCE_DEM_STAGING_TABLES", "-1")
+ with pytest.raises(ValueError, match="must be at least 0"):
+ initialize_dem_preview._get_staging_tables()
+
+
+def test_copy_dem_preview_builds_and_executes_pipeline(
+ mocker: "MockerFixture",
+) -> None:
+ mocker.patch("sqlalchemy.create_engine")
+ mocker.patch("sqlmodel.Session")
+ # Inject a mock pipelines module so the task body's ``from pinta_processing
+ # import pipelines`` resolves to the mock instead of importing the real
+ # module (which would load heavy DB modules and leak into the sys.modules
+ # patching other DAG tests rely on).
+ mock_pipeline = MagicMock()
+ mock_pipelines_module = MagicMock()
+ mock_pipelines_module.postgis_to_postgis.return_value = mock_pipeline
+ mocker.patch.dict(
+ "sys.modules",
+ {"pinta_processing.pipelines": mock_pipelines_module},
+ )
+
+ dag = create_dag_to_test()
+ copy_dem_preview = dag.get_task("copy_dem_preview").python_callable
+
+ copy_dem_preview(
+ primary_connection_uri="postgres://primary",
+ job_connection_uri="postgres://job",
+ tile_wkt="POINT (0 0)",
+ staging_tables=3,
+ from_schema="dem",
+ from_table="dem",
+ to_schema="user_data",
+ to_table="dem_preview",
+ )
+
+ mock_pipelines_module.postgis_to_postgis.assert_called_once()
+ kwargs = mock_pipelines_module.postgis_to_postgis.call_args.kwargs
+ assert kwargs["from_schema"] == "dem"
+ assert kwargs["from_table"] == "dem"
+ assert kwargs["to_schema"] == "user_data"
+ assert kwargs["to_table"] == "dem_preview"
+ assert kwargs["tile_wkt"] == "POINT (0 0)"
+ assert kwargs["staging_tables"] == 3
+ mock_pipeline.execute.assert_called_once_with()
diff --git a/components/dags/test_dags/dags/test_tasks.py b/components/dags/test_dags/dags/test_tasks.py
new file mode 100644
index 0000000..fce5977
--- /dev/null
+++ b/components/dags/test_dags/dags/test_tasks.py
@@ -0,0 +1,110 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+from typing import TYPE_CHECKING
+from unittest.mock import MagicMock
+
+import pytest
+
+from pinta_dags import tasks
+
+if TYPE_CHECKING:
+ from pytest_mock import MockerFixture
+
+
+@pytest.fixture
+def mock_session(mocker: "MockerFixture") -> MagicMock:
+    """Stub create_engine and Session; return the mock the ``with`` block yields."""
+    mocker.patch("sqlalchemy.create_engine")
+    session_factory = mocker.patch("sqlmodel.Session")
+    fake_session = MagicMock()
+    session_factory.return_value.__enter__.return_value = fake_session
+    return fake_session
+
+
+def test_get_database_name_returns_database_name(mock_session: MagicMock) -> None:
+ area = MagicMock(database_name="job_db")
+ mock_session.exec.return_value.first.return_value = area
+
+ result = tasks.get_database_name.function(
+ "postgres://mock/db", "some-production-area-id"
+ )
+
+ assert result == "job_db"
+
+
+@pytest.mark.parametrize("area", [None, MagicMock(database_name=None)])
+def test_get_database_name_raises_without_database_name(
+ mock_session: MagicMock, area: object
+) -> None:
+ mock_session.exec.return_value.first.return_value = area
+
+ with pytest.raises(ValueError, match="no database name set"):
+ tasks.get_database_name.function("postgres://mock/db", "prod-area-id")
+
+
+def test_find_production_area_tile_paths_returns_file_paths(
+ mock_session: MagicMock,
+) -> None:
+ area = MagicMock(
+ tiles=[MagicMock(file_path="/a.laz"), MagicMock(file_path="/b.laz")]
+ )
+ mock_session.exec.return_value.first.return_value = area
+
+ result = tasks.find_production_area_tile_paths.function(
+ "postgres://mock/db", "prod-area-id"
+ )
+
+ assert result == ["/a.laz", "/b.laz"]
+
+
+def test_find_production_area_tile_paths_missing_area_returns_empty(
+ mock_session: MagicMock,
+) -> None:
+ mock_session.exec.return_value.first.return_value = None
+
+ result = tasks.find_production_area_tile_paths.function(
+ "postgres://mock/db", "prod-area-id"
+ )
+
+ assert result == []
+
+
+def test_find_production_area_tile_geometries_returns_wkt(
+    mock_session: MagicMock, mocker: "MockerFixture"
+) -> None:
+    expected_wkts = ["POINT (0 0)", "POINT (1 1)"]
+    # to_shape is stubbed to hand back one fake shape per call, in order.
+    shapes = [MagicMock(wkt=wkt) for wkt in expected_wkts]
+    mocker.patch("geoalchemy2.shape.to_shape", side_effect=shapes)
+    tiles = [MagicMock() for _ in expected_wkts]
+    mock_session.exec.return_value.first.return_value = MagicMock(tiles=tiles)
+
+    wkts = tasks.find_production_area_tile_geometries.function(
+        "postgres://mock/db", "prod-area-id"
+    )
+
+    assert wkts == expected_wkts
+
+
+def test_find_production_area_tile_geometries_missing_area_returns_empty(
+ mock_session: MagicMock,
+) -> None:
+ mock_session.exec.return_value.first.return_value = None
+
+ result = tasks.find_production_area_tile_geometries.function(
+ "postgres://mock/db", "prod-area-id"
+ )
+
+ assert result == []
+
+
+def test_build_job_connection_uri_task_replaces_database_name() -> None:
+ result = tasks.build_job_connection_uri_task.function(
+ base_uri="postgresql://user:pass@host:1234/template_db",
+ database_name="job_test",
+ )
+
+ assert result == "postgresql://user:pass@host:1234/job_test"
diff --git a/components/db/migrations/job/versions/008_add_dem_preview.py b/components/db/migrations/job/versions/008_add_dem_preview.py
new file mode 100644
index 0000000..002837a
--- /dev/null
+++ b/components/db/migrations/job/versions/008_add_dem_preview.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+"""Add dem preview tables
+
+Revision ID: 008
+Revises: 007
+Create Date: 2026-07-02
+
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+
+from migrations import _schema_op
+from pinta_common import Settings
+
+# revision identifiers, used by Alembic.
+revision: str = "008"
+down_revision: str | Sequence[str] | None = "007"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+_OVERVIEW_FACTORS = (2, 8, 128)
+_BASE_TABLE = "dem_preview"
+
+
+def upgrade() -> None:
+    """Create the dem_preview raster table and its overview tables in user_data."""
+    srid = int(Settings.DB_SRID)
+    pixel_size = Settings.DB_DEM_PIXEL_SIZE
+    nodata = Settings.DB_DEM_NODATA
+
+    _schema_op.create_raster_table(
+        _BASE_TABLE, pixel_size, nodata, srid, schema="user_data"
+    )
+    for factor in _OVERVIEW_FACTORS:
+        table = f"o_{factor}_{_BASE_TABLE}"  # e.g. o_2_dem_preview
+        _schema_op.create_raster_table(
+            table, pixel_size * factor, nodata, srid, schema="user_data"
+        )
+        _schema_op.add_overview_constraints("user_data", table, factor, _BASE_TABLE)
+
+
+def downgrade() -> None:
+    """Drop the overview tables first, then the base dem_preview table."""
+    for factor in reversed(_OVERVIEW_FACTORS):
+        table = f"o_{factor}_{_BASE_TABLE}"
+        op.drop_index(f"idx_{table}_rast", table_name=table, schema="user_data")
+        op.drop_geospatial_table(table, schema="user_data")
+    op.drop_index(f"idx_{_BASE_TABLE}_rast", table_name=_BASE_TABLE, schema="user_data")
+    op.drop_geospatial_table(_BASE_TABLE, schema="user_data")
diff --git a/components/db/migrations/job/versions/009_add_worker_user_schema_permissions.py b/components/db/migrations/job/versions/009_add_worker_user_schema_permissions.py
new file mode 100644
index 0000000..98d05b2
--- /dev/null
+++ b/components/db/migrations/job/versions/009_add_worker_user_schema_permissions.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+"""Add permissions for worker in user schema
+
+Revision ID: 009
+Revises: 008
+Create Date: 2026-07-02
+
+"""
+
+import os
+from collections.abc import Sequence
+
+from alembic import op
+
+from migrations import _schema_op as schema_op
+
+# revision identifiers, used by Alembic.
+revision: str = "009"
+down_revision: str | Sequence[str] | None = "008"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+# Role and database names are read from the environment at import time, so a
+# missing variable raises KeyError before any migration step runs.
+# NOTE(review): WRITER, READER, DB_NAME and REFERENCE_SCHEMA are not referenced
+# by upgrade() or downgrade() in this revision.
+OWNER = os.environ["DB_JOB_OWNER_ROLE"]
+WRITER = os.environ["DB_JOB_WRITER_ROLE"]
+READER = os.environ["DB_JOB_READER_ROLE"]
+PROCESSING_WORKER = os.environ["DB_JOB_PROCESSING_WORKER_ROLE"]
+DB_NAME = os.environ["DB_JOB_TEMPLATE_NAME"]
+
+REFERENCE_SCHEMA = "reference"
+USER_SCHEMA = "user_data"
+
+
+def upgrade() -> None:
+    """Give the processing worker role access to the user schema.
+
+    Grants USAGE and CREATE on the schema, sets default table and sequence
+    privileges through the ``schema_op`` helpers, and grants USAGE/SELECT on
+    every sequence that already exists in the schema.
+    """
+    schema_op.grant_privileges_on_schema(
+        schema=USER_SCHEMA, role=PROCESSING_WORKER, privileges=("USAGE", "CREATE")
+    )
+    schema_op.grant_default_privileges_on_tables_in_schema(
+        schema=USER_SCHEMA,
+        schema_owner=OWNER,
+        role=PROCESSING_WORKER,
+        privileges=("SELECT", "INSERT", "UPDATE", "DELETE", "TRUNCATE"),
+    )
+    schema_op.grant_default_privileges_on_sequences_in_schema(
+        schema=USER_SCHEMA,
+        schema_owner=OWNER,
+        role=PROCESSING_WORKER,
+        privileges=("SELECT", "USAGE"),
+    )
+    # PostgreSQL default privileges only affect objects created afterwards, so
+    # sequences that already exist need an explicit grant. The schema name
+    # comes from USER_SCHEMA, like every other statement in this migration.
+    op.execute(
+        f"GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {USER_SCHEMA}"
+        f" TO {PROCESSING_WORKER}"
+    )
+
+
+def downgrade() -> None:
+    """Remove the processing worker's access to the user schema.
+
+    Revokes everything ``upgrade()`` granted: the schema privileges, the
+    default table privileges, the default sequence privileges and the
+    privileges on the sequences that currently exist.
+    """
+    schema_op.revoke_privileges_on_schema(
+        schema=USER_SCHEMA, role=PROCESSING_WORKER, privileges=("USAGE", "CREATE")
+    )
+    schema_op.revoke_default_privileges_on_tables_in_schema(
+        schema=USER_SCHEMA,
+        schema_owner=OWNER,
+        role=PROCESSING_WORKER,
+        privileges=("SELECT", "INSERT", "UPDATE", "DELETE", "TRUNCATE"),
+    )
+    # upgrade() also granted default privileges on sequences; without this
+    # revoke the downgrade would leave that grant behind.
+    # NOTE(review): assumes the grant helper issues
+    # ALTER DEFAULT PRIVILEGES FOR ROLE <schema_owner>; switch to a matching
+    # schema_op revoke helper if one exists.
+    op.execute(
+        f"ALTER DEFAULT PRIVILEGES FOR ROLE {OWNER} IN SCHEMA {USER_SCHEMA}"  # noqa: S608
+        f" REVOKE USAGE, SELECT ON SEQUENCES FROM {PROCESSING_WORKER}"
+    )
+    op.execute(
+        f"REVOKE USAGE, SELECT ON ALL SEQUENCES IN SCHEMA {USER_SCHEMA}"  # noqa: S608
+        f" FROM {PROCESSING_WORKER}"
+    )
diff --git a/components/db/src/pinta_common/constants.py b/components/db/src/pinta_common/constants.py
index c2028e3..4dc3bfa 100644
--- a/components/db/src/pinta_common/constants.py
+++ b/components/db/src/pinta_common/constants.py
@@ -9,4 +9,5 @@
DAG_ID_PROCESS_PRODUCTION_AREAS = "process_production_areas"
DAG_ID_CALCULATE_REFERENCE_DEM = "calculate_reference_dem"
DAG_ID_CALCULATE_DEM_DIFF = "calculate_dem_diff"
+DAG_ID_INITIALIZE_DEM_PREVIEW = "initialize_dem_preview"
DAG_ID_CALCULATE_RASTERS_FOR_PRODUCTION_AREA = "calculate_rasters_for_production_area"
diff --git a/components/db/src/pinta_db/job_db/models/base.py b/components/db/src/pinta_db/job_db/models/base.py
index e07afd1..eaff65b 100644
--- a/components/db/src/pinta_db/job_db/models/base.py
+++ b/components/db/src/pinta_db/job_db/models/base.py
@@ -22,4 +22,8 @@ class UserBase(BaseJobDb):
__table_args__ = {"schema": Schema.USER.value} # noqa: RUF012
+
+class UserVectorBase(UserBase):
+    """Base model for user-schema tables identified by a generated UUID ``id``."""
+
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
diff --git a/components/db/src/pinta_db/job_db/models/user.py b/components/db/src/pinta_db/job_db/models/user.py
index e540730..b7afc8f 100644
--- a/components/db/src/pinta_db/job_db/models/user.py
+++ b/components/db/src/pinta_db/job_db/models/user.py
@@ -10,13 +10,30 @@
from sqlmodel import Field
from pinta_common import Settings
+from pinta_db.common import base
from pinta_db.constants import POLYGON
-from pinta_db.job_db.models.base import UserBase
+from pinta_db.job_db.models import base as job_base
-class UpdateArea(UserBase, table=True): # type: ignore[call-arg]
+class UpdateArea(job_base.UserVectorBase, table=True): # type: ignore[call-arg]
"""Final update area."""
geom: Any = Field(
sa_column=Column(Geometry(POLYGON, srid=Settings.DB_SRID, nullable=False))
)
+
+
+class DemPreview(job_base.UserBase, base.RasterBase, table=True):  # type: ignore[call-arg]
+    """DEM preview raster stored in the user schema."""
+
+
+class O2DemPreview(job_base.UserBase, base.RasterBase, table=True):  # type: ignore[call-arg]
+    """Overview of the DEM preview raster, factor 2."""
+
+
+class O8DemPreview(job_base.UserBase, base.RasterBase, table=True):  # type: ignore[call-arg]
+    """Overview of the DEM preview raster, factor 8."""
+
+
+class O128DemPreview(job_base.UserBase, base.RasterBase, table=True):  # type: ignore[call-arg]
+    """Overview of the DEM preview raster, factor 128."""
diff --git a/components/e2e/pyproject.toml b/components/e2e/pyproject.toml
index b403368..5182c3a 100644
--- a/components/e2e/pyproject.toml
+++ b/components/e2e/pyproject.toml
@@ -62,5 +62,5 @@ testpaths = [
"test_e2e",
]
addopts = "-n auto --dist loadgroup --maxprocesses=4 --import-mode=importlib"
-timeout = 180
+timeout = 600
timeout_method = "thread"
diff --git a/components/e2e/test_e2e/test_dem_workflows.py b/components/e2e/test_e2e/test_dem_workflows.py
index 0b59ca3..9a6bb0e 100644
--- a/components/e2e/test_e2e/test_dem_workflows.py
+++ b/components/e2e/test_e2e/test_dem_workflows.py
@@ -37,16 +37,19 @@ def _get_production_area(db: "Session") -> ProductionArea:
return production_area
-def _count_diff_rasters(database_name: str) -> int:
- """Count the diff raster tiles written to the job database."""
+def _count_rows(database_name: str, *tables: str) -> int:
+ """Count the total rows across the given ``schema.table`` names in the job db."""
+ counts = " + ".join(f"(SELECT count(*) FROM {table})" for table in tables)
credentials = db_utils.get_job_admin_credentials(database_name)
with engine_utils.get_autocommit_connection(credentials) as connection:
- return connection.execute(
- sqlmodel.text(
- "SELECT (SELECT count(*) FROM reference.diff_gt_threshold) "
- "+ (SELECT count(*) FROM reference.diff_lte_threshold)"
- )
- ).scalar_one()
+ return connection.execute(sqlmodel.text(f"SELECT {counts}")).scalar_one()
+
+
+def _count_diff_rasters(database_name: str) -> int:
+    """Return the number of diff raster rows stored in the job database.
+
+    Sums the rows of both threshold tables in the ``reference`` schema.
+    """
+    diff_tables = ("reference.diff_gt_threshold", "reference.diff_lte_threshold")
+    return _count_rows(database_name, *diff_tables)
@pytest.mark.xdist_group("airflow")
@@ -110,6 +113,9 @@ def check_state(statuses: list[str]) -> None:
assert layers.get_vector_layer_by_model(reference.DiffPolygon)
assert _count_diff_rasters(completed_feature["database_name"]) > 0
+ # The DEM preview copy runs in parallel and populates user_data.dem_preview.
+ assert _count_rows(completed_feature["database_name"], "user_data.dem_preview") > 0
+
cluster_layer = layers.get_vector_layer_by_model(reference.DiffPolygonCluster)
assert cluster_layer.setSubsetString("")
assert cluster_layer.featureCount() > 0
@@ -130,6 +136,8 @@ def test_calculate_rasters_reference_dem_only(
"id": str(production_area.id),
"calculate_reference_dem": True,
"calculate_dem_diff": False,
+ "cluster_diff_polygons": False,
+ "initialize_dem_preview": False,
},
)
state = airflow_client.wait_for_dag_run(run, timeout=WORKFLOW_TIMEOUT_S)
diff --git a/components/processing/src/pinta_processing/pipelines.py b/components/processing/src/pinta_processing/pipelines.py
index 63c67db..25d02ec 100644
--- a/components/processing/src/pinta_processing/pipelines.py
+++ b/components/processing/src/pinta_processing/pipelines.py
@@ -172,6 +172,29 @@ def calculate_diff_models(
)
+def postgis_to_postgis(  # noqa: PLR0913
+    from_session: Session,
+    from_schema: str,
+    from_table: str,
+    to_session: Session,
+    to_schema: str,
+    to_table: str,
+    tile_wkt: str,
+    staging_tables: int = 0,
+) -> core.Pipeline:
+    """Build a pipeline that copies a raster from one PostGIS table to another.
+
+    The pipeline chains three stages: a ``PostgisReader`` on the source table,
+    the overview stages for the target table, and a ``RasterPostgisWriter``
+    on the target table.
+
+    Args:
+        from_session: Session used to read the source table.
+        from_schema: Schema of the source raster table.
+        from_table: Name of the source raster table.
+        to_session: Session used for the overview stages and the writer.
+        to_schema: Schema of the target raster table.
+        to_table: Name of the target raster table.
+        tile_wkt: WKT geometry handed to the reader (presumably the area to
+            read -- confirm against ``PostgisReader``).
+        staging_tables: Passed through to the overview stages and the writer.
+
+    Returns:
+        The composed, not yet executed pipeline.
+    """
+    pipeline = reader.PostgisReader(
+        from_schema,
+        from_table,
+        from_session,
+        tile_wkt,
+    )
+    pipeline = pipeline | _generate_overview_stages(
+        to_schema, to_table, to_session, staging_tables
+    )
+    return pipeline | writer.RasterPostgisWriter(
+        to_schema, to_table, to_session, staging_tables
+    )
+
+
def _generate_overview_stages(
schema: str,
table_name: str,
diff --git a/components/processing/test_integration_processing/pipelines/test_postgis_to_postgis_pipeline.py b/components/processing/test_integration_processing/pipelines/test_postgis_to_postgis_pipeline.py
new file mode 100644
index 0000000..b33a507
--- /dev/null
+++ b/components/processing/test_integration_processing/pipelines/test_postgis_to_postgis_pipeline.py
@@ -0,0 +1,138 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import tempfile
+import typing
+from pathlib import Path
+
+import numpy as np
+import rasterio
+import sqlalchemy as sa
+from pinta_db.job_db.models import user
+from pinta_db.primary_db.models import dem
+from pinta_db_utils import model_utils
+from pinta_db_utils.postgis import raster
+from pinta_test_utils import pinta_utils
+
+from pinta_processing import pipelines, reader, writer
+
+if typing.TYPE_CHECKING:
+ from sqlmodel import Session
+
+# (schema, table) of the primary-db DEM model the tests read from, and of the
+# job-db DEM preview model they write to.
+_SOURCE_SCHEMA, _SOURCE_TABLE = model_utils.schema_and_table(dem.Dem)
+_TARGET_SCHEMA, _TARGET_TABLE = model_utils.schema_and_table(user.DemPreview)
+
+
+def test_postgis_to_postgis_copies_source_raster_to_target(
+    admin_primary_session: "Session",
+    session: "Session",
+    processing_worker_session: "Session",
+) -> None:
+    """Copying into an initialised target yields a raster equal to the source."""
+    source_bounds = _populate_source(admin_primary_session)
+    _init_target(processing_worker_session)
+
+    pipeline = pipelines.postgis_to_postgis(
+        from_session=session,
+        from_schema=_SOURCE_SCHEMA,
+        from_table=_SOURCE_TABLE,
+        to_session=processing_worker_session,
+        to_schema=_TARGET_SCHEMA,
+        to_table=_TARGET_TABLE,
+        tile_wkt=_bounds_to_wkt(*source_bounds),
+    )
+    pipeline.execute()
+
+    # Read both tables back as single arrays and compare them cell by cell.
+    expected = _read_back(session, _SOURCE_SCHEMA, _SOURCE_TABLE)
+    actual = _read_back(processing_worker_session, _TARGET_SCHEMA, _TARGET_TABLE)
+    assert actual is not None
+    assert expected is not None
+    assert actual.shape == expected.shape
+    assert np.array_equal(actual, expected)
+
+
+def test_postgis_to_postgis_writes_overview_tables(
+    admin_primary_session: "Session",
+    session: "Session",
+    processing_worker_session: "Session",
+) -> None:
+    """Every default overview level of the target table receives rows."""
+    source_bounds = _populate_source(admin_primary_session)
+    _init_target(processing_worker_session)
+
+    pipeline = pipelines.postgis_to_postgis(
+        from_session=session,
+        from_schema=_SOURCE_SCHEMA,
+        from_table=_SOURCE_TABLE,
+        to_session=processing_worker_session,
+        to_schema=_TARGET_SCHEMA,
+        to_table=_TARGET_TABLE,
+        tile_wkt=_bounds_to_wkt(*source_bounds),
+    )
+    pipeline.execute()
+
+    for level in raster.DEFAULT_OVERVIEW_LEVELS:
+        overview_table = raster.OVERVIEW_TABLE_NAME.format(
+            level=level, table_name=_TARGET_TABLE
+        )
+        count_query = sa.text(
+            f"SELECT COUNT(*) FROM {_TARGET_SCHEMA}.{overview_table}"
+        )
+        row = processing_worker_session.exec(  # type: ignore[call-overload]
+            count_query
+        ).first()
+        assert row[0] > 0, f"Expected overview level {level} to have rows"
+
+
+def _populate_source(
+    admin_primary_session: "Session",
+) -> tuple[float, float, float, float]:
+    """Load the test DEM into the source table.
+
+    Returns:
+        The DEM file's bounds as ``(left, bottom, right, top)``.
+    """
+    raster.initialize_raster_table(admin_primary_session, _SOURCE_SCHEMA, _SOURCE_TABLE)
+    dem_path = str(pinta_utils.get_test_data_path("processing/dem.tif"))
+
+    load_pipeline = reader.RasterioReader(dem_path) | writer.RasterPostgisWriter(
+        _SOURCE_SCHEMA, _SOURCE_TABLE, admin_primary_session
+    )
+    load_pipeline.execute()
+
+    with rasterio.open(dem_path) as dataset:
+        bounds = dataset.bounds
+        return (bounds.left, bounds.bottom, bounds.right, bounds.top)
+
+
+def _init_target(processing_worker_session: "Session") -> None:
+    """Initialise the target raster table and its overview tables, no staging."""
+    initializers = (
+        raster.initialize_raster_table,
+        raster.initialize_overview_tables,
+    )
+    for initialize in initializers:
+        initialize(
+            processing_worker_session, _TARGET_SCHEMA, _TARGET_TABLE, staging_tables=0
+        )
+
+
+def _read_back(session: "Session", schema: str, table: str) -> np.ndarray | None:
+    """Union all tiles of a raster table and return band 1 as an array.
+
+    The union is exported by PostGIS as a GeoTIFF, written to a temporary
+    file and read back with rasterio.
+
+    Returns:
+        The first band of the merged raster, or ``None`` when the query
+        yields NULL.
+    """
+    query = sa.text(
+        f"SELECT ST_AsGDALRaster(ST_Union(rast), 'GTiff') FROM {schema}.{table}"
+    )
+    gtiff_bytes = session.exec(query).first()[0]  # type: ignore[call-overload]
+    if gtiff_bytes is None:
+        return None
+
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        tiff_path = Path(tmp_dir) / "target.tif"
+        tiff_path.write_bytes(gtiff_bytes)
+        with rasterio.open(str(tiff_path)) as dataset:
+            return dataset.read(1)
+
+
+def _bounds_to_wkt(left: float, bottom: float, right: float, top: float) -> str:
+    """Return the bounding box as a closed WKT polygon starting at (left, bottom)."""
+    corners = [
+        (left, bottom),
+        (right, bottom),
+        (right, top),
+        (left, top),
+        (left, bottom),  # repeat the first corner to close the ring
+    ]
+    ring = ", ".join(f"{x} {y}" for x, y in corners)
+    return f"POLYGON(({ring}))"
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py
index 7ff9492..59c40ab 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py
@@ -98,6 +98,7 @@ def start_reference_dem_workflow(self, production_area_id: str) -> None:
"calculate_reference_dem": True,
"calculate_dem_diff": True,
"cluster_diff_polygons": True,
+ "initialize_dem_preview": True,
},
production_area_id=production_area_id,
)
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py
index 6b02e38..a2d6ff4 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py
@@ -48,6 +48,14 @@
style_path=_STYLES_PATH / "raster_diff.qml",
visible_initially=False,
),
+ config.RasterLayerConfig(
+ schema="user_data",
+ table_name="dem_preview",
+ layer_name=tr("DEM preview"),
+ layer_id="dem_preview",
+ style_path=_STYLES_PATH / "elevation_model.qml",
+ visible_initially=False,
+ ),
config.VectorLayerConfig(
schema="reference",
table_name="diff_polygon",
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm
index bf4db89..a6dc05b 100644
Binary files a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm and b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm differ
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts
index 5d41b7d..c37d2b4 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts
@@ -89,17 +89,17 @@
Erotusmalli, yli 20cm eroavaisuudet
-
+
Polygonized DEM difference
Muutospolygonit
-
+
Modification area suggestions
Muutosalue-ehdotukset
-
+
Cluster significance
Klusterin merkittävyys
@@ -179,29 +179,34 @@
Ajon tilan viimeinen muutosaika
-
+
Energy sum
Energiasumma
-
+
Relevance score
Merkittävyys
-
+
Cluster area
Klusterin pinta-ala
-
+
Update area
Muutosalue
-
+
Elevation
Korkeus
+
+
+ DEM preview
+ Tuloskorkeusmalli
+
diff --git a/components/qgis_plugin/test_qgis/processing/test_api_client.py b/components/qgis_plugin/test_qgis/processing/test_api_client.py
index 771182a..702eb18 100644
--- a/components/qgis_plugin/test_qgis/processing/test_api_client.py
+++ b/components/qgis_plugin/test_qgis/processing/test_api_client.py
@@ -82,6 +82,7 @@ def test_start_reference_dem_workflow_posts_workflow_payload(
"calculate_reference_dem": True,
"calculate_dem_diff": True,
"cluster_diff_polygons": True,
+ "initialize_dem_preview": True,
},
"production_area_id": "area-1",
},