diff --git a/components/dags/src/pinta_dags/config.py b/components/dags/src/pinta_dags/config.py
index b52507cc..5178fab0 100644
--- a/components/dags/src/pinta_dags/config.py
+++ b/components/dags/src/pinta_dags/config.py
@@ -36,6 +36,11 @@ class AirflowVariable(enum.StrEnum):
)
CALCULATE_DEM_DIFF_STAGING_TABLES = "pinta_calculate_dem_diff_staging_tables"
+ # Maximum number of update area dissolve pipelines running in parallel.
+ DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES = (
+ "pinta_dissolve_update_areas_max_parallel_pipelines"
+ )
+
def connection_uri_template(conn_id: str) -> str:
"""Jinja template for a connection's SQLAlchemy URI with the psycopg3 driver."""
diff --git a/components/dags/src/pinta_dags/dags/dissolve_update_areas.py b/components/dags/src/pinta_dags/dags/dissolve_update_areas.py
new file mode 100644
index 00000000..460513cf
--- /dev/null
+++ b/components/dags/src/pinta_dags/dags/dissolve_update_areas.py
@@ -0,0 +1,142 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import datetime
+from typing import cast
+
+from airflow.sdk import DAG, Param, Variable, dag, task
+from pinta_common import constants
+
+from pinta_dags import config
+from pinta_dags.config import AirflowVariable
+from pinta_dags.tasks import (
+ build_job_connection_uri_task,
+ find_update_area_geometries,
+ get_database_name,
+ set_processing_status_completed,
+ set_processing_status_failed,
+ set_processing_status_started,
+)
+
+
def _get_max_parallel_pipelines() -> int:
    """Return the configured cap on concurrently running dissolve pipelines.

    The value is read from the
    ``DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES`` Airflow variable and
    falls back to 4 when the variable is not set.

    Raises:
        ValueError: If the variable is not an integer or is smaller than 1.
    """
    var = AirflowVariable.DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES
    raw_value = Variable.get(var, 4)
    try:
        max_parallel = int(raw_value)
    except ValueError as error:
        # A bare int() failure does not say which variable is misconfigured.
        msg = f"{var} must be an integer, got {raw_value!r}"
        raise ValueError(msg) from error
    if max_parallel < 1:
        msg = f"{var} must be at least 1"
        raise ValueError(msg)
    return max_parallel
+
+
def create_dissolve_update_areas_dag(
    *,
    dag_id: str,
) -> DAG:
    """Build the DAG that dissolves every update area of one production area.

    The DAG takes the production area id as the ``id`` run parameter, stamps
    the processing status as started, resolves the job database URI, maps one
    ``dissolve_update_area`` task over the update area geometries and finally
    runs the completed / failed status tasks.

    Args:
        dag_id: Identifier of the created DAG; it is also used as its only tag.

    Returns:
        The object returned by calling the ``@dag``-decorated inner function.
    """

    @dag(
        dag_id=dag_id,
        tags=[dag_id],
        dag_display_name="Dissolve update areas",
        # No schedule: the DAG only runs when triggered.
        schedule=None,
        params={
            "id": Param(
                "",
                type="string",
                format="uuid",
                description=("Production area id as UUID"),
            )
        },
        is_paused_upon_creation=False,
    )
    def dissolve_update_areas_dag() -> None:
        # Precondition: the production area must already have its job database
        # provisioned and database_name set for production area by orchestrator DAG.

        @task.docker(
            **config.PINTA_CONTAINER_TASK_ARGS,
            # The cap is read once, when this DAG definition is evaluated.
            max_active_tis_per_dag=_get_max_parallel_pipelines(),
            # Parallel tasks merging into the same base/overview tiles can
            # deadlock on the concurrent row updates; retry to ride out the loser.
            retries=3,
            retry_delay=datetime.timedelta(seconds=10),
        )
        def dissolve_update_area(
            primary_connection_uri: str,
            job_connection_uri: str,
            geom_wkt: str,
        ) -> None:
            # Builds and executes the dissolve pipeline for one update area
            # geometry (given as WKT), with one session per database.
            # Imports are local to the task body; presumably because the body
            # runs inside the task container rather than in the DAG module.
            import sqlalchemy
            import sqlmodel
            from pinta_processing import pipelines

            with (
                sqlmodel.Session(
                    sqlalchemy.create_engine(primary_connection_uri)
                ) as primary_session,
                sqlmodel.Session(
                    sqlalchemy.create_engine(job_connection_uri)
                ) as job_session,
            ):
                pipeline = pipelines.dissolve_update_area(
                    primary_session=primary_session,
                    job_session=job_session,
                    geom_wkt=geom_wkt,
                )
                pipeline.execute()

        # Connection URIs are Jinja templates, not literal URIs.
        primary_connection_uri = config.connection_uri_template("pinta_processing_db")
        job_connection_uri = config.connection_uri_template("pinta_job_db")

        # Templated reference to the ``id`` DAG run parameter.
        prod_area_id = "{{ params.id }}"

        status_started = set_processing_status_started(
            primary_connection_uri, prod_area_id
        )
        # ``cast`` only informs the type checker; it does not convert the
        # value returned by the task call.
        database_name = cast(
            "str", get_database_name(primary_connection_uri, prod_area_id)
        )
        job_db_uri = cast(
            "str",
            build_job_connection_uri_task(
                base_uri=job_connection_uri,
                database_name=database_name,
            ),
        )
        geom_wkt_list = find_update_area_geometries(job_db_uri)

        # Fan out: one mapped dissolve task per geometry in ``geom_wkt_list``.
        dissolved_areas = dissolve_update_area.partial(
            primary_connection_uri=primary_connection_uri,
            job_connection_uri=job_db_uri,
        ).expand(geom_wkt=geom_wkt_list)

        status_completed = set_processing_status_completed(
            primary_connection_uri, prod_area_id
        )
        status_failed = set_processing_status_failed(
            primary_connection_uri, prod_area_id
        )

        # Stamp STARTED before any work, then run the dissolve chain.
        status_started >> database_name
        geom_wkt_list >> dissolved_areas

        # Resolve the final status off every task that can fail (each is a direct
        # upstream, so ONE_FAILED still fires when an early step fails and the
        # mapped task never runs). NONE_FAILED marks COMPLETED otherwise.
        processing_steps = [
            status_started,
            database_name,
            job_db_uri,
            geom_wkt_list,
            dissolved_areas,
        ]
        processing_steps >> status_completed
        processing_steps >> status_failed

    return dissolve_update_areas_dag()
+
+
# Identifier shared with the rest of the project through pinta_common.constants.
DAG_ID = constants.DAG_ID_DISSOLVE_UPDATE_AREAS

# Bind the DAG to a module-level name equal to its id; presumably so that
# Airflow's DAG discovery picks it up from this module.
globals()[DAG_ID] = create_dissolve_update_areas_dag(dag_id=DAG_ID)
diff --git a/components/dags/src/pinta_dags/tasks.py b/components/dags/src/pinta_dags/tasks.py
index fd71e3df..cf986af8 100644
--- a/components/dags/src/pinta_dags/tasks.py
+++ b/components/dags/src/pinta_dags/tasks.py
@@ -5,7 +5,7 @@
"""Airflow tasks shared across Pinta DAGs."""
-from airflow.sdk import task
+from airflow.sdk import TriggerRule, task
from pinta_dags import config
@@ -78,6 +78,22 @@ def find_production_area_tile_geometries(
return [to_shape(tile.geom).wkt for tile in area_in_db.tiles]
@task.docker(**config.PINTA_CONTAINER_TASK_ARGS)
def find_update_area_geometries(
    connection_uri: str,
) -> list[str]:
    """Return the geometries (as WKT) of all update areas in the job database."""
    import sqlalchemy
    import sqlmodel
    from geoalchemy2.shape import to_shape
    from pinta_db.job_db.models.user import UpdateArea

    with sqlmodel.Session(sqlalchemy.create_engine(connection_uri)) as session:
        rows = session.exec(sqlmodel.select(UpdateArea)).all()
        # Convert while the session is still open, as the rows belong to it.
        wkt_geometries = [to_shape(row.geom).wkt for row in rows]
    return wkt_geometries
+
+
@task
def build_job_connection_uri_task(
base_uri: str,
@@ -142,3 +158,68 @@ def merge_dem_staging_tables(
staging_tables=staging_tables,
session=session,
)
+
+
@task.docker(**config.PINTA_CONTAINER_TASK_ARGS)
def set_processing_status_started(connection_uri: str, production_area_id: str) -> None:
    """Mark the production area as processing started."""
    import sqlalchemy
    import sqlmodel
    from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea

    lookup = sqlmodel.select(ProductionArea).where(
        ProductionArea.id == production_area_id
    )
    with sqlmodel.Session(sqlalchemy.create_engine(connection_uri)) as session:
        production_area = session.exec(lookup).first()
        if not production_area:
            # No matching production area: nothing is updated or committed.
            return
        production_area.processing_status = ProcessingStatus.STARTED
        session.commit()
+
+
@task.docker(
    **config.PINTA_CONTAINER_TASK_ARGS,
    trigger_rule=TriggerRule.NONE_FAILED,
)
def set_processing_status_completed(
    connection_uri: str, production_area_id: str
) -> None:
    """Mark the production area as processing completed when nothing failed."""
    import sqlalchemy
    import sqlmodel
    from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea

    lookup = sqlmodel.select(ProductionArea).where(
        ProductionArea.id == production_area_id
    )
    with sqlmodel.Session(sqlalchemy.create_engine(connection_uri)) as session:
        production_area = session.exec(lookup).first()
        if not production_area:
            # No matching production area: nothing is updated or committed.
            return
        production_area.processing_status = ProcessingStatus.COMPLETED
        session.commit()
+
+
@task.docker(
    **config.PINTA_CONTAINER_TASK_ARGS,
    trigger_rule=TriggerRule.ONE_FAILED,
)
def set_processing_status_failed(connection_uri: str, production_area_id: str) -> None:
    """Mark the production area as processing failed when any upstream failed."""
    import sqlalchemy
    import sqlmodel
    from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea

    lookup = sqlmodel.select(ProductionArea).where(
        ProductionArea.id == production_area_id
    )
    with sqlmodel.Session(sqlalchemy.create_engine(connection_uri)) as session:
        production_area = session.exec(lookup).first()
        if not production_area:
            # No matching production area: nothing is updated or committed.
            return
        production_area.processing_status = ProcessingStatus.FAILURE
        session.commit()
diff --git a/components/dags/test_dags/dags/test_dissolve_update_areas.py b/components/dags/test_dags/dags/test_dissolve_update_areas.py
new file mode 100644
index 00000000..dc174fe9
--- /dev/null
+++ b/components/dags/test_dags/dags/test_dissolve_update_areas.py
@@ -0,0 +1,154 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import uuid
+from typing import TYPE_CHECKING
+from unittest.mock import MagicMock
+
+import pytest
+from airflow.models import DagBag, dagbag
+
+from pinta_dags.dags import dissolve_update_areas
+
+if TYPE_CHECKING:
+ from airflow.sdk import DAG
+ from pytest_mock import MockerFixture
+
+
def create_dag_to_test() -> "DAG":
    """Create the dissolve DAG under a unique id, bag it and sync the bag."""
    unique_dag_id = f"some_id_{uuid.uuid4()}"
    dag = dissolve_update_areas.create_dissolve_update_areas_dag(
        dag_id=unique_dag_id
    )

    assert str(dag.dag_id).startswith("some_id")

    bag = DagBag(include_examples=False)
    bag.bag_dag(dag)
    dagbag.sync_bag_to_db(bag, "mock-dags", None)

    return dag
+
+
@pytest.fixture(autouse=True)
def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None:
    """Provide mock connection URIs and a parallelism limit of 2 via env vars."""
    mock_db_uri = "postgres://mockaddr:123/db"
    environment = {
        "AIRFLOW_CONN_PINTA_PROCESSING_DB": mock_db_uri,
        "AIRFLOW_CONN_PINTA_JOB_DB_ADMIN": mock_db_uri,
        "AIRFLOW_CONN_PINTA_JOB_DB": mock_db_uri,
        "AIRFLOW_VAR_PINTA_DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES": "2",
    }
    for name, value in environment.items():
        monkeypatch.setenv(name, value)
+
+
def test_dissolve_update_areas_all_tasks() -> None:
    """The DAG contains exactly the expected set of task ids."""
    expected_task_ids = {
        "set_processing_status_started",
        "get_database_name",
        "build_job_connection_uri_task",
        "find_update_area_geometries",
        "dissolve_update_area",
        "set_processing_status_completed",
        "set_processing_status_failed",
    }

    dag = create_dag_to_test()

    assert set(dag.task_ids) == expected_task_ids
+
+
def test_dependencies() -> None:
    """Each step of the dissolve chain is a direct upstream of the next one."""
    dag = create_dag_to_test()
    assert dag is not None

    chain_task_ids = [
        "set_processing_status_started",
        "get_database_name",
        "build_job_connection_uri_task",
        "find_update_area_geometries",
        "dissolve_update_area",
    ]
    chain = [dag.get_task(task_id) for task_id in chain_task_ids]

    # Walk the chain pairwise: (started, get_db), (get_db, build_uri), ...
    for upstream, downstream in zip(chain, chain[1:]):
        assert upstream.task_id in downstream.upstream_task_ids
+
+
def test_processing_status_tasks() -> None:
    """Both terminal status tasks fan in from all steps with the right rules."""
    dag = create_dag_to_test()

    terminal_tasks = {
        task_id: dag.get_task(task_id)
        for task_id in (
            "set_processing_status_completed",
            "set_processing_status_failed",
        )
    }

    # Both terminal status tasks fan in from every step that can fail so the
    # status is always resolved, even when an early step fails.
    expected_upstream = {
        "set_processing_status_started",
        "get_database_name",
        "build_job_connection_uri_task",
        "find_update_area_geometries",
        "dissolve_update_area",
    }
    for terminal_task in terminal_tasks.values():
        assert expected_upstream <= terminal_task.upstream_task_ids

    completed = terminal_tasks["set_processing_status_completed"]
    failed = terminal_tasks["set_processing_status_failed"]
    assert completed.trigger_rule == "none_failed"
    assert failed.trigger_rule == "one_failed"
+
+
def test_get_max_parallel_pipelines_reads_variable() -> None:
    """The limit comes from the variable that the autouse fixture sets to 2."""
    max_parallel = dissolve_update_areas._get_max_parallel_pipelines()

    assert max_parallel == 2
+
+
def test_get_max_parallel_pipelines_rejects_below_one(
    monkeypatch: "pytest.MonkeyPatch",
) -> None:
    """A configured limit of 0 is rejected with a ValueError."""
    variable_env_name = (
        "AIRFLOW_VAR_PINTA_DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES"
    )
    monkeypatch.setenv(variable_env_name, "0")

    with pytest.raises(ValueError, match="must be at least 1"):
        dissolve_update_areas._get_max_parallel_pipelines()
+
+
def test_get_max_parallel_pipelines_defaults_when_unset(
    monkeypatch: "pytest.MonkeyPatch",
) -> None:
    """Without the variable in the environment the limit falls back to 4."""
    variable_env_name = (
        "AIRFLOW_VAR_PINTA_DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES"
    )
    monkeypatch.delenv(variable_env_name, raising=False)

    max_parallel = dissolve_update_areas._get_max_parallel_pipelines()

    assert max_parallel == 4
+
+
def test_dissolve_update_area_builds_and_executes_pipeline(
    mocker: "MockerFixture",
) -> None:
    """The task body builds the pipeline from its arguments and executes it once."""
    geometry_wkt = "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"

    mocker.patch("sqlalchemy.create_engine")
    mocker.patch("sqlmodel.Session")
    # Inject a mock pipelines module so the task body's ``from pinta_processing
    # import pipelines`` resolves to the mock instead of importing the real
    # module (which would load heavy DB modules and leak into the sys.modules
    # patching other DAG tests rely on).
    fake_pipeline = MagicMock()
    fake_pipelines_module = MagicMock()
    fake_pipelines_module.dissolve_update_area.return_value = fake_pipeline
    mocker.patch.dict(
        "sys.modules",
        {"pinta_processing.pipelines": fake_pipelines_module},
    )

    dag = create_dag_to_test()
    task_callable = dag.get_task("dissolve_update_area").python_callable

    task_callable(
        primary_connection_uri="postgres://primary",
        job_connection_uri="postgres://job",
        geom_wkt=geometry_wkt,
    )

    pipeline_factory = fake_pipelines_module.dissolve_update_area
    pipeline_factory.assert_called_once()
    call_kwargs = pipeline_factory.call_args.kwargs
    assert call_kwargs["geom_wkt"] == geometry_wkt
    assert "primary_session" in call_kwargs
    assert "job_session" in call_kwargs
    fake_pipeline.execute.assert_called_once_with()
diff --git a/components/db/migrations/job/versions/010_add_dirty_flag_to_update_area.py b/components/db/migrations/job/versions/010_add_dirty_flag_to_update_area.py
new file mode 100644
index 00000000..88103731
--- /dev/null
+++ b/components/db/migrations/job/versions/010_add_dirty_flag_to_update_area.py
@@ -0,0 +1,45 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+"""Add dirty flag to update_area table
+
+Revision ID: 010
+Revises: 009
+Create Date: 2026-07-02
+
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
# revision identifiers, used by Alembic.
revision: str = "010"
down_revision: str | Sequence[str] | None = "009"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

# Schema and table altered by upgrade() and downgrade() below.
_SCHEMA = "user_data"
_TABLE = "update_area"
+
+
def upgrade() -> None:
    """Add the non-nullable ``dirty`` boolean column with a true server default."""
    dirty_column = sa.Column(
        "dirty",
        sa.Boolean(),
        nullable=False,
        server_default=sa.true(),
    )
    op.add_column(_TABLE, dirty_column, schema=_SCHEMA)
+
+
def downgrade() -> None:
    """Drop the ``dirty`` column again."""
    op.drop_column(table_name=_TABLE, column_name="dirty", schema=_SCHEMA)
diff --git a/components/db/src/pinta_common/constants.py b/components/db/src/pinta_common/constants.py
index 4dc3bfa7..91b1647e 100644
--- a/components/db/src/pinta_common/constants.py
+++ b/components/db/src/pinta_common/constants.py
@@ -10,4 +10,5 @@
DAG_ID_CALCULATE_REFERENCE_DEM = "calculate_reference_dem"
DAG_ID_CALCULATE_DEM_DIFF = "calculate_dem_diff"
DAG_ID_INITIALIZE_DEM_PREVIEW = "initialize_dem_preview"
+DAG_ID_DISSOLVE_UPDATE_AREAS = "dissolve_update_areas"
DAG_ID_CALCULATE_RASTERS_FOR_PRODUCTION_AREA = "calculate_rasters_for_production_area"
diff --git a/components/db/src/pinta_db/job_db/models/user.py b/components/db/src/pinta_db/job_db/models/user.py
index b7afc8f4..cf851568 100644
--- a/components/db/src/pinta_db/job_db/models/user.py
+++ b/components/db/src/pinta_db/job_db/models/user.py
@@ -6,7 +6,7 @@
from typing import Any
from geoalchemy2 import Geometry
-from sqlalchemy import Column
+from sqlalchemy import Boolean, Column, true
from sqlmodel import Field
from pinta_common import Settings
@@ -21,6 +21,10 @@ class UpdateArea(job_base.UserVectorBase, table=True): # type: ignore[call-arg]
geom: Any = Field(
sa_column=Column(Geometry(POLYGON, srid=Settings.DB_SRID, nullable=False))
)
+ dirty: bool = Field(
+ default=True,
+ sa_column=Column(Boolean, nullable=False, server_default=true()),
+ )
class DemPreview(job_base.UserBase, base.RasterBase, table=True): # type: ignore[call-arg]
diff --git a/components/e2e/test_e2e/test_dissolve_update_areas.py b/components/e2e/test_e2e/test_dissolve_update_areas.py
new file mode 100644
index 00000000..7e9f47d9
--- /dev/null
+++ b/components/e2e/test_e2e/test_dissolve_update_areas.py
@@ -0,0 +1,186 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import typing
+
+import pytest
+import sqlmodel
+from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea
+from pinta_db_test_utils import db_utils
+from pinta_db_utils import database_utils, engine_utils
+from pinta_e2e_utils import layers
+from pinta_e2e_utils.airflow_client import AirflowClient, DagRun
+
+if typing.TYPE_CHECKING:
+ from unittest.mock import MagicMock
+
+ from pinta_qgis_plugin.plugin import Plugin
+ from pytestqt.qtbot import QtBot
+ from sqlmodel import Session
+
+# The dissolve workflow only spins up a couple of short-lived task containers,
+# but the docker pulls / cold starts can still be slow, so keep a generous budget.
+WORKFLOW_TIMEOUT_S = 240.0
+
+# Constant value the seeded reference DEM is flattened to. Chosen well outside the
+# real DEM elevation range so a mean over the update area unambiguously tells the
+# original DEM apart from a preview that has taken the reference values.
+REFERENCE_DEM_VALUE = 1000.0
+
+# Radius (m) of the update area probe polygon around the DEM centroid. Kept small
+# so the primary DEM read (buffered 50 m) stays inside the seeded coverage.
+UPDATE_AREA_RADIUS_M = 15
+
+
def _dem_preview_mean(database_name: str, area_ewkt: str) -> float:
    """Compute the mean of the job DEM preview clipped to ``area_ewkt``."""
    admin_credentials = db_utils.get_job_admin_credentials(database_name)
    with engine_utils.get_autocommit_connection(admin_credentials) as connection:
        # Clip every intersecting preview raster to the area, union the
        # clipped pieces and read the mean from the summary statistics.
        mean_query = sqlmodel.text(
            "SELECT (ST_SummaryStats(ST_Union("
            " ST_Clip(rast, ST_GeomFromEWKT(:area), true)"
            "))).mean "
            "FROM user_data.dem_preview "
            "WHERE ST_Intersects(rast, ST_GeomFromEWKT(:area))"
        ).bindparams(area=area_ewkt)
        mean = connection.execute(mean_query).scalar_one()
        return mean
+
+
@pytest.fixture
def dissolve_update_area_setup(
    seeded_processing_dem: int,
    created_db: str,
    db: "Session",
) -> str:
    """Provision the job database for a dissolve run without the DEM workflow.

    ``seeded_processing_dem`` populates the primary ``dem.dem`` table. This fixture
    then provisions a job database the way the orchestrator DAG would, copies the
    primary DEM into ``user_data.dem_preview``, seeds a distinct constant-valued
    ``reference.dem`` and creates an update area polygon. Returns the update area
    geometry as EWKT so the test can probe the preview inside it.
    """
    # The first production area found is the one used for the run.
    production_area = db.exec(sqlmodel.select(ProductionArea)).first()
    assert production_area is not None

    # The orchestrator DAG normally provisions the job db and stamps
    # database_name; do the same by hand so the dissolve DAG's precondition holds.
    database_name = f"job_{production_area.id}"
    production_area.database_name = database_name
    db.add(production_area)
    db.commit()

    # Create the job database from the template, replacing any existing one
    # with the same name.
    with engine_utils.get_autocommit_connection(
        db_utils.get_job_admin_credentials("postgres")
    ) as admin_connection:
        database_utils.initialize_db_from_template(
            admin_connection, database_name, replace_existing=True
        )

    primary = db_utils.get_primary_admin_credentials(created_db)
    job = db_utils.get_job_admin_credentials(database_name)
    with (
        engine_utils.get_autocommit_connection(primary) as primary_connection,
        engine_utils.get_autocommit_connection(job) as job_connection,
    ):
        # Rasters are fetched as text so they can be re-bound as parameters
        # and cast back to raster in the job database.
        tiles = (
            primary_connection.execute(sqlmodel.text("SELECT rast::text FROM dem.dem"))
            .scalars()
            .all()
        )
        assert tiles, "No primary DEM tiles were seeded"
        for tile in tiles:
            # Copy the primary DEM verbatim into the preview.
            job_connection.execute(
                sqlmodel.text(
                    "INSERT INTO user_data.dem_preview (rast) "
                    "VALUES (CAST(:rast AS raster))"
                ).bindparams(rast=tile)
            )
            # Seed the reference DEM as a flat surface at REFERENCE_DEM_VALUE,
            # keeping the nodata mask, so it is clearly distinct from the preview.
            job_connection.execute(
                sqlmodel.text(
                    "INSERT INTO reference.dem (rast) "
                    "VALUES (ST_MapAlgebra(CAST(:rast AS raster), 1, '32BF', :value))"
                ).bindparams(rast=tile, value=str(REFERENCE_DEM_VALUE))
            )

        # The update area is the centroid of the whole preview coverage
        # buffered by UPDATE_AREA_RADIUS_M.
        area_ewkt = job_connection.execute(
            sqlmodel.text(
                "SELECT ST_AsEWKT(ST_Buffer(ST_Centroid(ST_Union(rast::geometry)), "
                ":radius)) FROM user_data.dem_preview"
            ).bindparams(radius=UPDATE_AREA_RADIUS_M)
        ).scalar_one()
        # Store that polygon as a single dirty update area.
        job_connection.execute(
            sqlmodel.text(
                "INSERT INTO user_data.update_area (id, geom, dirty) "
                "VALUES (gen_random_uuid(), ST_GeomFromEWKT(:geom), true)"
            ).bindparams(geom=area_ewkt)
        )

    return area_ewkt
+
+
@pytest.mark.xdist_group("airflow")
def test_dissolve_update_areas_workflow(
    qgis_plugin: "Plugin",
    qtbot: "QtBot",
    m_error_dialog: "MagicMock",
    airflow_client: "AirflowClient",
    db: "Session",
    dissolve_update_area_setup: str,
) -> None:
    """Run the dissolve workflow end to end from the plugin layer action.

    Triggers the workflow through the production area layer action, waits for
    the DAG run to succeed and checks that the DEM preview inside the update
    area now matches the reference surface and that the production area is
    marked COMPLETED.
    """
    # Importing here to avoid wrong DB name in environment
    from pinta_qgis_plugin.api import api_client  # noqa: PLC0415
    from pinta_qgis_plugin.project.groups import (  # noqa: PLC0415
        management_layer_collection,
    )

    area_ewkt = dissolve_update_area_setup
    production_area = db.exec(sqlmodel.select(ProductionArea)).first()
    assert production_area is not None
    database_name = production_area.database_name
    assert database_name is not None

    # The preview starts from the primary DEM, i.e. the real elevations, well
    # below the flat reference surface it should be dissolved to.
    mean_before = _dem_preview_mean(database_name, area_ewkt)
    assert mean_before != pytest.approx(REFERENCE_DEM_VALUE, abs=100)

    # Locate the layer action that starts the dissolve workflow and the single
    # production area feature it is run on.
    production_area_layer = layers.get_vector_layer_by_model(ProductionArea)
    assert production_area_layer.featureCount() == 1
    action = layers.find_layer_action(
        production_area_layer,
        management_layer_collection.ACTION_TITLE_START_DISSOLVE_UPDATE_AREAS,
    )
    feature = next(production_area_layer.getFeatures())

    # Running the action must emit workflow_started within 10 s; the signal
    # arguments identify the DAG run.
    client = api_client.get_api_client()
    with qtbot.waitSignal(client.workflow_started, timeout=10000) as blocker:
        layers.run_layer_action(production_area_layer, action, feature)
    dag_id, dag_run_id = blocker.args

    m_error_dialog.assert_not_called()
    dag_run = DagRun(id=dag_id, run_id=dag_run_id)

    state = airflow_client.wait_for_dag_run(dag_run, timeout=WORKFLOW_TIMEOUT_S)
    assert state == "success", (
        f"DAG run finished with state={state}\n"
        f"{airflow_client.describe_failed_run(dag_run)}"
    )

    # The dissolve unions the reference DEM (priority) into the preview inside the
    # update area, so the preview must now match the flat reference surface.
    mean_after = _dem_preview_mean(database_name, area_ewkt)
    assert mean_after == pytest.approx(REFERENCE_DEM_VALUE, abs=50)
    assert abs(mean_after - mean_before) > 100

    # The workflow stamps the production area processing status COMPLETED on success.
    # Expire cached state first so the status is re-read from the database.
    db.expire_all()
    completed_area = db.exec(sqlmodel.select(ProductionArea)).first()
    assert completed_area is not None
    assert completed_area.processing_status == ProcessingStatus.COMPLETED
diff --git a/components/processing/src/pinta_processing/core.py b/components/processing/src/pinta_processing/core.py
index aec7d707..4be4a272 100644
--- a/components/processing/src/pinta_processing/core.py
+++ b/components/processing/src/pinta_processing/core.py
@@ -26,6 +26,16 @@ def __post_init__(self) -> None:
# Bypass frozen dataclass assignment for construction-time normalization.
object.__setattr__(self, "array", self.array.astype(np.float32, copy=False))
@property
def bounds(self) -> tuple[float, float, float, float]:
    """Return the raster's map bounds as (left, bottom, right, top).

    The bounds are the axis-aligned envelope of the four raster corners
    mapped through the affine transform. For a plain north-up transform this
    equals the origin plus pixel size times raster size; for rotated, sheared
    or south-up transforms it still yields left <= right and bottom <= top.
    """
    height, width = self.array.shape
    transform = self.transform
    corners = [(col, row) for col in (0, width) for row in (0, height)]
    # Affine mapping of pixel (col, row): x = c + a*col + b*row,
    # y = f + d*col + e*row.
    xs = [transform.c + transform.a * col + transform.b * row for col, row in corners]
    ys = [transform.f + transform.d * col + transform.e * row for col, row in corners]
    return min(xs), min(ys), max(xs), max(ys)
+
@staticmethod
def from_rasterio(src: rasterio.DatasetReader) -> "RasterDataset":
"""Construct dataset from rasterio reader."""
diff --git a/components/processing/src/pinta_processing/filters/__init__.py b/components/processing/src/pinta_processing/filters/__init__.py
index 3cd3a894..cd5a8f16 100644
--- a/components/processing/src/pinta_processing/filters/__init__.py
+++ b/components/processing/src/pinta_processing/filters/__init__.py
@@ -5,8 +5,10 @@
from pinta_processing.filters.diff import RasterDiff
from pinta_processing.filters.filter import RasterFilter
+from pinta_processing.filters.interpolate import RasterInterpolate
from pinta_processing.filters.multiply import MultiplyValues
from pinta_processing.filters.overview import DownsampleOverview
+from pinta_processing.filters.union import RasterUnion
from pinta_processing.filters.vectorize import VectorizeRaster
__all__ = [
@@ -14,5 +16,7 @@
"MultiplyValues",
"RasterDiff",
"RasterFilter",
+ "RasterInterpolate",
+ "RasterUnion",
"VectorizeRaster",
]
diff --git a/components/processing/src/pinta_processing/filters/interpolate.py b/components/processing/src/pinta_processing/filters/interpolate.py
new file mode 100644
index 00000000..181e0577
--- /dev/null
+++ b/components/processing/src/pinta_processing/filters/interpolate.py
@@ -0,0 +1,120 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import numpy as np
+from rasterio import features
+from scipy.interpolate import griddata
+from scipy.ndimage import binary_dilation, generate_binary_structure
+from shapely import wkt as shapely_wkt
+from shapely.geometry.base import BaseGeometry
+
+from pinta_processing import core, exceptions
+
# Fewest valid sample pixels required before interpolation is attempted.
_MIN_KNOWN_POINTS = 4
# Number of dilation iterations used to grow the polygon mask into the band of
# pixels that are sampled around it.
_SAMPLING_MARGIN = 5
+
+
class RasterInterpolate(core.Stage):
    """Interpolate the raster pixels inside a polygon with cubic interpolation.

    Pixels whose centre falls inside the polygon are recomputed with SciPy's
    cubic griddata interpolation from the surrounding valid pixels. The raster
    must hold enough valid data around the polygon for every target pixel
    to fall within the interpolation domain.
    """

    def __init__(self, wkt: str) -> None:
        """Store the polygon WKT and parse it eagerly.

        Args:
            wkt: WKT of the polygon whose interior is interpolated.

        Raises:
            ValueError: If the WKT cannot be parsed or is not a Polygon.
        """
        super().__init__()
        self.wkt = wkt
        self._polygon = self._parse_polygon(wkt)

    @staticmethod
    def _parse_polygon(wkt: str) -> BaseGeometry:
        """Parse the interpolation WKT and ensure it is a polygon.

        Raises:
            ValueError: If parsing fails or the geometry type is not Polygon.
        """
        try:
            geometry = shapely_wkt.loads(wkt)
        except Exception as error:
            # Any parser failure is re-raised as ValueError with its cause.
            message = f"RasterInterpolate polygon WKT could not be parsed: {error}"
            raise ValueError(message) from error

        if geometry.geom_type != "Polygon":
            message = (
                f"RasterInterpolate polygon must be a Polygon, got {geometry.geom_type}"
            )
            raise ValueError(message)
        return geometry

    def process(self, data: core.RasterDataset) -> core.RasterDataset:
        """Interpolate the pixels inside the polygon and return the raster.

        Args:
            data: Raster to interpolate; it is not modified.

        Returns:
            A new dataset with the same transform, CRS and nodata value whose
            pixels inside the polygon hold the interpolated values.

        Raises:
            InvalidStageInputError: If ``data`` is not a ``RasterDataset``.
            ValueError: If the polygon covers no pixel or the surrounding data
                is insufficient for the interpolation.
        """
        if not isinstance(data, core.RasterDataset):
            raise exceptions.InvalidStageInputError(
                stage_name=RasterInterpolate.__name__,
                expected_type=core.RasterDataset.__name__,
                received_type=type(data).__name__,
            )

        # Work on a float64 copy with nodata expressed as NaN.
        array = data.array.astype(np.float64, copy=True)
        if data.nodata is not None:
            array[array == data.nodata] = np.nan

        # all_touched=False selects only pixels whose centre is in the polygon.
        target_mask = features.rasterize(
            [(self._polygon, 1)],
            out_shape=array.shape,
            transform=data.transform,
            fill=0,
            all_touched=False,
            dtype="uint8",
        ).astype(bool)

        interpolated = self._interpolate(array, target_mask)

        result = array.copy()
        result[target_mask] = interpolated
        # Turn the remaining NaN pixels back into the nodata value.
        if data.nodata is not None:
            result[np.isnan(result)] = data.nodata

        return core.RasterDataset(
            array=result,
            transform=data.transform,
            crs=data.crs,
            nodata=data.nodata,
        )

    def _interpolate(self, array: np.ndarray, target_mask: np.ndarray) -> np.ndarray:
        """Cubic-interpolate the target pixels from surrounding valid data.

        Args:
            array: Raster values with missing data as NaN.
            target_mask: Boolean mask of the pixels to recompute.

        Returns:
            The interpolated values, one per True pixel of ``target_mask``.

        Raises:
            ValueError: If the mask is empty, fewer than ``_MIN_KNOWN_POINTS``
                valid samples surround it, the samples cannot be triangulated,
                or any target pixel lies outside the interpolation domain.
        """
        from scipy.spatial import QhullError

        if not target_mask.any():
            message = "RasterInterpolate polygon does not cover any raster pixels"
            raise ValueError(message)

        too_little_data = (
            "RasterInterpolate has too little data around the polygon to "
            "interpolate"
        )

        # Samples: valid pixels in the band around the polygon, never targets.
        known_mask = self._sampling_mask(target_mask) & ~np.isnan(array) & ~target_mask
        if int(known_mask.sum()) < _MIN_KNOWN_POINTS:
            raise ValueError(too_little_data)

        # Interpolation runs in (row, col) index space.
        rows, cols = np.indices(array.shape)
        points = np.column_stack((rows[known_mask], cols[known_mask]))
        values = array[known_mask]
        targets = np.column_stack((rows[target_mask], cols[target_mask]))

        try:
            interpolated = griddata(points, values, targets, method="cubic")
        except QhullError as error:
            # Degenerate sample layouts (e.g. all samples on one line) cannot
            # be triangulated; report that like any other unusable input.
            message = (
                "RasterInterpolate sample pixels around the polygon are "
                "degenerate and cannot be triangulated"
            )
            raise ValueError(message) from error

        # griddata yields NaN for targets outside the hull of the samples.
        if np.isnan(interpolated).any():
            raise ValueError(too_little_data)
        return interpolated

    @staticmethod
    def _sampling_mask(target_mask: np.ndarray) -> np.ndarray:
        """Return the band of pixels around the polygon used for sampling.

        The target mask is dilated ``_SAMPLING_MARGIN`` times with a fully
        connected 3x3 structuring element; the result still includes the
        target pixels themselves.
        """
        structure = generate_binary_structure(2, 2)
        return binary_dilation(
            target_mask, structure=structure, iterations=_SAMPLING_MARGIN
        )
diff --git a/components/processing/src/pinta_processing/filters/union.py b/components/processing/src/pinta_processing/filters/union.py
new file mode 100644
index 00000000..e6efbcfe
--- /dev/null
+++ b/components/processing/src/pinta_processing/filters/union.py
@@ -0,0 +1,141 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+import math
+
+import affine
+import numpy as np
+
+from pinta_processing import core, exceptions
+
+# Relative tolerance for comparing pixel sizes across datasets.
+_PIXEL_SIZE_RELATIVE_TOLERANCE = 1e-9
+
+
+class RasterUnion(core.Stage):
+ """Union several aligned raster datasets into one, the last dataset winning.
+
+ Merge the input rasters into a single raster covering the union of their extents.
+ For each pixel, the last non-nodata value wins.
+
+ The inputs must already share the same CRS and pixel size; this stage neither
+ reprojects nor resamples and raises if they do not match.
+ """
+
+ def process(self, data: tuple[core.RasterDataset, ...]) -> core.RasterDataset:
+ """Merge the input rasters over their union, the last dataset winning."""
+ datasets = self._validate_input(data)
+ self._validate_alignment(datasets)
+
+ transform, height, width = self._build_merged_transform(datasets)
+
+ merged = np.full((height, width), np.nan, dtype=np.float64)
+ for dataset in datasets:
+ placed = self._place_on_grid(dataset, transform, height, width)
+ valid = ~np.isnan(placed)
+ merged[valid] = placed[valid]
+
+ nodata = next((d.nodata for d in datasets if d.nodata is not None), None)
+ if nodata is not None:
+ merged[np.isnan(merged)] = nodata
+
+ return core.RasterDataset(
+ array=merged,
+ transform=transform,
+ crs=datasets[0].crs,
+ nodata=nodata,
+ )
+
+ @staticmethod
+ def _validate_input(
+ data: tuple[core.RasterDataset, ...],
+ ) -> tuple[core.RasterDataset, ...]:
+ """Ensure the input is a non-empty tuple of raster datasets."""
+ if (
+ not isinstance(data, tuple)
+ or len(data) == 0
+ or not all(isinstance(item, core.RasterDataset) for item in data)
+ ):
+ raise exceptions.InvalidStageInputError(
+ stage_name=RasterUnion.__name__,
+ expected_type="tuple[RasterDataset, ...]",
+ received_type=type(data).__name__,
+ )
+ return data
+
+ @staticmethod
+ def _validate_alignment(datasets: tuple[core.RasterDataset, ...]) -> None:
+ """Ensure all datasets share the same CRS and pixel size."""
+ reference = datasets[0]
+ for dataset in datasets[1:]:
+ if dataset.crs != reference.crs:
+ msg = (
+ "RasterUnion inputs must share the same CRS, got "
+ f"{reference.crs} and {dataset.crs}"
+ )
+ raise ValueError(msg)
+ if not (
+ math.isclose(
+ dataset.transform.a,
+ reference.transform.a,
+ rel_tol=_PIXEL_SIZE_RELATIVE_TOLERANCE,
+ )
+ and math.isclose(
+ dataset.transform.e,
+ reference.transform.e,
+ rel_tol=_PIXEL_SIZE_RELATIVE_TOLERANCE,
+ )
+ ):
+ msg = (
+ "RasterUnion inputs must share the same pixel size, got "
+ f"({reference.transform.a}, {reference.transform.e}) and "
+ f"({dataset.transform.a}, {dataset.transform.e})"
+ )
+ raise ValueError(msg)
+
+ @staticmethod
+ def _to_nan_array(dataset: core.RasterDataset) -> np.ndarray:
+ """Return the raster values as float64 with nodata replaced by nan."""
+ array = dataset.array.astype(np.float64, copy=True)
+ if dataset.nodata is not None:
+ array[array == dataset.nodata] = np.nan
+ return array
+
+ def _build_merged_transform(
+ self, datasets: tuple[core.RasterDataset, ...]
+ ) -> tuple[affine.Affine, int, int]:
+ """Build the transform and shape of the grid covering all datasets."""
+ lefts, bottoms, rights, tops = zip(
+ *(dataset.bounds for dataset in datasets), strict=True
+ )
+ left = min(lefts)
+ top = max(tops)
+ right = max(rights)
+ bottom = min(bottoms)
+ transform = affine.Affine(
+ datasets[0].transform.a, 0.0, left, 0.0, datasets[0].transform.e, top
+ )
+ width = round((right - left) / transform.a)
+ height = round((bottom - top) / transform.e)
+ return transform, height, width
+
+ def _place_on_grid(
+ self,
+ dataset: core.RasterDataset,
+ transform: affine.Affine,
+ height: int,
+ width: int,
+ ) -> np.ndarray:
+ """Position dataset inside given grid, filling gaps with nan."""
+ col_offset = round((dataset.transform.c - transform.c) / transform.a)
+ row_offset = round((dataset.transform.f - transform.f) / transform.e)
+ grid = np.full((height, width), np.nan, dtype=np.float64)
+ source = self._to_nan_array(dataset)
+ source_height, source_width = source.shape
+ grid[
+ row_offset : row_offset + source_height,
+ col_offset : col_offset + source_width,
+ ] = source
+ return grid
diff --git a/components/processing/src/pinta_processing/pipelines.py b/components/processing/src/pinta_processing/pipelines.py
index 25d02eca..28d88626 100644
--- a/components/processing/src/pinta_processing/pipelines.py
+++ b/components/processing/src/pinta_processing/pipelines.py
@@ -9,10 +9,11 @@
import numpy as np
from pinta_common import Settings
-from pinta_db.job_db.models import reference
+from pinta_db.job_db.models import reference, user
from pinta_db.primary_db.models import dem
from pinta_db_utils import model_utils
from pinta_db_utils.postgis import raster
+from shapely import wkt as shapely_wkt
from sqlmodel import Session
from pinta_processing import core, filters, reader, writer
@@ -20,6 +21,8 @@
from pinta_processing.utils import tm35_map_sheet_utils
DEFAULT_BUFFERED = 300
+DISSOLVE_PRIMARY_DEM_BUFFER = 50 # Read dem from primary db around the update area
+DISSOLVE_INTERPOLATE_AREA_BUFFER = 4 # Interpolate zone around the update area
DEFAULT_LASTOOLS_PARAMS = {
"buffered": DEFAULT_BUFFERED,
"kill": 300,
@@ -172,6 +175,57 @@ def calculate_diff_models(
)
+def dissolve_update_area(
+ primary_session: Session,
+ job_session: Session,
+ geom_wkt: str,
+) -> core.Pipeline:
+ """Merge the primary and reference DEM and smooth the update area seam.
+
+ - Read primary DEM buffered by DISSOLVE_PRIMARY_DEM_BUFFER around update area.
+ - Read reference DEM buffered by DISSOLVE_INTERPOLATE_AREA_BUFFER around update area
+ - Union the DEMs; the reference DEM has priority.
+ - Interpolate a DISSOLVE_INTERPOLATE_AREA_BUFFER meters wide donut outside the
+ update area to smooth the seam.
+
+ The blended result is merged into dem_preview and its overviews. Overviews are
+ downsampled from the blended patch, so under concurrent update-area tasks a shared
+ overview tile may end up slightly stale (overviews are visualization-only).
+ Only dem_preview needs to be eventually consistent, which the
+ tile-level merge guarantees.
+ """
+ geom = shapely_wkt.loads(geom_wkt)
+ primary_dem_area = geom.buffer(DISSOLVE_PRIMARY_DEM_BUFFER)
+ reference_dem_area = geom.buffer(DISSOLVE_INTERPOLATE_AREA_BUFFER)
+ buffer_zone_area = geom.buffer(DISSOLVE_INTERPOLATE_AREA_BUFFER).difference(geom)
+
+ dem_schema, dem_table = model_utils.schema_and_table(dem.Dem)
+ reference_schema, reference_dem_table = model_utils.schema_and_table(reference.Dem)
+ preview_schema, preview_table = model_utils.schema_and_table(user.DemPreview)
+
+ return (
+ reader.PostgisReader(
+ dem_schema, dem_table, primary_session, primary_dem_area.wkt
+ )
+ | core.Zip(
+ reader.PostgisReader(
+ reference_schema,
+ reference_dem_table,
+ job_session,
+ reference_dem_area.wkt,
+ )
+ )
+ | filters.RasterUnion()
+ | filters.RasterInterpolate(buffer_zone_area.wkt)
+ | _generate_overview_stages(
+ preview_schema, preview_table, job_session, staging_tables=0, mode="update"
+ )
+ | writer.RasterPostgisWriter(
+ preview_schema, preview_table, job_session, mode="update"
+ )
+ )
+
+
def postgis_to_postgis( # noqa: PLR0913
from_session: Session,
from_schema: str,
@@ -200,6 +254,7 @@ def _generate_overview_stages(
table_name: str,
session: Session,
staging_tables: int,
+ mode: writer.WriterMode = "insert",
) -> core.Stage:
return functools.reduce(
operator.or_,
@@ -213,6 +268,7 @@ def _generate_overview_stages(
),
session,
staging_tables,
+ mode,
)
)
for level in raster.DEFAULT_OVERVIEW_LEVELS
@@ -220,13 +276,14 @@ def _generate_overview_stages(
)
-def _overview_to_postgis(
+def _overview_to_postgis( # noqa: PLR0913
factor: int,
schema: str,
table_name: str,
session: Session,
staging_tables: int,
+ mode: writer.WriterMode = "insert",
) -> core.Pipeline:
return filters.DownsampleOverview(factor) | writer.RasterPostgisWriter(
- schema, table_name, session, staging_tables
+ schema, table_name, session, staging_tables, mode=mode
)
diff --git a/components/processing/src/pinta_processing/writer/__init__.py b/components/processing/src/pinta_processing/writer/__init__.py
index 9335559e..7ed8ebaa 100644
--- a/components/processing/src/pinta_processing/writer/__init__.py
+++ b/components/processing/src/pinta_processing/writer/__init__.py
@@ -4,6 +4,15 @@
# Licensed under the MIT License; see the repository LICENSE file.
from pinta_processing.writer.geotiff import GeotiffWriter
-from pinta_processing.writer.postgis import RasterPostgisWriter, VectorPostgisWriter
+from pinta_processing.writer.postgis import (
+ RasterPostgisWriter,
+ VectorPostgisWriter,
+ WriterMode,
+)
-__all__ = ["GeotiffWriter", "RasterPostgisWriter", "VectorPostgisWriter"]
+__all__ = [
+ "GeotiffWriter",
+ "RasterPostgisWriter",
+ "VectorPostgisWriter",
+ "WriterMode",
+]
diff --git a/components/processing/src/pinta_processing/writer/postgis.py b/components/processing/src/pinta_processing/writer/postgis.py
index 59622d55..72b57701 100644
--- a/components/processing/src/pinta_processing/writer/postgis.py
+++ b/components/processing/src/pinta_processing/writer/postgis.py
@@ -14,6 +14,7 @@
import math
import struct
from dataclasses import dataclass
+from typing import Literal
import numpy as np
import sqlmodel
@@ -25,6 +26,15 @@
LOGGER = logging.getLogger(__name__)
+WriterMode = Literal["insert", "update"]
+
+# Output pixel type for merged tiles, matching the float32 DEM band.
+_MERGE_PIXEL_TYPE = "32BF"
+
+# Name of the TEMP table that holds incoming tiles in update mode before they are
+# merged into the target table.
+_UPDATE_STAGING_TABLE = "_pinta_raster_update_staging"
+
@dataclass
class DataTypeConfig:
@@ -51,19 +61,24 @@ def from_numpy_dtype(numpy_dtype: str) -> "DataTypeConfig":
class RasterPostgisWriter(core.Stage):
"""Write raster data to PostGIS table using COPY FROM stdin."""
- def __init__(
+ def __init__( # noqa: PLR0913
self,
schema: str,
table_name: str,
session: sqlmodel.Session,
staging_tables: int = 0,
tile_size: int | None = None,
+ mode: WriterMode = "insert",
) -> None:
super().__init__()
+ if mode == "update" and staging_tables != 0:
+ msg = "update mode requires staging_tables=0"
+ raise ValueError(msg)
self.schema = schema
self.table_name = table_name
self.session = session
self.staging_tables = staging_tables
+ self.mode = mode
self.tile_size = (
tile_size if tile_size is not None else Settings.DB_DEFAULT_TILE_SIZE
)
@@ -80,6 +95,9 @@ def process(self, data: core.RasterDataset | None) -> None:
if data.crs is None:
msg = "CRS is required for writing to PostGIS. "
raise ValueError(msg)
+
+ if self.mode == "update":
+ return self._update_to_postgis(data)
if self.staging_tables == 0:
return self._write_to_postgis(data, self.table_name)
@@ -99,6 +117,82 @@ def _write_to_postgis(self, data: core.RasterDataset, table_name: str) -> None:
raw_connection.commit()
+ def _update_to_postgis(self, data: core.RasterDataset) -> None:
+ """Merge raster tiles into an existing PostGIS table.
+
+ Incoming pixels overwrite the target; target pixels are
+ kept where the incoming tile is nodata. Tiles with no matching row yet
+ are inserted.
+ """
+ tiles = [tile for tile in self._generate_tiles(data) if _tile_has_data(tile)]
+ if not tiles:
+ LOGGER.info(
+ "No data to merge into table %s.%s", self.schema, self.table_name
+ )
+ return
+
+ raw_connection = self.session.connection().connection
+ nodata_value = data.nodata if data.nodata is not None else 0.0
+ LOGGER.info("Merging data into table %s.%s", self.schema, self.table_name)
+ try:
+ with raw_connection.cursor() as cursor:
+ cursor.execute(
+ f"CREATE TEMP TABLE {_UPDATE_STAGING_TABLE} "
+ "(rast raster) ON COMMIT DROP"
+ )
+ copy_sql = f"COPY {_UPDATE_STAGING_TABLE} (rast) FROM STDIN"
+ with cursor.copy(copy_sql) as copy:
+ for tile in tiles:
+ raster_bytes = self._raster_dataset_to_postgis_bytes(tile)
+ copy.write(raster_bytes.hex() + "\n")
+
+ cursor.execute(self._merge_update_sql(), (nodata_value, nodata_value))
+ cursor.execute(self._merge_insert_sql())
+
+ raw_connection.commit()
+ except Exception:
+ raw_connection.rollback()
+ raise
+
+ def _merge_update_sql(self) -> str:
+ """SQL merging staged tiles into existing rows via ST_MapAlgebra.
+
+ Matches rows by extent. Takes two nodata
+ parameters: the both-nodata fill value and the merged band's nodata.
+ """
+ # Schema/table come from trusted model metadata, not user input.
+ target = f"{self.schema}.{self.table_name}"
+ return f"""
+ UPDATE {target} AS target
+ SET rast = ST_SetBandNoDataValue(
+ ST_MapAlgebra(
+ target.rast, staging.rast,
+ '[rast2]', -- both valid: incoming wins
+ '{_MERGE_PIXEL_TYPE}', -- output pixel type
+ 'FIRST', -- keep target extent
+ '[rast2]', -- target nodata: use incoming
+ '[rast1]', -- incoming nodata: keep target
+ %s -- both nodata: fill value
+ ),
+ %s
+ )
+ FROM {_UPDATE_STAGING_TABLE} AS staging
+ WHERE target.rast::geometry = staging.rast::geometry
+ """ # noqa: S608
+
+ def _merge_insert_sql(self) -> str:
+ # Schema/table come from trusted model metadata, not user input.
+ target = f"{self.schema}.{self.table_name}"
+ return f"""
+ INSERT INTO {target} (rast)
+ SELECT staging.rast
+ FROM {_UPDATE_STAGING_TABLE} AS staging
+ WHERE NOT EXISTS (
+ SELECT 1 FROM {target} AS target
+ WHERE target.rast::geometry = staging.rast::geometry
+ )
+ """ # noqa: S608
+
def _generate_tiles(
self,
data: core.RasterDataset,
@@ -121,6 +215,10 @@ def _generate_tiles(
tile_height = self.tile_size * pixel_height
height, width = data.array.shape[:2]
+
+ if height == 0 or width == 0:
+ return []
+
nodata_value = data.nodata if data.nodata is not None else 0.0
# Raster bounds
@@ -272,6 +370,16 @@ def _bits_to_int(*bits: tuple[int, int]) -> int:
return int("".join(f"{value:0{size}b}" for value, size in bits), 2)
+def _tile_has_data(tile: core.RasterDataset) -> bool:
+ """Return True if the tile has at least one non-nodata pixel."""
+ nodata = tile.nodata
+ if nodata is None:
+ return True
+ if isinstance(nodata, float) and math.isnan(nodata):
+ return bool(np.any(~np.isnan(tile.array)))
+ return bool(np.any(tile.array != nodata))
+
+
class VectorPostgisWriter(core.Stage):
"""Write vector data to PostGIS table."""
diff --git a/components/processing/test_processing/pinta_processing/filter/test_interpolate.py b/components/processing/test_processing/pinta_processing/filter/test_interpolate.py
new file mode 100644
index 00000000..072e40ea
--- /dev/null
+++ b/components/processing/test_processing/pinta_processing/filter/test_interpolate.py
@@ -0,0 +1,153 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+from typing import Any
+
+import affine
+import numpy as np
+import pytest
+from shapely.geometry import box
+
+from pinta_processing import core, exceptions
+from pinta_processing.filters import RasterInterpolate
+from pinta_processing_test_utils import constants
+
+# Transform maps pixel (row, col) centre to (col + 0.5, -(row + 0.5)).
+_TRANSFORM = affine.Affine(1.0, 0.0, 0.0, 0.0, -1.0, 0.0)
+
+
+def _dataset(
+ array: np.ndarray,
+ *,
+ transform: affine.Affine = _TRANSFORM,
+ crs: str = constants.DEFAULT_CRS,
+ nodata: float | None = None,
+) -> core.RasterDataset:
+ return core.RasterDataset(
+ array=np.asarray(array, dtype=np.float32),
+ transform=transform,
+ crs=crs,
+ nodata=nodata,
+ )
+
+
+def _pixel_box_wkt(row: int, col: int) -> str:
+ """WKT box selecting the single pixel at (row, col)."""
+ return box(col, -(row + 1), col + 1, -row).wkt
+
+
+def _linear_field(rows: int, cols: int) -> np.ndarray:
+ """A planar field z = row + col, exactly reproducible by cubic interpolation."""
+ return np.fromfunction(lambda r, c: r + c, (rows, cols), dtype=np.float64)
+
+
+def test_interpolate_replaces_polygon_pixels_from_surrounding_data():
+ field = _linear_field(7, 7)
+ corrupted = field.copy()
+ corrupted[3, 3] = 999.0
+ dataset = _dataset(corrupted)
+ stage = RasterInterpolate(_pixel_box_wkt(3, 3))
+
+ result = stage.process(dataset)
+
+ # Cubic interpolation of a planar field recovers the true value ~= 6.0.
+ assert np.isclose(result.array[3, 3], 6.0, atol=1e-3)
+
+
+def test_interpolate_leaves_pixels_outside_polygon_untouched():
+ field = _linear_field(7, 7)
+ corrupted = field.copy()
+ corrupted[3, 3] = 999.0
+ dataset = _dataset(corrupted)
+ stage = RasterInterpolate(_pixel_box_wkt(3, 3))
+
+ result = stage.process(dataset)
+
+ outside = np.ones((7, 7), dtype=bool)
+ outside[3, 3] = False
+ assert np.allclose(result.array[outside], corrupted[outside])
+
+
+def test_interpolate_preserves_metadata():
+ dataset = _dataset(_linear_field(7, 7), nodata=constants.DEFAULT_NODATA)
+ stage = RasterInterpolate(_pixel_box_wkt(3, 3))
+
+ result = stage.process(dataset)
+
+ assert result.transform == dataset.transform
+ assert result.crs == constants.DEFAULT_CRS
+ assert result.nodata == constants.DEFAULT_NODATA
+
+
+def test_interpolate_does_not_modify_input():
+ field = _linear_field(7, 7)
+ dataset = _dataset(field)
+ original = dataset.array.copy()
+ stage = RasterInterpolate(_pixel_box_wkt(3, 3))
+
+ stage.process(dataset)
+
+ assert np.allclose(dataset.array, original)
+
+
+@pytest.mark.parametrize(
+ "invalid_wkt",
+ [
+ "POINT (1 1)",
+ "LINESTRING (0 0, 1 1)",
+ ],
+ ids=["point", "linestring"],
+)
+def test_interpolate_rejects_non_polygon(invalid_wkt: str):
+ with pytest.raises(ValueError, match="Polygon"):
+ RasterInterpolate(invalid_wkt)
+
+
+def test_interpolate_rejects_unparseable_wkt():
+ with pytest.raises(ValueError, match="could not be parsed"):
+ RasterInterpolate("not wkt")
+
+
+def test_interpolate_raises_when_polygon_covers_no_pixels():
+ dataset = _dataset(_linear_field(5, 5))
+ stage = RasterInterpolate(box(100, 100, 101, 101).wkt)
+
+ with pytest.raises(ValueError, match="does not cover any raster pixels"):
+ stage.process(dataset)
+
+
+def test_interpolate_raises_when_data_does_not_surround_polygon():
+ # Valid data only in the top rows, so a middle pixel is outside the
+ # interpolation domain (no data below it).
+ array = np.full((8, 8), constants.DEFAULT_NODATA, dtype=np.float32)
+ array[0:2, :] = 10.0
+ dataset = _dataset(array, nodata=constants.DEFAULT_NODATA)
+ stage = RasterInterpolate(_pixel_box_wkt(4, 4))
+
+ with pytest.raises(ValueError, match="too little data around the polygon"):
+ stage.process(dataset)
+
+
+def test_interpolate_raises_when_too_few_known_points():
+ array = np.full((5, 5), constants.DEFAULT_NODATA, dtype=np.float32)
+ array[0, 0] = 1.0
+ array[0, 1] = 2.0
+ dataset = _dataset(array, nodata=constants.DEFAULT_NODATA)
+ stage = RasterInterpolate(_pixel_box_wkt(2, 2))
+
+ with pytest.raises(ValueError, match="too little data around the polygon"):
+ stage.process(dataset)
+
+
+@pytest.mark.parametrize(
+ "invalid_input",
+ ["not a dataset", None, (_dataset(np.zeros((3, 3))),)],
+ ids=["string", "none", "tuple"],
+)
+def test_interpolate_invalid_input_raises_error(invalid_input: Any):
+ stage = RasterInterpolate(_pixel_box_wkt(1, 1))
+
+ with pytest.raises(exceptions.InvalidStageInputError):
+ stage.process(invalid_input)
diff --git a/components/processing/test_processing/pinta_processing/filter/test_union.py b/components/processing/test_processing/pinta_processing/filter/test_union.py
new file mode 100644
index 00000000..a1157b05
--- /dev/null
+++ b/components/processing/test_processing/pinta_processing/filter/test_union.py
@@ -0,0 +1,164 @@
+# Copyright (c) 2026 National Land Survey of Finland
+# (https://www.maanmittauslaitos.fi/en).
+# This file is part of the Pinta.
+# Licensed under the MIT License; see the repository LICENSE file.
+
+from typing import Any
+
+import affine
+import numpy as np
+import pytest
+
+from pinta_processing import core, exceptions
+from pinta_processing.filters import RasterUnion
+from pinta_processing_test_utils import constants
+
+_TRANSFORM = affine.Affine(1.0, 0.0, 0.0, 0.0, -1.0, 0.0)
+
+
+def _dataset(
+ array: np.ndarray,
+ *,
+ transform: affine.Affine = _TRANSFORM,
+ crs: str = constants.DEFAULT_CRS,
+ nodata: float | None = None,
+) -> core.RasterDataset:
+ return core.RasterDataset(
+ array=np.asarray(array, dtype=np.float32),
+ transform=transform,
+ crs=crs,
+ nodata=nodata,
+ )
+
+
+def test_union_last_dataset_wins_on_overlap():
+ first = _dataset(np.full((2, 2), 1.0))
+ second = _dataset(np.full((2, 2), 2.0))
+ third = _dataset(np.full((2, 2), 3.0))
+
+ result = RasterUnion().process((first, second, third))
+
+ assert np.allclose(result.array, 3.0)
+
+
+def test_union_fills_from_earlier_where_later_is_nodata():
+ first = _dataset(np.full((2, 2), 1.0))
+ second_array = np.array([[2.0, constants.DEFAULT_NODATA], [2.0, 2.0]])
+ second = _dataset(second_array, nodata=constants.DEFAULT_NODATA)
+
+ result = RasterUnion().process((first, second))
+
+ # Later dataset wins everywhere except its nodata cell, kept from the first.
+ expected = np.array([[2.0, 1.0], [2.0, 2.0]])
+ assert np.allclose(result.array, expected)
+
+
+def test_union_combines_disjoint_extents():
+ # Left raster covers columns 0..1, right raster columns 2..3.
+ left = _dataset(np.full((2, 2), 1.0))
+ right_transform = affine.Affine(1.0, 0.0, 2.0, 0.0, -1.0, 0.0)
+ right = _dataset(np.full((2, 2), 2.0), transform=right_transform)
+
+ result = RasterUnion().process((left, right))
+
+ assert result.array.shape == (2, 4)
+ assert result.transform == _TRANSFORM
+ assert np.allclose(result.array[:, :2], 1.0)
+ assert np.allclose(result.array[:, 2:], 2.0)
+
+
+def test_union_extends_extent_and_overlaps():
+ # Base covers columns 0..2, overlay columns 1..3, overlapping on columns 1..2.
+ base = _dataset(np.full((2, 3), 1.0))
+ overlay_transform = affine.Affine(1.0, 0.0, 1.0, 0.0, -1.0, 0.0)
+ overlay = _dataset(np.full((2, 3), 2.0), transform=overlay_transform)
+
+ result = RasterUnion().process((base, overlay))
+
+ assert result.array.shape == (2, 4)
+ # Column 0 only from base, columns 1..3 from the winning overlay.
+ expected = np.array([1.0, 2.0, 2.0, 2.0])
+ assert np.allclose(result.array, np.tile(expected, (2, 1)))
+
+
+def test_union_fills_gaps_with_nodata():
+ # Two disjoint rasters leave the opposite corners uncovered.
+ top_left = _dataset(np.full((2, 2), 5.0), nodata=constants.DEFAULT_NODATA)
+ bottom_right_transform = affine.Affine(1.0, 0.0, 2.0, 0.0, -1.0, -2.0)
+ bottom_right = _dataset(
+ np.full((2, 2), 7.0),
+ transform=bottom_right_transform,
+ nodata=constants.DEFAULT_NODATA,
+ )
+
+ result = RasterUnion().process((top_left, bottom_right))
+
+ assert result.array.shape == (4, 4)
+ assert result.nodata == constants.DEFAULT_NODATA
+ # Uncovered cells are filled with nodata.
+ assert result.array[0, 2] == constants.DEFAULT_NODATA
+ assert result.array[2, 0] == constants.DEFAULT_NODATA
+ assert result.array[0, 0] == 5.0
+ assert result.array[2, 2] == 7.0
+
+
+def test_union_single_dataset_returns_equivalent_raster():
+ only = _dataset(np.array([[1.0, 2.0], [3.0, 4.0]]))
+
+ result = RasterUnion().process((only,))
+
+ assert np.allclose(result.array, only.array)
+ assert result.transform == only.transform
+
+
+def test_union_preserves_crs():
+ first = _dataset(np.full((2, 2), 1.0), crs="EPSG:3067")
+ second = _dataset(np.full((2, 2), 2.0), crs="EPSG:3067")
+
+ result = RasterUnion().process((first, second))
+
+ assert result.crs == "EPSG:3067"
+
+
+def test_union_does_not_modify_inputs():
+ first = _dataset(np.full((2, 2), 1.0))
+ second = _dataset(np.full((2, 2), 2.0))
+ original_first = first.array.copy()
+ original_second = second.array.copy()
+
+ RasterUnion().process((first, second))
+
+ assert np.allclose(first.array, original_first)
+ assert np.allclose(second.array, original_second)
+
+
+@pytest.mark.parametrize(
+ "invalid_input",
+ [
+ "not a tuple",
+ None,
+ (),
+ (_dataset(np.zeros((2, 2))), "not a dataset"),
+ ],
+ ids=["string", "none", "empty-tuple", "non-dataset-member"],
+)
+def test_union_invalid_input_raises_error(invalid_input: Any):
+ with pytest.raises(exceptions.InvalidStageInputError):
+ RasterUnion().process(invalid_input)
+
+
+def test_union_rejects_mismatched_crs():
+ first = _dataset(np.full((2, 2), 1.0), crs="EPSG:3067")
+ second = _dataset(np.full((2, 2), 2.0), crs="EPSG:4326")
+
+ with pytest.raises(ValueError, match="same CRS"):
+ RasterUnion().process((first, second))
+
+
+def test_union_rejects_mismatched_pixel_size():
+ first = _dataset(np.full((2, 2), 1.0))
+ coarse_transform = affine.Affine(2.0, 0.0, 0.0, 0.0, -2.0, 0.0)
+ second = _dataset(np.full((2, 2), 2.0), transform=coarse_transform)
+
+ with pytest.raises(ValueError, match="same pixel size"):
+ RasterUnion().process((first, second))
diff --git a/components/processing/test_processing/pinta_processing/test_core.py b/components/processing/test_processing/pinta_processing/test_core.py
index d9719f0a..b460ecd4 100644
--- a/components/processing/test_processing/pinta_processing/test_core.py
+++ b/components/processing/test_processing/pinta_processing/test_core.py
@@ -5,6 +5,7 @@
import copy
+import affine
import numpy as np
import pytest
import pytest_mock
@@ -81,6 +82,19 @@ def test_raster_dataset_converts_array_to_float32():
assert dataset.array.dtype == np.float32
+def test_raster_dataset_bounds():
+ # 3 rows x 4 cols, 1 m pixels, origin at (10, 20) top-left.
+ dataset = core.RasterDataset(
+ array=np.zeros((3, 4), dtype=np.float32),
+ transform=affine.Affine(1.0, 0.0, 10.0, 0.0, -1.0, 20.0),
+ crs=constants.DEFAULT_CRS,
+ nodata=None,
+ )
+
+ # bounds are ordered left, bottom, right, top
+ assert dataset.bounds == (10.0, 17.0, 14.0, 20.0)
+
+
def test_pipeline_executes_all_stages(mocker: pytest_mock.MockerFixture):
pipeline = DummyStage() | DummyStage() | DummyStage()
diff --git a/components/processing/test_processing/pinta_processing/test_pipelines.py b/components/processing/test_processing/pinta_processing/test_pipelines.py
index 2ef77687..fcc4dbb8 100644
--- a/components/processing/test_processing/pinta_processing/test_pipelines.py
+++ b/components/processing/test_processing/pinta_processing/test_pipelines.py
@@ -6,7 +6,9 @@
from pathlib import Path
from unittest.mock import MagicMock
+from pinta_db_utils.postgis import raster
from pytest_mock import MockerFixture
+from shapely import wkt as shapely_wkt
from pinta_processing import pipelines
@@ -66,3 +68,114 @@ def test_blast2dem_to_postgis_override_extra_param_defaults(
"ll": [111, 222],
"neighbors": ["a.laz", "b.laz", "c.laz"],
}
+
+
+def test_dissolve_update_area_unions_and_interpolates_donut(
+ mocker: MockerFixture,
+) -> None:
+ postgis_reader = mocker.patch(
+ "pinta_processing.reader.PostgisReader",
+ return_value=MagicMock(),
+ )
+ union = mocker.patch(
+ "pinta_processing.filters.RasterUnion",
+ return_value=MagicMock(),
+ )
+ interpolate = mocker.patch(
+ "pinta_processing.filters.RasterInterpolate",
+ return_value=MagicMock(),
+ )
+ downsample = mocker.patch(
+ "pinta_processing.filters.DownsampleOverview",
+ return_value=MagicMock(),
+ )
+ postgis_writer = mocker.patch(
+ "pinta_processing.writer.RasterPostgisWriter",
+ return_value=MagicMock(),
+ )
+ primary_session = MagicMock()
+ job_session = MagicMock()
+ geom_wkt = "POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))"
+ geom = shapely_wkt.loads(geom_wkt)
+
+ pipelines.dissolve_update_area(
+ primary_session=primary_session,
+ job_session=job_session,
+ geom_wkt=geom_wkt,
+ )
+
+ # Two readers: primary DEM (50 m buffer) and reference DEM (4 m buffer).
+ assert postgis_reader.call_count == 2
+ primary_call, reference_call = postgis_reader.call_args_list
+ assert primary_call.args[2] is primary_session
+ assert reference_call.args[2] is job_session
+
+ primary_wkt = shapely_wkt.loads(primary_call.args[3])
+ reference_wkt = shapely_wkt.loads(reference_call.args[3])
+ _assert_geometries_match(
+ primary_wkt, geom.buffer(pipelines.DISSOLVE_PRIMARY_DEM_BUFFER)
+ )
+ _assert_geometries_match(
+ reference_wkt, geom.buffer(pipelines.DISSOLVE_INTERPOLATE_AREA_BUFFER)
+ )
+
+ # The two DEMs are unioned before the seam is interpolated.
+ union.assert_called_once_with()
+
+ # The interpolate stage receives the donut ring between 4 m buffer and geom.
+ donut = shapely_wkt.loads(interpolate.call_args.args[0])
+ _assert_geometries_match(
+ donut, geom.buffer(pipelines.DISSOLVE_INTERPOLATE_AREA_BUFFER).difference(geom)
+ )
+ # The donut has a hole (the update area) cut out of it.
+ assert len(donut.interiors) == 1
+
+ # The blended patch is merged into dem_preview and its overviews: one base
+ # writer plus one writer per overview level, all in update mode.
+ levels = raster.DEFAULT_OVERVIEW_LEVELS
+ assert downsample.call_count == len(levels)
+ assert postgis_writer.call_count == 1 + len(levels)
+ assert all(
+ call.kwargs["mode"] == "update" for call in postgis_writer.call_args_list
+ )
+
+
+def test_postgis_to_postgis(
+ mocker: MockerFixture,
+) -> None:
+ postgis_reader = mocker.patch(
+ "pinta_processing.reader.PostgisReader",
+ return_value=MagicMock(),
+ )
+ postgis_writer = mocker.patch(
+ "pinta_processing.writer.RasterPostgisWriter",
+ return_value=MagicMock(),
+ )
+ from_session = MagicMock()
+ to_session = MagicMock()
+
+ pipelines.postgis_to_postgis(
+ from_session=from_session,
+ from_schema="dem",
+ from_table="dem",
+ to_session=to_session,
+ to_schema="user_data",
+ to_table="dem_preview",
+ tile_wkt="POINT (0 0)",
+ staging_tables=2,
+ )
+
+ # Reads the source table with the source session and tile geometry.
+ postgis_reader.assert_called_once_with("dem", "dem", from_session, "POINT (0 0)")
+
+ # The final writer targets the destination table/session with staging tables.
+ # (Overview writers also use RasterPostgisWriter, so search all recorded calls.)
+ assert (
+ mocker.call("user_data", "dem_preview", to_session, 2)
+ in postgis_writer.call_args_list
+ )
+
+
+def _assert_geometries_match(actual: object, expected: object) -> None:
+ # WKT round-tripping perturbs vertices slightly, so compare with tolerance.
+ assert actual.symmetric_difference(expected).area < 1e-6 # type: ignore[attr-defined]
diff --git a/components/processing/test_processing/pinta_processing/writer/test_postgis.py b/components/processing/test_processing/pinta_processing/writer/test_postgis.py
index 6ba92a94..b6b9437d 100644
--- a/components/processing/test_processing/pinta_processing/writer/test_postgis.py
+++ b/components/processing/test_processing/pinta_processing/writer/test_postgis.py
@@ -11,6 +11,7 @@
from shapely.geometry import Point
from pinta_processing import core, exceptions
+from pinta_processing.filters import DownsampleOverview
from pinta_processing.writer import RasterPostgisWriter, VectorPostgisWriter
@@ -156,6 +157,118 @@ def test_resolve_partition_different_locations(dataset: core.RasterDataset):
)
+def _mock_update_session(mocker: MockerFixture) -> tuple:
+    """Build a mocked session together with its raw connection and cursor.
+
+    The returned cursor is what ``raw_connection.cursor()`` yields when used as a
+    context manager, and ``cursor.copy()`` is prepared as a context manager too.
+    """
+    session = mocker.MagicMock()
+    raw_connection = session.connection.return_value.connection
+    cursor_context = raw_connection.cursor.return_value
+    cursor = cursor_context.__enter__.return_value
+    copy_context = cursor.copy.return_value
+    copy_context.__enter__.return_value = mocker.MagicMock()
+    return session, raw_connection, cursor
+
+
+def test_generate_tiles_returns_empty_for_zero_sized_raster():
+    """Tiling a raster with a zero-sized dimension produces an empty tile list."""
+    zero_sized_array = np.empty((0, 0), dtype=np.float32)
+    degenerate = core.RasterDataset(
+        array=zero_sized_array,
+        transform=Affine.identity(),
+        crs="EPSG:3067",
+        nodata=-9999.0,
+    )
+    stage = RasterPostgisWriter("foo", "bar", None, tile_size=4)  # type: ignore[arg-type]
+
+    tiles = stage._generate_tiles(degenerate)
+
+    assert tiles == []
+
+
+def test_update_mode_skips_raster_downsampled_below_one_pixel(
+    dataset: core.RasterDataset, mocker: MockerFixture
+):
+    """Update mode must ignore a raster that downsampling reduced to nothing.
+
+    Overview edge case: a tiny update-area patch downsampled for a high overview
+    level (factor 128) collapses to a 0x0 raster, which previously raised a
+    rasterio WindowError while tiling.
+    """
+    session, raw_connection, cursor = _mock_update_session(mocker)
+    overview = DownsampleOverview(factor=128).process(dataset)
+    assert overview is not None
+    assert overview.array.size == 0  # 2x2 // 128 -> 0x0
+
+    RasterPostgisWriter(
+        "myschema", "mytable", session, staging_tables=0, mode="update"
+    ).process(overview)
+
+    # Nothing may reach the database: no SQL, no commit, no rollback.
+    for untouched in (cursor.execute, raw_connection.commit, raw_connection.rollback):
+        untouched.assert_not_called()
+
+
+def test_update_mode_rejects_staging_tables():
+    """Constructing an update-mode writer with staging tables raises ValueError."""
+    invalid_options = {"staging_tables": 1, "mode": "update"}
+    with pytest.raises(ValueError, match="staging_tables"):
+        RasterPostgisWriter("foo", "bar", None, **invalid_options)  # type: ignore[arg-type]
+
+
+def test_update_mode_merges_and_commits(
+    dataset: core.RasterDataset, mocker: MockerFixture
+):
+    """Update mode issues the staging, merge and insert SQL and commits once."""
+    session, raw_connection, cursor = _mock_update_session(mocker)
+
+    RasterPostgisWriter(
+        "myschema", "mytable", session, staging_tables=0, tile_size=4, mode="update"
+    ).process(dataset)
+
+    # Flatten every executed statement into one string for substring checks.
+    statements = [str(call.args[0]) for call in cursor.execute.call_args_list]
+    executed_sql = " ".join(statements)
+    expected_fragments = (
+        "CREATE TEMP TABLE",
+        "ST_MapAlgebra",
+        "INSERT INTO myschema.mytable",
+    )
+    for fragment in expected_fragments:
+        assert fragment in executed_sql
+
+    raw_connection.commit.assert_called_once()
+    raw_connection.rollback.assert_not_called()
+
+
+def test_update_mode_skips_all_nodata_data(
+    dataset: core.RasterDataset, mocker: MockerFixture
+):
+    """Input made up solely of nodata triggers no SQL, commit, or rollback."""
+    session, raw_connection, cursor = _mock_update_session(mocker)
+    all_nodata = np.full_like(dataset.array, dataset.nodata)
+    nodata_dataset = core.RasterDataset(
+        array=all_nodata,
+        transform=dataset.transform,
+        crs=dataset.crs,
+        nodata=dataset.nodata,
+    )
+
+    RasterPostgisWriter(
+        "myschema", "mytable", session, staging_tables=0, tile_size=4, mode="update"
+    ).process(nodata_dataset)
+
+    for untouched in (cursor.execute, raw_connection.commit, raw_connection.rollback):
+        untouched.assert_not_called()
+
+
+def test_update_mode_rolls_back_on_failure(
+    dataset: core.RasterDataset, mocker: MockerFixture
+):
+    """An error raised while writing is propagated after exactly one rollback."""
+    session, raw_connection, cursor = _mock_update_session(mocker)
+    # The first execute call succeeds; the second one raises.
+    failure = RuntimeError("merge failed")
+    cursor.execute.side_effect = [None, failure]
+    stage = RasterPostgisWriter(
+        "myschema", "mytable", session, staging_tables=0, tile_size=4, mode="update"
+    )
+
+    with pytest.raises(RuntimeError, match="merge failed"):
+        stage.process(dataset)
+
+    raw_connection.commit.assert_not_called()
+    raw_connection.rollback.assert_called_once()
+
+
def test_vector_postgis_writer_writes_to_postgis(
vector_dataset: core.VectorDataset, mocker: MockerFixture
):
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py
index 59c40ab5..1e69b8c2 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py
@@ -106,6 +106,19 @@ def start_reference_dem_workflow(self, production_area_id: str) -> None:
tr("Reference DEM workflow task created successfully"), success=True
)
+    @handle_api_errors(ApiEndpoint.workflows)
+    def start_dissolve_update_areas_workflow(self, production_area_id: str) -> None:
+        """Start the dissolve update areas workflow and report success.
+
+        ``production_area_id`` is passed both as the workflow's ``id`` parameter
+        and as the ``production_area_id`` keyword of ``_start_workflow``. An info
+        message is shown afterwards.
+        """
+        workflow_parameters = {"id": production_area_id}
+        self._start_workflow(
+            constants.DAG_ID_DISSOLVE_UPDATE_AREAS,
+            workflow_parameters,
+            production_area_id=production_area_id,
+        )
+        message = tr("Dissolve update areas workflow task created successfully")
+        MsgBar.info(message, success=True)
+
def _start_workflow(
self,
dag_tag: str,
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py b/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py
index cf5be0bc..d274c580 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py
@@ -74,6 +74,9 @@ class VectorLayerConfig(DatabaseLayerConfig):
)
read_only: bool = False
read_only_fields: list[str] = dataclasses.field(default_factory=list)
+ # Map of field name -> QGIS expression evaluated to the field's default value
+ # when a new feature is created (e.g. {"id": "uuid('WithoutBraces')"}).
+ default_expressions: dict[str, str] = dataclasses.field(default_factory=dict)
subset_string: str | None = None
@@ -97,6 +100,7 @@ def create( # noqa: PLR0913
style_path: Path | None = None,
read_only: bool = False, # noqa: FBT001, FBT002
read_only_fields: list[str] | None = None,
+ default_expressions: dict[str, str] | None = None,
value_maps: list[ValueMapConfig] | None = None,
visible_initially: bool = True, # noqa: FBT001, FBT002
) -> "ModelLayerConfig":
@@ -122,6 +126,7 @@ def create( # noqa: PLR0913
read_only_fields=[key_column]
if read_only_fields is None
else read_only_fields,
+ default_expressions=default_expressions or {},
value_maps=value_maps,
visible_initially=visible_initially,
)
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py b/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py
index 1e17255f..52ac4491 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py
@@ -16,7 +16,12 @@
# You should have received a copy of the GNU General Public License
 # along with Pinta QGIS Plugin. If not, see <https://www.gnu.org/licenses/>.
-from qgis.core import QgsEditorWidgetSetup, QgsMapLayer, QgsVectorLayer
+from qgis.core import (
+ QgsDefaultValue,
+ QgsEditorWidgetSetup,
+ QgsMapLayer,
+ QgsVectorLayer,
+)
from pinta_qgis_plugin.layers.config import BaseLayerConfig
@@ -35,6 +40,24 @@ def set_read_only_fields(layer: QgsVectorLayer, field_names: list[str]) -> None:
layer.setEditFormConfig(form_config)
+def set_default_value_expressions(
+    layer: QgsVectorLayer, default_expressions: dict[str, str]
+) -> None:
+    """Attach QGIS default value expressions to the named fields of a layer.
+
+    Each expression is evaluated when a new feature is created, e.g. to generate
+    a UUID for a not-null primary key that has no database-side default. Names
+    that ``lookupField`` cannot resolve on the layer are skipped.
+    """
+    if not default_expressions:
+        return
+
+    layer_fields = layer.fields()
+    for name, expression in default_expressions.items():
+        index = layer_fields.lookupField(name)
+        if index < 0:
+            # lookupField reports a missing field with a negative index.
+            continue
+        layer.setDefaultValueDefinition(index, QgsDefaultValue(expression))
+
+
def set_field_aliases(layer: QgsMapLayer, aliases: dict[str, str]) -> None:
"""Set field aliases for layer types that expose QGIS field alias APIs."""
if (
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py b/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py
index 6ee6cab5..7c96406a 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py
@@ -59,6 +59,7 @@ def create_vector_layer(
layer.setReadOnly(config.read_only)
utils.set_field_aliases(layer, config.aliases)
utils.set_read_only_fields(layer, [LAYER_ID_COLUMN, *config.read_only_fields])
+ utils.set_default_value_expressions(layer, config.default_expressions)
if config.style_path is not None:
styles.apply_style(layer, config.style_path)
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py
index a2d6ff4e..42411e58 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py
@@ -103,6 +103,11 @@
aliases={
**config.COMMON_ALIASES,
"elevation": tr("Elevation"),
+ "dirty": tr("Dirty"),
},
+ read_only_fields=["dirty"],
+ # The id primary key has no database-side default, so generate a UUID
+ # client-side when a new update area is digitised.
+ default_expressions={"id": "uuid('WithoutBraces')"},
),
]
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py b/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py
index aa241267..5d5bfde9 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py
@@ -36,6 +36,7 @@
ACTION_TITLE_OPEN_PRODUCTION_AREA_LAYERS = tr("Open production area")
ACTION_TITLE_START_REFERENCE_DEM_WORKFLOW = tr("Start reference DEM workflow")
+ACTION_TITLE_START_DISSOLVE_UPDATE_AREAS = tr("Dissolve update areas")
class ManagementLayerCollection(BaseLayerCollection):
@@ -53,6 +54,7 @@ def _add_to_project(self) -> None:
if layer_config.layer_id == "production_area":
_add_open_production_area_layers_action(layer)
_add_start_reference_dem_workflow_action(layer)
+ _add_start_dissolve_update_areas_action(layer)
def _add_open_production_area_layers_action(layer: QgsVectorLayer) -> None:
@@ -86,3 +88,18 @@ def _add_start_reference_dem_workflow_action(layer: QgsVectorLayer) -> None:
short_title=ACTION_TITLE_START_REFERENCE_DEM_WORKFLOW,
command=command,
)
+
+
+def _add_start_dissolve_update_areas_action(layer: QgsVectorLayer) -> None:
+    """Add start dissolve update areas action to the layer.
+
+    The action's command is a small Python snippet that imports the
+    ``update_area`` workflow module and calls
+    ``start_dissolve_update_areas_workflow`` with the value of ``[%id%]``.
+    """
+    # NOTE(review): ``[%id%]`` is presumably expanded by QGIS to the ``id``
+    # attribute of the feature the action is run on -- confirm against the
+    # QGIS action documentation.
+    command = textwrap.dedent("""
+        from pinta_qgis_plugin.workflows import update_area
+        job_id = \'[%id%]\'
+        update_area.start_dissolve_update_areas_workflow(job_id)
+    """)
+    # Registration itself is delegated to the shared layer utility.
+    layer_utils.add_action_to_vector_layer(
+        layer,
+        description=tr("Start dissolve update areas workflow for production area"),
+        short_title=ACTION_TITLE_START_DISSOLVE_UPDATE_AREAS,
+        command=command,
+    )
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm
index a6dc05bf..d5f9d58a 100644
Binary files a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm and b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm differ
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts
index c37d2b4f..bdea0a2d 100644
--- a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts
@@ -14,7 +14,7 @@
Tunniste
-
+
Add production area related layers to map
Lisää tuotantoalueen tasot karttalle
@@ -139,17 +139,17 @@
Hallitsematon virhe tapahtui
-
+
Could not start workflow
Laskennan käynnistäminen epäonnistui
-
+
Check log for more details
Tarkista loki lisätietojen varalta
-
+
Start reference DEM workflow for production area
Käynnistä vertausmallin työnkulku tuotantoalueelle
@@ -159,7 +159,7 @@
Käynnistä vertausmallin työnkulku
-
+
Reference DEM workflow task created successfully
Vertausmallin laskennan työnkulku käynnistettiin onnistuneesti
@@ -208,5 +208,25 @@
DEM preview
Tuloskorkeusmalli
+
+
+ Dissolve update areas workflow task created successfully
+ Tuloskorkeusmallin päivityksen työnkulku käynnistettiin onnistuneesti
+
+
+
+ Dissolve update areas
+ Päivitä tuloskorkeusmalli
+
+
+
+ Start dissolve update areas workflow for production area
+ Käynnistä tuloskorkeusmallin päivityksen työnkulku tuotantoalueelle
+
+
+
+ Dirty
+ Päivittämättä
+
diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/workflows/update_area.py b/components/qgis_plugin/src/pinta_qgis_plugin/workflows/update_area.py
new file mode 100644
index 00000000..dfcca39a
--- /dev/null
+++ b/components/qgis_plugin/src/pinta_qgis_plugin/workflows/update_area.py
@@ -0,0 +1,26 @@
+# Copyright (C) 2026 Pinta QGIS Plugin Contributors.
+#
+#
+# This file is part of Pinta QGIS Plugin.
+#
+# Pinta QGIS Plugin is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+#
+# Pinta QGIS Plugin is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Pinta QGIS Plugin. If not, see <https://www.gnu.org/licenses/>.
+
+from pinta_qgis_plugin.api import api_client
+from pinta_qgis_plugin.utils import messages
+
+
+@messages.popup_if_fails
+def start_dissolve_update_areas_workflow(production_area_id: str) -> None:
+    """Ask the API client to start the dissolve update areas workflow.
+
+    ``production_area_id`` is forwarded unchanged to the client call.
+    """
+    client = api_client.get_api_client()
+    client.start_dissolve_update_areas_workflow(production_area_id)
diff --git a/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py b/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py
index 91274575..a98bc716 100644
--- a/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py
+++ b/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py
@@ -93,11 +93,17 @@ def test_add_to_project_adds_actions_to_production_area_layer(
"_add_start_reference_dem_workflow_action",
autospec=True,
)
+ mock_add_dissolve_action = mocker.patch.object(
+ management_layer_collection,
+ "_add_start_dissolve_update_areas_action",
+ autospec=True,
+ )
layer_collection.add_to_project()
mock_add_open_action.assert_called_once_with(production_area_layer)
mock_add_dem_update_action.assert_called_once_with(production_area_layer)
+ mock_add_dissolve_action.assert_called_once_with(production_area_layer)
def test_add_open_production_area_layers_delegates_to_layer_utils(
@@ -151,6 +157,32 @@ def test_add_start_reference_dem_processing_action_delegates_to_layer_utils(
assert "[%id%]" in command
+def test_add_start_dissolve_update_areas_action_delegates_to_layer_utils(
+    mocker: MockerFixture,
+):
+    """The dissolve action is registered on the layer through layer_utils."""
+    layer = mocker.MagicMock()
+    add_action = mocker.patch.object(
+        management_layer_collection.layer_utils,
+        "add_action_to_vector_layer",
+        autospec=True,
+    )
+
+    management_layer_collection._add_start_dissolve_update_areas_action(layer)
+
+    add_action.assert_called_once()
+    positional = add_action.call_args.args
+    keywords = add_action.call_args.kwargs
+    assert positional == (layer,)
+    assert (
+        keywords["description"]
+        == "Start dissolve update areas workflow for production area"
+    )
+    assert keywords["short_title"] == "Dissolve update areas"
+
+    # The command must import the workflow module, call it, and carry the
+    # feature id placeholder.
+    command = keywords["command"]
+    expected_snippets = (
+        "from pinta_qgis_plugin.workflows import update_area",
+        "update_area.start_dissolve_update_areas_workflow(job_id)",
+        "[%id%]",
+    )
+    for snippet in expected_snippets:
+        assert snippet in command
+
+
def test_remove_layers_removes_all_layers(
production_area_layer: QgsVectorLayer,
layer_collection: management_layer_collection.ManagementLayerCollection,
diff --git a/components/qgis_plugin/test_qgis/layers/test_vector_layer.py b/components/qgis_plugin/test_qgis/layers/test_vector_layer.py
index 1abdfcb9..28d94991 100644
--- a/components/qgis_plugin/test_qgis/layers/test_vector_layer.py
+++ b/components/qgis_plugin/test_qgis/layers/test_vector_layer.py
@@ -175,6 +175,34 @@ def test_create_layer_sets_value_maps(mocker: MockerFixture, mock_uri: MagicMock
}
+def test_create_layer_sets_default_value_expressions(
+    mocker: MockerFixture, mock_uri: MagicMock
+):
+    """create_vector_layer applies the configured default value expressions."""
+    uuid_expression = "uuid('WithoutBraces')"
+    memory_layer = QgsVectorLayer(
+        "MultiPolygon?field=id:string&field=name:string", "", "memory"
+    )
+    # Make the factory hand back the in-memory layer instead of a database one.
+    mocker.patch.object(
+        vector_layer,
+        "_create_qgs_vector_layer",
+        autospec=True,
+        return_value=memory_layer,
+    )
+    layer_config = config.VectorLayerConfig(
+        schema="user_data",
+        table_name="update_area",
+        layer_name="Update area",
+        layer_id="update_area",
+        key_column="id",
+        wkb_type=management_layers.PRODUCTION_AREA.wkb_type,
+        default_expressions={"id": uuid_expression},
+    )
+
+    vector_layer.create_vector_layer(layer_config, PROVIDER, mock_uri)
+
+    id_index = memory_layer.fields().lookupField("id")
+    definition = memory_layer.defaultValueDefinition(id_index)
+    assert definition.expression() == uuid_expression
+
+
def test_create_layer_with_invalid_layer_raises_exception(
mocker: MockerFixture,
):
diff --git a/components/qgis_plugin/test_qgis/processing/test_api_client.py b/components/qgis_plugin/test_qgis/processing/test_api_client.py
index 702eb18d..ffad406c 100644
--- a/components/qgis_plugin/test_qgis/processing/test_api_client.py
+++ b/components/qgis_plugin/test_qgis/processing/test_api_client.py
@@ -197,6 +197,42 @@ def test_start_reference_dem_workflow_http_error_without_response_uses_fallback_
assert exc_info.value.bar_msg["details"] == "Check log for more details"
+def test_start_dissolve_update_areas_workflow_posts_workflow_payload(
+    client: api_client.PintaAPIClient,
+    mock_post: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """Starting the workflow posts the expected URL, payload and timeout."""
+    mocker.patch.object(api_client, "MsgBar", autospec=True)
+    dag_id = constants.DAG_ID_DISSOLVE_UPDATE_AREAS
+    expected_url = f"http://example.test/workflows/{dag_id}"
+    expected_payload = {
+        "parameters": {"id": "area-1"},
+        "production_area_id": "area-1",
+    }
+
+    client.start_dissolve_update_areas_workflow("area-1")
+
+    mock_post.assert_called_once_with(
+        expected_url,
+        json=expected_payload,
+        timeout=10,
+    )
+    mock_post.return_value.raise_for_status.assert_called_once_with()
+
+
+def test_start_dissolve_update_areas_workflow_shows_success_message(
+    client: api_client.PintaAPIClient,
+    mock_post: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    """A started workflow shows one info message flagged as a success."""
+    msg_bar = mocker.patch.object(api_client, "MsgBar", autospec=True)
+
+    client.start_dissolve_update_areas_workflow("area-1")
+
+    msg_bar.info.assert_called_once()
+    info_keywords = msg_bar.info.call_args.kwargs
+    assert info_keywords == {"success": True}
+
+
def test_get_api_client_is_cached() -> None:
api_client.get_api_client.cache_clear()
try:
diff --git a/whitelist.txt b/whitelist.txt
index 4835c47d..62d918d7 100644
--- a/whitelist.txt
+++ b/whitelist.txt
@@ -181,3 +181,8 @@ pyqt
rasters
lte
gt
+datasets
+griddata
+rasterize
+isclose
+tol