diff --git a/components/dags/src/pinta_dags/config.py b/components/dags/src/pinta_dags/config.py index b52507cc..5178fab0 100644 --- a/components/dags/src/pinta_dags/config.py +++ b/components/dags/src/pinta_dags/config.py @@ -36,6 +36,11 @@ class AirflowVariable(enum.StrEnum): ) CALCULATE_DEM_DIFF_STAGING_TABLES = "pinta_calculate_dem_diff_staging_tables" + # Maximum number of update area dissolve pipelines running in parallel. + DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES = ( + "pinta_dissolve_update_areas_max_parallel_pipelines" + ) + def connection_uri_template(conn_id: str) -> str: """Jinja template for a connection's SQLAlchemy URI with the psycopg3 driver.""" diff --git a/components/dags/src/pinta_dags/dags/dissolve_update_areas.py b/components/dags/src/pinta_dags/dags/dissolve_update_areas.py new file mode 100644 index 00000000..460513cf --- /dev/null +++ b/components/dags/src/pinta_dags/dags/dissolve_update_areas.py @@ -0,0 +1,142 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. 
+ +import datetime +from typing import cast + +from airflow.sdk import DAG, Param, Variable, dag, task +from pinta_common import constants + +from pinta_dags import config +from pinta_dags.config import AirflowVariable +from pinta_dags.tasks import ( + build_job_connection_uri_task, + find_update_area_geometries, + get_database_name, + set_processing_status_completed, + set_processing_status_failed, + set_processing_status_started, +) + + +def _get_max_parallel_pipelines() -> int: + var = AirflowVariable.DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES + max_parallel = int(Variable.get(var, 4)) + if max_parallel < 1: + msg = f"{var} must be at least 1" + raise ValueError(msg) + return max_parallel + + +def create_dissolve_update_areas_dag( + *, + dag_id: str, +) -> DAG: + @dag( + dag_id=dag_id, + tags=[dag_id], + dag_display_name="Dissolve update areas", + schedule=None, + params={ + "id": Param( + "", + type="string", + format="uuid", + description=("Production area id as UUID"), + ) + }, + is_paused_upon_creation=False, + ) + def dissolve_update_areas_dag() -> None: + # Precondition: the production area must already have its job database + # provisioned and its database_name set by the orchestrator DAG. + + @task.docker( + **config.PINTA_CONTAINER_TASK_ARGS, + max_active_tis_per_dag=_get_max_parallel_pipelines(), + # Parallel tasks merging into the same base/overview tiles can + # deadlock on the concurrent row updates; retry to ride out the loser. 
+ retries=3, + retry_delay=datetime.timedelta(seconds=10), + ) + def dissolve_update_area( + primary_connection_uri: str, + job_connection_uri: str, + geom_wkt: str, + ) -> None: + import sqlalchemy + import sqlmodel + from pinta_processing import pipelines + + with ( + sqlmodel.Session( + sqlalchemy.create_engine(primary_connection_uri) + ) as primary_session, + sqlmodel.Session( + sqlalchemy.create_engine(job_connection_uri) + ) as job_session, + ): + pipeline = pipelines.dissolve_update_area( + primary_session=primary_session, + job_session=job_session, + geom_wkt=geom_wkt, + ) + pipeline.execute() + + primary_connection_uri = config.connection_uri_template("pinta_processing_db") + job_connection_uri = config.connection_uri_template("pinta_job_db") + + prod_area_id = "{{ params.id }}" + + status_started = set_processing_status_started( + primary_connection_uri, prod_area_id + ) + database_name = cast( + "str", get_database_name(primary_connection_uri, prod_area_id) + ) + job_db_uri = cast( + "str", + build_job_connection_uri_task( + base_uri=job_connection_uri, + database_name=database_name, + ), + ) + geom_wkt_list = find_update_area_geometries(job_db_uri) + + dissolved_areas = dissolve_update_area.partial( + primary_connection_uri=primary_connection_uri, + job_connection_uri=job_db_uri, + ).expand(geom_wkt=geom_wkt_list) + + status_completed = set_processing_status_completed( + primary_connection_uri, prod_area_id + ) + status_failed = set_processing_status_failed( + primary_connection_uri, prod_area_id + ) + + # Stamp STARTED before any work, then run the dissolve chain. + status_started >> database_name + geom_wkt_list >> dissolved_areas + + # Resolve the final status off every task that can fail (each is a direct + # upstream, so ONE_FAILED still fires when an early step fails and the + # mapped task never runs). NONE_FAILED marks COMPLETED otherwise. 
+ processing_steps = [ + status_started, + database_name, + job_db_uri, + geom_wkt_list, + dissolved_areas, + ] + processing_steps >> status_completed + processing_steps >> status_failed + + return dissolve_update_areas_dag() + + +DAG_ID = constants.DAG_ID_DISSOLVE_UPDATE_AREAS + +globals()[DAG_ID] = create_dissolve_update_areas_dag(dag_id=DAG_ID) diff --git a/components/dags/src/pinta_dags/tasks.py b/components/dags/src/pinta_dags/tasks.py index fd71e3df..cf986af8 100644 --- a/components/dags/src/pinta_dags/tasks.py +++ b/components/dags/src/pinta_dags/tasks.py @@ -5,7 +5,7 @@ """Airflow tasks shared across Pinta DAGs.""" -from airflow.sdk import task +from airflow.sdk import TriggerRule, task from pinta_dags import config @@ -78,6 +78,22 @@ def find_production_area_tile_geometries( return [to_shape(tile.geom).wkt for tile in area_in_db.tiles] +@task.docker(**config.PINTA_CONTAINER_TASK_ARGS) +def find_update_area_geometries( + connection_uri: str, +) -> list[str]: + """Return the geometries (as WKT) of all update areas in the job database.""" + import sqlalchemy + import sqlmodel + from geoalchemy2.shape import to_shape + from pinta_db.job_db.models.user import UpdateArea + + engine = sqlalchemy.create_engine(connection_uri) + with sqlmodel.Session(engine) as session: + update_areas = session.exec(sqlmodel.select(UpdateArea)).all() + return [to_shape(area.geom).wkt for area in update_areas] + + @task def build_job_connection_uri_task( base_uri: str, @@ -142,3 +158,68 @@ def merge_dem_staging_tables( staging_tables=staging_tables, session=session, ) + + +@task.docker(**config.PINTA_CONTAINER_TASK_ARGS) +def set_processing_status_started(connection_uri: str, production_area_id: str) -> None: + """Mark the production area as processing started.""" + import sqlalchemy + import sqlmodel + from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea + + engine = sqlalchemy.create_engine(connection_uri) + with sqlmodel.Session(engine) as session: + 
area_in_db = session.exec( + sqlmodel.select(ProductionArea).where( + ProductionArea.id == production_area_id + ) + ).first() + if area_in_db: + area_in_db.processing_status = ProcessingStatus.STARTED + session.commit() + + +@task.docker( + **config.PINTA_CONTAINER_TASK_ARGS, + trigger_rule=TriggerRule.NONE_FAILED, +) +def set_processing_status_completed( + connection_uri: str, production_area_id: str +) -> None: + """Mark the production area as processing completed when nothing failed.""" + import sqlalchemy + import sqlmodel + from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea + + engine = sqlalchemy.create_engine(connection_uri) + with sqlmodel.Session(engine) as session: + area_in_db = session.exec( + sqlmodel.select(ProductionArea).where( + ProductionArea.id == production_area_id + ) + ).first() + if area_in_db: + area_in_db.processing_status = ProcessingStatus.COMPLETED + session.commit() + + +@task.docker( + **config.PINTA_CONTAINER_TASK_ARGS, + trigger_rule=TriggerRule.ONE_FAILED, +) +def set_processing_status_failed(connection_uri: str, production_area_id: str) -> None: + """Mark the production area as processing failed when any upstream failed.""" + import sqlalchemy + import sqlmodel + from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea + + engine = sqlalchemy.create_engine(connection_uri) + with sqlmodel.Session(engine) as session: + area_in_db = session.exec( + sqlmodel.select(ProductionArea).where( + ProductionArea.id == production_area_id + ) + ).first() + if area_in_db: + area_in_db.processing_status = ProcessingStatus.FAILURE + session.commit() diff --git a/components/dags/test_dags/dags/test_dissolve_update_areas.py b/components/dags/test_dags/dags/test_dissolve_update_areas.py new file mode 100644 index 00000000..dc174fe9 --- /dev/null +++ b/components/dags/test_dags/dags/test_dissolve_update_areas.py @@ -0,0 +1,154 @@ +# Copyright (c) 2026 National Land Survey of Finland +# 
(https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +import uuid +from typing import TYPE_CHECKING +from unittest.mock import MagicMock + +import pytest +from airflow.models import DagBag, dagbag + +from pinta_dags.dags import dissolve_update_areas + +if TYPE_CHECKING: + from airflow.sdk import DAG + from pytest_mock import MockerFixture + + +def create_dag_to_test() -> "DAG": + dag = dissolve_update_areas.create_dissolve_update_areas_dag( + dag_id=f"some_id_{uuid.uuid4()}" + ) + + assert str(dag.dag_id).startswith("some_id") + + dag_bag = DagBag(include_examples=False) + dag_bag.bag_dag(dag) + dagbag.sync_bag_to_db(dag_bag, "mock-dags", None) + + return dag + + +@pytest.fixture(autouse=True) +def mock_airflow_settings(monkeypatch: "pytest.MonkeyPatch") -> None: + monkeypatch.setenv("AIRFLOW_CONN_PINTA_PROCESSING_DB", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB_ADMIN", "postgres://mockaddr:123/db") + monkeypatch.setenv("AIRFLOW_CONN_PINTA_JOB_DB", "postgres://mockaddr:123/db") + monkeypatch.setenv( + "AIRFLOW_VAR_PINTA_DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES", "2" + ) + + +def test_dissolve_update_areas_all_tasks() -> None: + dag = create_dag_to_test() + + assert set(dag.task_ids) == { + "set_processing_status_started", + "get_database_name", + "build_job_connection_uri_task", + "find_update_area_geometries", + "dissolve_update_area", + "set_processing_status_completed", + "set_processing_status_failed", + } + + +def test_dependencies() -> None: + dag = create_dag_to_test() + assert dag is not None + + status_started = dag.get_task("set_processing_status_started") + get_database_name = dag.get_task("get_database_name") + build_job_connection_uri_task = dag.get_task("build_job_connection_uri_task") + find_update_area_geometries = dag.get_task("find_update_area_geometries") + dissolve_update_area = dag.get_task("dissolve_update_area") + + 
assert status_started.task_id in get_database_name.upstream_task_ids + assert get_database_name.task_id in build_job_connection_uri_task.upstream_task_ids + assert ( + build_job_connection_uri_task.task_id + in find_update_area_geometries.upstream_task_ids + ) + assert find_update_area_geometries.task_id in dissolve_update_area.upstream_task_ids + + +def test_processing_status_tasks() -> None: + dag = create_dag_to_test() + + status_completed = dag.get_task("set_processing_status_completed") + status_failed = dag.get_task("set_processing_status_failed") + + # Both terminal status tasks fan in from every step that can fail so the + # status is always resolved, even when an early step fails. + expected_upstream = { + "set_processing_status_started", + "get_database_name", + "build_job_connection_uri_task", + "find_update_area_geometries", + "dissolve_update_area", + } + assert expected_upstream <= status_completed.upstream_task_ids + assert expected_upstream <= status_failed.upstream_task_ids + + assert status_completed.trigger_rule == "none_failed" + assert status_failed.trigger_rule == "one_failed" + + +def test_get_max_parallel_pipelines_reads_variable() -> None: + assert dissolve_update_areas._get_max_parallel_pipelines() == 2 + + +def test_get_max_parallel_pipelines_rejects_below_one( + monkeypatch: "pytest.MonkeyPatch", +) -> None: + monkeypatch.setenv( + "AIRFLOW_VAR_PINTA_DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES", "0" + ) + with pytest.raises(ValueError, match="must be at least 1"): + dissolve_update_areas._get_max_parallel_pipelines() + + +def test_get_max_parallel_pipelines_defaults_when_unset( + monkeypatch: "pytest.MonkeyPatch", +) -> None: + monkeypatch.delenv( + "AIRFLOW_VAR_PINTA_DISSOLVE_UPDATE_AREAS_MAX_PARALLEL_PIPELINES", + raising=False, + ) + assert dissolve_update_areas._get_max_parallel_pipelines() == 4 + + +def test_dissolve_update_area_builds_and_executes_pipeline( + mocker: "MockerFixture", +) -> None: + 
mocker.patch("sqlalchemy.create_engine") + mocker.patch("sqlmodel.Session") + # Inject a mock pipelines module so the task body's ``from pinta_processing + # import pipelines`` resolves to the mock instead of importing the real + # module (which would load heavy DB modules and leak into the sys.modules + # patching other DAG tests rely on). + mock_pipeline = MagicMock() + mock_pipelines_module = MagicMock() + mock_pipelines_module.dissolve_update_area.return_value = mock_pipeline + mocker.patch.dict( + "sys.modules", + {"pinta_processing.pipelines": mock_pipelines_module}, + ) + + dag = create_dag_to_test() + dissolve_update_area = dag.get_task("dissolve_update_area").python_callable + + dissolve_update_area( + primary_connection_uri="postgres://primary", + job_connection_uri="postgres://job", + geom_wkt="POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))", + ) + + mock_pipelines_module.dissolve_update_area.assert_called_once() + kwargs = mock_pipelines_module.dissolve_update_area.call_args.kwargs + assert kwargs["geom_wkt"] == "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))" + assert "primary_session" in kwargs + assert "job_session" in kwargs + mock_pipeline.execute.assert_called_once_with() diff --git a/components/db/migrations/job/versions/010_add_dirty_flag_to_update_area.py b/components/db/migrations/job/versions/010_add_dirty_flag_to_update_area.py new file mode 100644 index 00000000..88103731 --- /dev/null +++ b/components/db/migrations/job/versions/010_add_dirty_flag_to_update_area.py @@ -0,0 +1,45 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +"""Add dirty flag to update_area table + +Revision ID: 010 +Revises: 009 +Create Date: 2026-07-02 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "010" +down_revision: str | Sequence[str] | None = "009" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +_SCHEMA = "user_data" +_TABLE = "update_area" + + +def upgrade() -> None: + """Upgrade schema.""" + op.add_column( + _TABLE, + sa.Column( + "dirty", + sa.Boolean(), + nullable=False, + server_default=sa.true(), + ), + schema=_SCHEMA, + ) + + +def downgrade() -> None: + """Downgrade schema.""" + op.drop_column(_TABLE, "dirty", schema=_SCHEMA) diff --git a/components/db/src/pinta_common/constants.py b/components/db/src/pinta_common/constants.py index 4dc3bfa7..91b1647e 100644 --- a/components/db/src/pinta_common/constants.py +++ b/components/db/src/pinta_common/constants.py @@ -10,4 +10,5 @@ DAG_ID_CALCULATE_REFERENCE_DEM = "calculate_reference_dem" DAG_ID_CALCULATE_DEM_DIFF = "calculate_dem_diff" DAG_ID_INITIALIZE_DEM_PREVIEW = "initialize_dem_preview" +DAG_ID_DISSOLVE_UPDATE_AREAS = "dissolve_update_areas" DAG_ID_CALCULATE_RASTERS_FOR_PRODUCTION_AREA = "calculate_rasters_for_production_area" diff --git a/components/db/src/pinta_db/job_db/models/user.py b/components/db/src/pinta_db/job_db/models/user.py index b7afc8f4..cf851568 100644 --- a/components/db/src/pinta_db/job_db/models/user.py +++ b/components/db/src/pinta_db/job_db/models/user.py @@ -6,7 +6,7 @@ from typing import Any from geoalchemy2 import Geometry -from sqlalchemy import Column +from sqlalchemy import Boolean, Column, true from sqlmodel import Field from pinta_common import Settings @@ -21,6 +21,10 @@ class UpdateArea(job_base.UserVectorBase, table=True): # type: ignore[call-arg] geom: Any = Field( sa_column=Column(Geometry(POLYGON, srid=Settings.DB_SRID, nullable=False)) ) + dirty: bool = Field( + default=True, + sa_column=Column(Boolean, nullable=False, server_default=true()), + ) class DemPreview(job_base.UserBase, base.RasterBase, table=True): # type: ignore[call-arg] diff --git 
a/components/e2e/test_e2e/test_dissolve_update_areas.py b/components/e2e/test_e2e/test_dissolve_update_areas.py new file mode 100644 index 00000000..7e9f47d9 --- /dev/null +++ b/components/e2e/test_e2e/test_dissolve_update_areas.py @@ -0,0 +1,186 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +import typing + +import pytest +import sqlmodel +from pinta_db.primary_db.models.management import ProcessingStatus, ProductionArea +from pinta_db_test_utils import db_utils +from pinta_db_utils import database_utils, engine_utils +from pinta_e2e_utils import layers +from pinta_e2e_utils.airflow_client import AirflowClient, DagRun + +if typing.TYPE_CHECKING: + from unittest.mock import MagicMock + + from pinta_qgis_plugin.plugin import Plugin + from pytestqt.qtbot import QtBot + from sqlmodel import Session + +# The dissolve workflow only spins up a couple of short-lived task containers, +# but the docker pulls / cold starts can still be slow, so keep a generous budget. +WORKFLOW_TIMEOUT_S = 240.0 + +# Constant value the seeded reference DEM is flattened to. Chosen well outside the +# real DEM elevation range so a mean over the update area unambiguously tells the +# original DEM apart from a preview that has taken the reference values. +REFERENCE_DEM_VALUE = 1000.0 + +# Radius (m) of the update area probe polygon around the DEM centroid. Kept small +# so the primary DEM read (buffered 50 m) stays inside the seeded coverage. 
+UPDATE_AREA_RADIUS_M = 15 + + +def _dem_preview_mean(database_name: str, area_ewkt: str) -> float: + """Return the mean of the job DEM preview clipped to ``area_ewkt``.""" + credentials = db_utils.get_job_admin_credentials(database_name) + with engine_utils.get_autocommit_connection(credentials) as connection: + return connection.execute( + sqlmodel.text( + "SELECT (ST_SummaryStats(ST_Union(" + " ST_Clip(rast, ST_GeomFromEWKT(:area), true)" + "))).mean " + "FROM user_data.dem_preview " + "WHERE ST_Intersects(rast, ST_GeomFromEWKT(:area))" + ).bindparams(area=area_ewkt) + ).scalar_one() + + +@pytest.fixture +def dissolve_update_area_setup( + seeded_processing_dem: int, + created_db: str, + db: "Session", +) -> str: + """Provision the job database for a dissolve run without the DEM workflow. + + ``seeded_processing_dem`` populates the primary ``dem.dem`` table. This fixture + then provisions a job database the way the orchestrator DAG would, copies the + primary DEM into ``user_data.dem_preview``, seeds a distinct constant-valued + ``reference.dem`` and creates an update area polygon. Returns the update area + geometry as EWKT so the test can probe the preview inside it. + """ + production_area = db.exec(sqlmodel.select(ProductionArea)).first() + assert production_area is not None + + # The orchestrator DAG normally provisions the job db and stamps + # database_name; do the same by hand so the dissolve DAG's precondition holds. 
+ database_name = f"job_{production_area.id}" + production_area.database_name = database_name + db.add(production_area) + db.commit() + + with engine_utils.get_autocommit_connection( + db_utils.get_job_admin_credentials("postgres") + ) as admin_connection: + database_utils.initialize_db_from_template( + admin_connection, database_name, replace_existing=True + ) + + primary = db_utils.get_primary_admin_credentials(created_db) + job = db_utils.get_job_admin_credentials(database_name) + with ( + engine_utils.get_autocommit_connection(primary) as primary_connection, + engine_utils.get_autocommit_connection(job) as job_connection, + ): + tiles = ( + primary_connection.execute(sqlmodel.text("SELECT rast::text FROM dem.dem")) + .scalars() + .all() + ) + assert tiles, "No primary DEM tiles were seeded" + for tile in tiles: + # Copy the primary DEM verbatim into the preview. + job_connection.execute( + sqlmodel.text( + "INSERT INTO user_data.dem_preview (rast) " + "VALUES (CAST(:rast AS raster))" + ).bindparams(rast=tile) + ) + # Seed the reference DEM as a flat surface at REFERENCE_DEM_VALUE, + # keeping the nodata mask, so it is clearly distinct from the preview. 
+ job_connection.execute( + sqlmodel.text( + "INSERT INTO reference.dem (rast) " + "VALUES (ST_MapAlgebra(CAST(:rast AS raster), 1, '32BF', :value))" + ).bindparams(rast=tile, value=str(REFERENCE_DEM_VALUE)) + ) + + area_ewkt = job_connection.execute( + sqlmodel.text( + "SELECT ST_AsEWKT(ST_Buffer(ST_Centroid(ST_Union(rast::geometry)), " + ":radius)) FROM user_data.dem_preview" + ).bindparams(radius=UPDATE_AREA_RADIUS_M) + ).scalar_one() + job_connection.execute( + sqlmodel.text( + "INSERT INTO user_data.update_area (id, geom, dirty) " + "VALUES (gen_random_uuid(), ST_GeomFromEWKT(:geom), true)" + ).bindparams(geom=area_ewkt) + ) + + return area_ewkt + + +@pytest.mark.xdist_group("airflow") +def test_dissolve_update_areas_workflow( + qgis_plugin: "Plugin", + qtbot: "QtBot", + m_error_dialog: "MagicMock", + airflow_client: "AirflowClient", + db: "Session", + dissolve_update_area_setup: str, +) -> None: + # Importing here to avoid wrong DB name in environment + from pinta_qgis_plugin.api import api_client # noqa: PLC0415 + from pinta_qgis_plugin.project.groups import ( # noqa: PLC0415 + management_layer_collection, + ) + + area_ewkt = dissolve_update_area_setup + production_area = db.exec(sqlmodel.select(ProductionArea)).first() + assert production_area is not None + database_name = production_area.database_name + assert database_name is not None + + # The preview starts from the primary DEM, i.e. the real elevations, well + # below the flat reference surface it should be dissolved to. 
+ mean_before = _dem_preview_mean(database_name, area_ewkt) + assert mean_before != pytest.approx(REFERENCE_DEM_VALUE, abs=100) + + production_area_layer = layers.get_vector_layer_by_model(ProductionArea) + assert production_area_layer.featureCount() == 1 + action = layers.find_layer_action( + production_area_layer, + management_layer_collection.ACTION_TITLE_START_DISSOLVE_UPDATE_AREAS, + ) + feature = next(production_area_layer.getFeatures()) + + client = api_client.get_api_client() + with qtbot.waitSignal(client.workflow_started, timeout=10000) as blocker: + layers.run_layer_action(production_area_layer, action, feature) + dag_id, dag_run_id = blocker.args + + m_error_dialog.assert_not_called() + dag_run = DagRun(id=dag_id, run_id=dag_run_id) + + state = airflow_client.wait_for_dag_run(dag_run, timeout=WORKFLOW_TIMEOUT_S) + assert state == "success", ( + f"DAG run finished with state={state}\n" + f"{airflow_client.describe_failed_run(dag_run)}" + ) + + # The dissolve unions the reference DEM (priority) into the preview inside the + # update area, so the preview must now match the flat reference surface. + mean_after = _dem_preview_mean(database_name, area_ewkt) + assert mean_after == pytest.approx(REFERENCE_DEM_VALUE, abs=50) + assert abs(mean_after - mean_before) > 100 + + # The workflow stamps the production area processing status COMPLETED on success. + db.expire_all() + completed_area = db.exec(sqlmodel.select(ProductionArea)).first() + assert completed_area is not None + assert completed_area.processing_status == ProcessingStatus.COMPLETED diff --git a/components/processing/src/pinta_processing/core.py b/components/processing/src/pinta_processing/core.py index aec7d707..4be4a272 100644 --- a/components/processing/src/pinta_processing/core.py +++ b/components/processing/src/pinta_processing/core.py @@ -26,6 +26,16 @@ def __post_init__(self) -> None: # Bypass frozen dataclass assignment for construction-time normalization. 
object.__setattr__(self, "array", self.array.astype(np.float32, copy=False)) + @property + def bounds(self) -> tuple[float, float, float, float]: + """Return the raster's map bounds as (left, bottom, right, top).""" + height, width = self.array.shape + left = self.transform.c + top = self.transform.f + right = left + self.transform.a * width + bottom = top + self.transform.e * height + return left, bottom, right, top + @staticmethod def from_rasterio(src: rasterio.DatasetReader) -> "RasterDataset": """Construct dataset from rasterio reader.""" diff --git a/components/processing/src/pinta_processing/filters/__init__.py b/components/processing/src/pinta_processing/filters/__init__.py index 3cd3a894..cd5a8f16 100644 --- a/components/processing/src/pinta_processing/filters/__init__.py +++ b/components/processing/src/pinta_processing/filters/__init__.py @@ -5,8 +5,10 @@ from pinta_processing.filters.diff import RasterDiff from pinta_processing.filters.filter import RasterFilter +from pinta_processing.filters.interpolate import RasterInterpolate from pinta_processing.filters.multiply import MultiplyValues from pinta_processing.filters.overview import DownsampleOverview +from pinta_processing.filters.union import RasterUnion from pinta_processing.filters.vectorize import VectorizeRaster __all__ = [ @@ -14,5 +16,7 @@ "MultiplyValues", "RasterDiff", "RasterFilter", + "RasterInterpolate", + "RasterUnion", "VectorizeRaster", ] diff --git a/components/processing/src/pinta_processing/filters/interpolate.py b/components/processing/src/pinta_processing/filters/interpolate.py new file mode 100644 index 00000000..181e0577 --- /dev/null +++ b/components/processing/src/pinta_processing/filters/interpolate.py @@ -0,0 +1,120 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. 
+ +import numpy as np +from rasterio import features +from scipy.interpolate import griddata +from scipy.ndimage import binary_dilation, generate_binary_structure +from shapely import wkt as shapely_wkt +from shapely.geometry.base import BaseGeometry + +from pinta_processing import core, exceptions + +_MIN_KNOWN_POINTS = 4 +_SAMPLING_MARGIN = 5 + + +class RasterInterpolate(core.Stage): + """Interpolate the raster pixels inside a polygon with cubic interpolation. + + Pixels whose centre falls inside the polygon are recomputed with SciPy's + cubic griddata interpolation from the surrounding valid pixels. The raster + must hold enough valid data around the polygon for every target pixel + to fall within the interpolation domain. + """ + + def __init__(self, wkt: str) -> None: + super().__init__() + self.wkt = wkt + self._polygon = self._parse_polygon(wkt) + + @staticmethod + def _parse_polygon(wkt: str) -> BaseGeometry: + """Parse the interpolation WKT and ensure it is a polygon.""" + try: + geometry = shapely_wkt.loads(wkt) + except Exception as error: + message = f"RasterInterpolate polygon WKT could not be parsed: {error}" + raise ValueError(message) from error + + if geometry.geom_type != "Polygon": + message = ( + f"RasterInterpolate polygon must be a Polygon, got {geometry.geom_type}" + ) + raise ValueError(message) + return geometry + + def process(self, data: core.RasterDataset) -> core.RasterDataset: + """Interpolate the pixels inside the polygon and return the raster.""" + if not isinstance(data, core.RasterDataset): + raise exceptions.InvalidStageInputError( + stage_name=RasterInterpolate.__name__, + expected_type=core.RasterDataset.__name__, + received_type=type(data).__name__, + ) + + array = data.array.astype(np.float64, copy=True) + if data.nodata is not None: + array[array == data.nodata] = np.nan + + target_mask = features.rasterize( + [(self._polygon, 1)], + out_shape=array.shape, + transform=data.transform, + fill=0, + all_touched=False, + 
dtype="uint8", + ).astype(bool) + + interpolated = self._interpolate(array, target_mask) + + result = array.copy() + result[target_mask] = interpolated + if data.nodata is not None: + result[np.isnan(result)] = data.nodata + + return core.RasterDataset( + array=result, + transform=data.transform, + crs=data.crs, + nodata=data.nodata, + ) + + def _interpolate(self, array: np.ndarray, target_mask: np.ndarray) -> np.ndarray: + """Cubic-interpolate the target pixels from surrounding valid data.""" + if not target_mask.any(): + message = "RasterInterpolate polygon does not cover any raster pixels" + raise ValueError(message) + + known_mask = self._sampling_mask(target_mask) & ~np.isnan(array) & ~target_mask + if int(known_mask.sum()) < _MIN_KNOWN_POINTS: + message = ( + "RasterInterpolate has too little data around the polygon to " + "interpolate" + ) + raise ValueError(message) + + rows, cols = np.indices(array.shape) + points = np.column_stack((rows[known_mask], cols[known_mask])) + values = array[known_mask] + targets = np.column_stack((rows[target_mask], cols[target_mask])) + + interpolated = griddata(points, values, targets, method="cubic") + + if np.isnan(interpolated).any(): + message = ( + "RasterInterpolate has too little data around the polygon to " + "interpolate" + ) + raise ValueError(message) + return interpolated + + @staticmethod + def _sampling_mask(target_mask: np.ndarray) -> np.ndarray: + """Return the band of pixels around the polygon used for sampling.""" + structure = generate_binary_structure(2, 2) + return binary_dilation( + target_mask, structure=structure, iterations=_SAMPLING_MARGIN + ) diff --git a/components/processing/src/pinta_processing/filters/union.py b/components/processing/src/pinta_processing/filters/union.py new file mode 100644 index 00000000..e6efbcfe --- /dev/null +++ b/components/processing/src/pinta_processing/filters/union.py @@ -0,0 +1,141 @@ +# Copyright (c) 2026 National Land Survey of Finland +# 
+ (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. + +import math + +import affine +import numpy as np + +from pinta_processing import core, exceptions + +# Relative tolerance for comparing pixel sizes across datasets. +_PIXEL_SIZE_RELATIVE_TOLERANCE = 1e-9 + + +class RasterUnion(core.Stage): + """Union several aligned raster datasets into one, the last dataset winning. + + Merge the input rasters into one raster covering the union of their extents. For + each pixel, the last non-nodata value wins. + + The inputs must already share the same CRS and pixel size; this stage neither + reprojects nor resamples and raises if they do not match. + """ + + def process(self, data: tuple[core.RasterDataset, ...]) -> core.RasterDataset: + """Merge the input rasters over their union, the last dataset winning.""" + datasets = self._validate_input(data) + self._validate_alignment(datasets) + + transform, height, width = self._build_merged_transform(datasets) + + merged = np.full((height, width), np.nan, dtype=np.float64) + for dataset in datasets: + placed = self._place_on_grid(dataset, transform, height, width) + valid = ~np.isnan(placed) + merged[valid] = placed[valid] + + nodata = next((d.nodata for d in datasets if d.nodata is not None), None) + if nodata is not None: + merged[np.isnan(merged)] = nodata + + return core.RasterDataset( + array=merged, + transform=transform, + crs=datasets[0].crs, + nodata=nodata, + ) + + @staticmethod + def _validate_input( + data: tuple[core.RasterDataset, ...], + ) -> tuple[core.RasterDataset, ...]: + """Ensure the input is a non-empty tuple of raster datasets.""" + if ( + not isinstance(data, tuple) + or len(data) == 0 + or not all(isinstance(item, core.RasterDataset) for item in data) + ): + raise exceptions.InvalidStageInputError( + stage_name=RasterUnion.__name__, + expected_type="tuple[RasterDataset, ...]", + received_type=type(data).__name__, 
) + return data + + @staticmethod + def _validate_alignment(datasets: tuple[core.RasterDataset, ...]) -> None: + """Ensure all datasets share the same CRS and pixel size.""" + reference = datasets[0] + for dataset in datasets[1:]: + if dataset.crs != reference.crs: + msg = ( + "RasterUnion inputs must share the same CRS, got " + f"{reference.crs} and {dataset.crs}" + ) + raise ValueError(msg) + if not ( + math.isclose( + dataset.transform.a, + reference.transform.a, + rel_tol=_PIXEL_SIZE_RELATIVE_TOLERANCE, + ) + and math.isclose( + dataset.transform.e, + reference.transform.e, + rel_tol=_PIXEL_SIZE_RELATIVE_TOLERANCE, + ) + ): + msg = ( + "RasterUnion inputs must share the same pixel size, got " + f"({reference.transform.a}, {reference.transform.e}) and " + f"({dataset.transform.a}, {dataset.transform.e})" + ) + raise ValueError(msg) + + @staticmethod + def _to_nan_array(dataset: core.RasterDataset) -> np.ndarray: + """Return the raster values as float64 with nodata replaced by nan.""" + array = dataset.array.astype(np.float64, copy=True) + if dataset.nodata is not None: + array[array == dataset.nodata] = np.nan + return array + + def _build_merged_transform( + self, datasets: tuple[core.RasterDataset, ...] 
+ ) -> tuple[affine.Affine, int, int]: + """Build the transform and shape of the grid covering all datasets.""" + lefts, bottoms, rights, tops = zip( + *(dataset.bounds for dataset in datasets), strict=True + ) + left = min(lefts) + top = max(tops) + right = max(rights) + bottom = min(bottoms) + transform = affine.Affine( + datasets[0].transform.a, 0.0, left, 0.0, datasets[0].transform.e, top + ) + width = round((right - left) / transform.a) + height = round((bottom - top) / transform.e) + return transform, height, width + + def _place_on_grid( + self, + dataset: core.RasterDataset, + transform: affine.Affine, + height: int, + width: int, + ) -> np.ndarray: + """Position dataset inside given grid, filling gaps with nan.""" + col_offset = round((dataset.transform.c - transform.c) / transform.a) + row_offset = round((dataset.transform.f - transform.f) / transform.e) + grid = np.full((height, width), np.nan, dtype=np.float64) + source = self._to_nan_array(dataset) + source_height, source_width = source.shape + grid[ + row_offset : row_offset + source_height, + col_offset : col_offset + source_width, + ] = source + return grid diff --git a/components/processing/src/pinta_processing/pipelines.py b/components/processing/src/pinta_processing/pipelines.py index 25d02eca..28d88626 100644 --- a/components/processing/src/pinta_processing/pipelines.py +++ b/components/processing/src/pinta_processing/pipelines.py @@ -9,10 +9,11 @@ import numpy as np from pinta_common import Settings -from pinta_db.job_db.models import reference +from pinta_db.job_db.models import reference, user from pinta_db.primary_db.models import dem from pinta_db_utils import model_utils from pinta_db_utils.postgis import raster +from shapely import wkt as shapely_wkt from sqlmodel import Session from pinta_processing import core, filters, reader, writer @@ -20,6 +21,8 @@ from pinta_processing.utils import tm35_map_sheet_utils DEFAULT_BUFFERED = 300 +DISSOLVE_PRIMARY_DEM_BUFFER = 50 # Read dem from primary 
db around the update area +DISSOLVE_INTERPOLATE_AREA_BUFFER = 4 # Interpolate zone around the update area DEFAULT_LASTOOLS_PARAMS = { "buffered": DEFAULT_BUFFERED, "kill": 300, @@ -172,6 +175,57 @@ def calculate_diff_models( ) + + +def dissolve_update_area( + primary_session: Session, + job_session: Session, + geom_wkt: str, +) -> core.Pipeline: + """Merge the primary and reference DEM and smooth the update area seam. + + - Read primary DEM buffered by DISSOLVE_PRIMARY_DEM_BUFFER around update area. + - Read reference DEM buffered by DISSOLVE_INTERPOLATE_AREA_BUFFER around update area + - Union the DEMs, reference dem has priority. + - Interpolate DISSOLVE_INTERPOLATE_AREA_BUFFER meters wide donut outside the update + area to smooth the seam. + + The blended result is merged into dem_preview and its overviews. Overviews are + downsampled from the blended patch, so under concurrent update-area tasks a shared + overview tile may end up with a slightly stale value (overviews are + visualization-only). Only dem_preview needs to be eventually consistent, which the + tile-level merge guarantees.
+ """ + geom = shapely_wkt.loads(geom_wkt) + primary_dem_area = geom.buffer(DISSOLVE_PRIMARY_DEM_BUFFER) + reference_dem_area = geom.buffer(DISSOLVE_INTERPOLATE_AREA_BUFFER) + buffer_zone_area = geom.buffer(DISSOLVE_INTERPOLATE_AREA_BUFFER).difference(geom) + + dem_schema, dem_table = model_utils.schema_and_table(dem.Dem) + reference_schema, reference_dem_table = model_utils.schema_and_table(reference.Dem) + preview_schema, preview_table = model_utils.schema_and_table(user.DemPreview) + + return ( + reader.PostgisReader( + dem_schema, dem_table, primary_session, primary_dem_area.wkt + ) + | core.Zip( + reader.PostgisReader( + reference_schema, + reference_dem_table, + job_session, + reference_dem_area.wkt, + ) + ) + | filters.RasterUnion() + | filters.RasterInterpolate(buffer_zone_area.wkt) + | _generate_overview_stages( + preview_schema, preview_table, job_session, staging_tables=0, mode="update" + ) + | writer.RasterPostgisWriter( + preview_schema, preview_table, job_session, mode="update" + ) + ) + + def postgis_to_postgis( # noqa: PLR0913 from_session: Session, from_schema: str, @@ -200,6 +254,7 @@ def _generate_overview_stages( table_name: str, session: Session, staging_tables: int, + mode: writer.WriterMode = "insert", ) -> core.Stage: return functools.reduce( operator.or_, @@ -213,6 +268,7 @@ def _generate_overview_stages( ), session, staging_tables, + mode, ) ) for level in raster.DEFAULT_OVERVIEW_LEVELS @@ -220,13 +276,14 @@ def _generate_overview_stages( ) -def _overview_to_postgis( +def _overview_to_postgis( # noqa: PLR0913 factor: int, schema: str, table_name: str, session: Session, staging_tables: int, + mode: writer.WriterMode = "insert", ) -> core.Pipeline: return filters.DownsampleOverview(factor) | writer.RasterPostgisWriter( - schema, table_name, session, staging_tables + schema, table_name, session, staging_tables, mode=mode ) diff --git a/components/processing/src/pinta_processing/writer/__init__.py 
b/components/processing/src/pinta_processing/writer/__init__.py index 9335559e..7ed8ebaa 100644 --- a/components/processing/src/pinta_processing/writer/__init__.py +++ b/components/processing/src/pinta_processing/writer/__init__.py @@ -4,6 +4,15 @@ # Licensed under the MIT License; see the repository LICENSE file. from pinta_processing.writer.geotiff import GeotiffWriter -from pinta_processing.writer.postgis import RasterPostgisWriter, VectorPostgisWriter +from pinta_processing.writer.postgis import ( + RasterPostgisWriter, + VectorPostgisWriter, + WriterMode, +) -__all__ = ["GeotiffWriter", "RasterPostgisWriter", "VectorPostgisWriter"] +__all__ = [ + "GeotiffWriter", + "RasterPostgisWriter", + "VectorPostgisWriter", + "WriterMode", +] diff --git a/components/processing/src/pinta_processing/writer/postgis.py b/components/processing/src/pinta_processing/writer/postgis.py index 59622d55..72b57701 100644 --- a/components/processing/src/pinta_processing/writer/postgis.py +++ b/components/processing/src/pinta_processing/writer/postgis.py @@ -14,6 +14,7 @@ import math import struct from dataclasses import dataclass +from typing import Literal import numpy as np import sqlmodel @@ -25,6 +26,15 @@ LOGGER = logging.getLogger(__name__) +WriterMode = Literal["insert", "update"] + +# Output pixel type for merged tiles, matching the float32 DEM band. +_MERGE_PIXEL_TYPE = "32BF" + +# When updating existing tiles, TEMP table holding incoming tiles before they are +# merged into the target table. 
+_UPDATE_STAGING_TABLE = "_pinta_raster_update_staging" + @dataclass class DataTypeConfig: @@ -51,19 +61,24 @@ def from_numpy_dtype(numpy_dtype: str) -> "DataTypeConfig": class RasterPostgisWriter(core.Stage): """Write raster data to PostGIS table using COPY FROM stdin.""" - def __init__( + def __init__( # noqa: PLR0913 self, schema: str, table_name: str, session: sqlmodel.Session, staging_tables: int = 0, tile_size: int | None = None, + mode: WriterMode = "insert", ) -> None: super().__init__() + if mode == "update" and staging_tables != 0: + msg = "update mode requires staging_tables=0" + raise ValueError(msg) self.schema = schema self.table_name = table_name self.session = session self.staging_tables = staging_tables + self.mode = mode self.tile_size = ( tile_size if tile_size is not None else Settings.DB_DEFAULT_TILE_SIZE ) @@ -80,6 +95,9 @@ def process(self, data: core.RasterDataset | None) -> None: if data.crs is None: msg = "CRS is required for writing to PostGIS. " raise ValueError(msg) + + if self.mode == "update": + return self._update_to_postgis(data) if self.staging_tables == 0: return self._write_to_postgis(data, self.table_name) @@ -99,6 +117,82 @@ def _write_to_postgis(self, data: core.RasterDataset, table_name: str) -> None: raw_connection.commit() + def _update_to_postgis(self, data: core.RasterDataset) -> None: + """Merge raster tiles into an existing PostGIS table. + + Incoming pixels overwrite the target, target pixels are + kept where the incoming tile is nodata. Tiles with no matching row yet + are inserted. 
+ """ + tiles = [tile for tile in self._generate_tiles(data) if _tile_has_data(tile)] + if not tiles: + LOGGER.info( + "No data to merge into table %s.%s", self.schema, self.table_name + ) + return + + raw_connection = self.session.connection().connection + nodata_value = data.nodata if data.nodata is not None else 0.0 + LOGGER.info("Merging data into table %s.%s", self.schema, self.table_name) + try: + with raw_connection.cursor() as cursor: + cursor.execute( + f"CREATE TEMP TABLE {_UPDATE_STAGING_TABLE} " + "(rast raster) ON COMMIT DROP" + ) + copy_sql = f"COPY {_UPDATE_STAGING_TABLE} (rast) FROM STDIN" + with cursor.copy(copy_sql) as copy: + for tile in tiles: + raster_bytes = self._raster_dataset_to_postgis_bytes(tile) + copy.write(raster_bytes.hex() + "\n") + + cursor.execute(self._merge_update_sql(), (nodata_value, nodata_value)) + cursor.execute(self._merge_insert_sql()) + + raw_connection.commit() + except Exception: + raw_connection.rollback() + raise + + def _merge_update_sql(self) -> str: + """SQL merging staged tiles into existing rows via ST_MapAlgebra. + + Matches rows by extent. Takes two nodata + parameters: the both-nodata fill value and the merged band's nodata. + """ + # Schema/table come from trusted model metadata, not user input. + target = f"{self.schema}.{self.table_name}" + return f""" + UPDATE {target} AS target + SET rast = ST_SetBandNoDataValue( + ST_MapAlgebra( + target.rast, staging.rast, + '[rast2]', -- both valid: incoming wins + '{_MERGE_PIXEL_TYPE}', -- output pixel type + 'FIRST', -- keep target extent + '[rast2]', -- target nodata: use incoming + '[rast1]', -- incoming nodata: keep target + %s -- both nodata: fill value + ), + %s + ) + FROM {_UPDATE_STAGING_TABLE} AS staging + WHERE target.rast::geometry = staging.rast::geometry + """ # noqa: S608 + + def _merge_insert_sql(self) -> str: + # Schema/table come from trusted model metadata, not user input. 
+ target = f"{self.schema}.{self.table_name}" + return f""" + INSERT INTO {target} (rast) + SELECT staging.rast + FROM {_UPDATE_STAGING_TABLE} AS staging + WHERE NOT EXISTS ( + SELECT 1 FROM {target} AS target + WHERE target.rast::geometry = staging.rast::geometry + ) + """ # noqa: S608 + def _generate_tiles( self, data: core.RasterDataset, @@ -121,6 +215,10 @@ def _generate_tiles( tile_height = self.tile_size * pixel_height height, width = data.array.shape[:2] + + if height == 0 or width == 0: + return [] + nodata_value = data.nodata if data.nodata is not None else 0.0 # Raster bounds @@ -272,6 +370,16 @@ def _bits_to_int(*bits: tuple[int, int]) -> int: return int("".join(f"{value:0{size}b}" for value, size in bits), 2) +def _tile_has_data(tile: core.RasterDataset) -> bool: + """Return True if the tile has at least one non-nodata pixel.""" + nodata = tile.nodata + if nodata is None: + return True + if isinstance(nodata, float) and math.isnan(nodata): + return bool(np.any(~np.isnan(tile.array))) + return bool(np.any(tile.array != nodata)) + + class VectorPostgisWriter(core.Stage): """Write vector data to PostGIS table.""" diff --git a/components/processing/test_processing/pinta_processing/filter/test_interpolate.py b/components/processing/test_processing/pinta_processing/filter/test_interpolate.py new file mode 100644 index 00000000..072e40ea --- /dev/null +++ b/components/processing/test_processing/pinta_processing/filter/test_interpolate.py @@ -0,0 +1,153 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. 
+ +from typing import Any + +import affine +import numpy as np +import pytest +from shapely.geometry import box + +from pinta_processing import core, exceptions +from pinta_processing.filters import RasterInterpolate +from pinta_processing_test_utils import constants + +# Transform maps pixel (row, col) centre to (col + 0.5, -(row + 0.5)). +_TRANSFORM = affine.Affine(1.0, 0.0, 0.0, 0.0, -1.0, 0.0) + + +def _dataset( + array: np.ndarray, + *, + transform: affine.Affine = _TRANSFORM, + crs: str = constants.DEFAULT_CRS, + nodata: float | None = None, +) -> core.RasterDataset: + return core.RasterDataset( + array=np.asarray(array, dtype=np.float32), + transform=transform, + crs=crs, + nodata=nodata, + ) + + +def _pixel_box_wkt(row: int, col: int) -> str: + """WKT box selecting the single pixel at (row, col).""" + return box(col, -(row + 1), col + 1, -row).wkt + + +def _linear_field(rows: int, cols: int) -> np.ndarray: + """A planar field z = row + col, exactly reproducible by cubic interpolation.""" + return np.fromfunction(lambda r, c: r + c, (rows, cols), dtype=np.float64) + + +def test_interpolate_replaces_polygon_pixels_from_surrounding_data(): + field = _linear_field(7, 7) + corrupted = field.copy() + corrupted[3, 3] = 999.0 + dataset = _dataset(corrupted) + stage = RasterInterpolate(_pixel_box_wkt(3, 3)) + + result = stage.process(dataset) + + # Cubic interpolation of a planar field recovers the true value ~= 6.0. 
+ assert np.isclose(result.array[3, 3], 6.0, atol=1e-3) + + +def test_interpolate_leaves_pixels_outside_polygon_untouched(): + field = _linear_field(7, 7) + corrupted = field.copy() + corrupted[3, 3] = 999.0 + dataset = _dataset(corrupted) + stage = RasterInterpolate(_pixel_box_wkt(3, 3)) + + result = stage.process(dataset) + + outside = np.ones((7, 7), dtype=bool) + outside[3, 3] = False + assert np.allclose(result.array[outside], corrupted[outside]) + + +def test_interpolate_preserves_metadata(): + dataset = _dataset(_linear_field(7, 7), nodata=constants.DEFAULT_NODATA) + stage = RasterInterpolate(_pixel_box_wkt(3, 3)) + + result = stage.process(dataset) + + assert result.transform == dataset.transform + assert result.crs == constants.DEFAULT_CRS + assert result.nodata == constants.DEFAULT_NODATA + + +def test_interpolate_does_not_modify_input(): + field = _linear_field(7, 7) + dataset = _dataset(field) + original = dataset.array.copy() + stage = RasterInterpolate(_pixel_box_wkt(3, 3)) + + stage.process(dataset) + + assert np.allclose(dataset.array, original) + + +@pytest.mark.parametrize( + "invalid_wkt", + [ + "POINT (1 1)", + "LINESTRING (0 0, 1 1)", + ], + ids=["point", "linestring"], +) +def test_interpolate_rejects_non_polygon(invalid_wkt: str): + with pytest.raises(ValueError, match="Polygon"): + RasterInterpolate(invalid_wkt) + + +def test_interpolate_rejects_unparseable_wkt(): + with pytest.raises(ValueError, match="could not be parsed"): + RasterInterpolate("not wkt") + + +def test_interpolate_raises_when_polygon_covers_no_pixels(): + dataset = _dataset(_linear_field(5, 5)) + stage = RasterInterpolate(box(100, 100, 101, 101).wkt) + + with pytest.raises(ValueError, match="does not cover any raster pixels"): + stage.process(dataset) + + +def test_interpolate_raises_when_data_does_not_surround_polygon(): + # Valid data only in the top rows, so a middle pixel is outside the + # interpolation domain (no data below it). 
+ array = np.full((8, 8), constants.DEFAULT_NODATA, dtype=np.float32) + array[0:2, :] = 10.0 + dataset = _dataset(array, nodata=constants.DEFAULT_NODATA) + stage = RasterInterpolate(_pixel_box_wkt(4, 4)) + + with pytest.raises(ValueError, match="too little data around the polygon"): + stage.process(dataset) + + +def test_interpolate_raises_when_too_few_known_points(): + array = np.full((5, 5), constants.DEFAULT_NODATA, dtype=np.float32) + array[0, 0] = 1.0 + array[0, 1] = 2.0 + dataset = _dataset(array, nodata=constants.DEFAULT_NODATA) + stage = RasterInterpolate(_pixel_box_wkt(2, 2)) + + with pytest.raises(ValueError, match="too little data around the polygon"): + stage.process(dataset) + + +@pytest.mark.parametrize( + "invalid_input", + ["not a dataset", None, (_dataset(np.zeros((3, 3))),)], + ids=["string", "none", "tuple"], +) +def test_interpolate_invalid_input_raises_error(invalid_input: Any): + stage = RasterInterpolate(_pixel_box_wkt(1, 1)) + + with pytest.raises(exceptions.InvalidStageInputError): + stage.process(invalid_input) diff --git a/components/processing/test_processing/pinta_processing/filter/test_union.py b/components/processing/test_processing/pinta_processing/filter/test_union.py new file mode 100644 index 00000000..a1157b05 --- /dev/null +++ b/components/processing/test_processing/pinta_processing/filter/test_union.py @@ -0,0 +1,164 @@ +# Copyright (c) 2026 National Land Survey of Finland +# (https://www.maanmittauslaitos.fi/en). +# This file is part of the Pinta. +# Licensed under the MIT License; see the repository LICENSE file. 
+ +from typing import Any + +import affine +import numpy as np +import pytest + +from pinta_processing import core, exceptions +from pinta_processing.filters import RasterUnion +from pinta_processing_test_utils import constants + +_TRANSFORM = affine.Affine(1.0, 0.0, 0.0, 0.0, -1.0, 0.0) + + +def _dataset( + array: np.ndarray, + *, + transform: affine.Affine = _TRANSFORM, + crs: str = constants.DEFAULT_CRS, + nodata: float | None = None, +) -> core.RasterDataset: + return core.RasterDataset( + array=np.asarray(array, dtype=np.float32), + transform=transform, + crs=crs, + nodata=nodata, + ) + + +def test_union_last_dataset_wins_on_overlap(): + first = _dataset(np.full((2, 2), 1.0)) + second = _dataset(np.full((2, 2), 2.0)) + third = _dataset(np.full((2, 2), 3.0)) + + result = RasterUnion().process((first, second, third)) + + assert np.allclose(result.array, 3.0) + + +def test_union_fills_from_earlier_where_later_is_nodata(): + first = _dataset(np.full((2, 2), 1.0)) + second_array = np.array([[2.0, constants.DEFAULT_NODATA], [2.0, 2.0]]) + second = _dataset(second_array, nodata=constants.DEFAULT_NODATA) + + result = RasterUnion().process((first, second)) + + # Later dataset wins everywhere except its nodata cell, kept from the first. + expected = np.array([[2.0, 1.0], [2.0, 2.0]]) + assert np.allclose(result.array, expected) + + +def test_union_combines_disjoint_extents(): + # Left raster covers columns 0..1, right raster columns 2..3. + left = _dataset(np.full((2, 2), 1.0)) + right_transform = affine.Affine(1.0, 0.0, 2.0, 0.0, -1.0, 0.0) + right = _dataset(np.full((2, 2), 2.0), transform=right_transform) + + result = RasterUnion().process((left, right)) + + assert result.array.shape == (2, 4) + assert result.transform == _TRANSFORM + assert np.allclose(result.array[:, :2], 1.0) + assert np.allclose(result.array[:, 2:], 2.0) + + +def test_union_extends_extent_and_overlaps(): + # Base covers columns 0..2, overlay columns 1..3, overlapping on column 1..2. 
+ base = _dataset(np.full((2, 3), 1.0)) + overlay_transform = affine.Affine(1.0, 0.0, 1.0, 0.0, -1.0, 0.0) + overlay = _dataset(np.full((2, 3), 2.0), transform=overlay_transform) + + result = RasterUnion().process((base, overlay)) + + assert result.array.shape == (2, 4) + # Column 0 only from base, columns 1..3 from the winning overlay. + expected = np.array([1.0, 2.0, 2.0, 2.0]) + assert np.allclose(result.array, np.tile(expected, (2, 1))) + + +def test_union_fills_gaps_with_nodata(): + # Two disjoint rasters leave the opposite corners uncovered. + top_left = _dataset(np.full((2, 2), 5.0), nodata=constants.DEFAULT_NODATA) + bottom_right_transform = affine.Affine(1.0, 0.0, 2.0, 0.0, -1.0, -2.0) + bottom_right = _dataset( + np.full((2, 2), 7.0), + transform=bottom_right_transform, + nodata=constants.DEFAULT_NODATA, + ) + + result = RasterUnion().process((top_left, bottom_right)) + + assert result.array.shape == (4, 4) + assert result.nodata == constants.DEFAULT_NODATA + # Uncovered cells are filled with nodata. 
+ assert result.array[0, 2] == constants.DEFAULT_NODATA + assert result.array[2, 0] == constants.DEFAULT_NODATA + assert result.array[0, 0] == 5.0 + assert result.array[2, 2] == 7.0 + + +def test_union_single_dataset_returns_equivalent_raster(): + only = _dataset(np.array([[1.0, 2.0], [3.0, 4.0]])) + + result = RasterUnion().process((only,)) + + assert np.allclose(result.array, only.array) + assert result.transform == only.transform + + +def test_union_preserves_crs(): + first = _dataset(np.full((2, 2), 1.0), crs="EPSG:3067") + second = _dataset(np.full((2, 2), 2.0), crs="EPSG:3067") + + result = RasterUnion().process((first, second)) + + assert result.crs == "EPSG:3067" + + +def test_union_does_not_modify_inputs(): + first = _dataset(np.full((2, 2), 1.0)) + second = _dataset(np.full((2, 2), 2.0)) + original_first = first.array.copy() + original_second = second.array.copy() + + RasterUnion().process((first, second)) + + assert np.allclose(first.array, original_first) + assert np.allclose(second.array, original_second) + + +@pytest.mark.parametrize( + "invalid_input", + [ + "not a tuple", + None, + (), + (_dataset(np.zeros((2, 2))), "not a dataset"), + ], + ids=["string", "none", "empty-tuple", "non-dataset-member"], +) +def test_union_invalid_input_raises_error(invalid_input: Any): + with pytest.raises(exceptions.InvalidStageInputError): + RasterUnion().process(invalid_input) + + +def test_union_rejects_mismatched_crs(): + first = _dataset(np.full((2, 2), 1.0), crs="EPSG:3067") + second = _dataset(np.full((2, 2), 2.0), crs="EPSG:4326") + + with pytest.raises(ValueError, match="same CRS"): + RasterUnion().process((first, second)) + + +def test_union_rejects_mismatched_pixel_size(): + first = _dataset(np.full((2, 2), 1.0)) + coarse_transform = affine.Affine(2.0, 0.0, 0.0, 0.0, -2.0, 0.0) + second = _dataset(np.full((2, 2), 2.0), transform=coarse_transform) + + with pytest.raises(ValueError, match="same pixel size"): + RasterUnion().process((first, second)) diff --git 
a/components/processing/test_processing/pinta_processing/test_core.py b/components/processing/test_processing/pinta_processing/test_core.py index d9719f0a..b460ecd4 100644 --- a/components/processing/test_processing/pinta_processing/test_core.py +++ b/components/processing/test_processing/pinta_processing/test_core.py @@ -5,6 +5,7 @@ import copy +import affine import numpy as np import pytest import pytest_mock @@ -81,6 +82,19 @@ def test_raster_dataset_converts_array_to_float32(): assert dataset.array.dtype == np.float32 +def test_raster_dataset_bounds(): + # 3 rows x 4 cols, 1 m pixels, origin at (10, 20) top-left. + dataset = core.RasterDataset( + array=np.zeros((3, 4), dtype=np.float32), + transform=affine.Affine(1.0, 0.0, 10.0, 0.0, -1.0, 20.0), + crs=constants.DEFAULT_CRS, + nodata=None, + ) + + # bounds are ordered left, bottom, right, top + assert dataset.bounds == (10.0, 17.0, 14.0, 20.0) + + def test_pipeline_executes_all_stages(mocker: pytest_mock.MockerFixture): pipeline = DummyStage() | DummyStage() | DummyStage() diff --git a/components/processing/test_processing/pinta_processing/test_pipelines.py b/components/processing/test_processing/pinta_processing/test_pipelines.py index 2ef77687..fcc4dbb8 100644 --- a/components/processing/test_processing/pinta_processing/test_pipelines.py +++ b/components/processing/test_processing/pinta_processing/test_pipelines.py @@ -6,7 +6,9 @@ from pathlib import Path from unittest.mock import MagicMock +from pinta_db_utils.postgis import raster from pytest_mock import MockerFixture +from shapely import wkt as shapely_wkt from pinta_processing import pipelines @@ -66,3 +68,114 @@ def test_blast2dem_to_postgis_override_extra_param_defaults( "ll": [111, 222], "neighbors": ["a.laz", "b.laz", "c.laz"], } + + +def test_dissolve_update_area_unions_and_interpolates_donut( + mocker: MockerFixture, +) -> None: + postgis_reader = mocker.patch( + "pinta_processing.reader.PostgisReader", + return_value=MagicMock(), + ) + union = 
mocker.patch( + "pinta_processing.filters.RasterUnion", + return_value=MagicMock(), + ) + interpolate = mocker.patch( + "pinta_processing.filters.RasterInterpolate", + return_value=MagicMock(), + ) + downsample = mocker.patch( + "pinta_processing.filters.DownsampleOverview", + return_value=MagicMock(), + ) + postgis_writer = mocker.patch( + "pinta_processing.writer.RasterPostgisWriter", + return_value=MagicMock(), + ) + primary_session = MagicMock() + job_session = MagicMock() + geom_wkt = "POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))" + geom = shapely_wkt.loads(geom_wkt) + + pipelines.dissolve_update_area( + primary_session=primary_session, + job_session=job_session, + geom_wkt=geom_wkt, + ) + + # Two readers: primary DEM (50 m buffer) and reference DEM (4 m buffer). + assert postgis_reader.call_count == 2 + primary_call, reference_call = postgis_reader.call_args_list + assert primary_call.args[2] is primary_session + assert reference_call.args[2] is job_session + + primary_wkt = shapely_wkt.loads(primary_call.args[3]) + reference_wkt = shapely_wkt.loads(reference_call.args[3]) + _assert_geometries_match( + primary_wkt, geom.buffer(pipelines.DISSOLVE_PRIMARY_DEM_BUFFER) + ) + _assert_geometries_match( + reference_wkt, geom.buffer(pipelines.DISSOLVE_INTERPOLATE_AREA_BUFFER) + ) + + # The two DEMs are unioned before the seam is interpolated. + union.assert_called_once_with() + + # The interpolate stage receives the donut ring between 4 m buffer and geom. + donut = shapely_wkt.loads(interpolate.call_args.args[0]) + _assert_geometries_match( + donut, geom.buffer(pipelines.DISSOLVE_INTERPOLATE_AREA_BUFFER).difference(geom) + ) + # The donut has a hole (the update area) cut out of it. + assert len(donut.interiors) == 1 + + # The blended patch is merged into dem_preview and its overviews: one base + # writer plus one writer per overview level, all in update mode. 
+ levels = raster.DEFAULT_OVERVIEW_LEVELS + assert downsample.call_count == len(levels) + assert postgis_writer.call_count == 1 + len(levels) + assert all( + call.kwargs["mode"] == "update" for call in postgis_writer.call_args_list + ) + + +def test_postgis_to_postgis( + mocker: MockerFixture, +) -> None: + postgis_reader = mocker.patch( + "pinta_processing.reader.PostgisReader", + return_value=MagicMock(), + ) + postgis_writer = mocker.patch( + "pinta_processing.writer.RasterPostgisWriter", + return_value=MagicMock(), + ) + from_session = MagicMock() + to_session = MagicMock() + + pipelines.postgis_to_postgis( + from_session=from_session, + from_schema="dem", + from_table="dem", + to_session=to_session, + to_schema="user_data", + to_table="dem_preview", + tile_wkt="POINT (0 0)", + staging_tables=2, + ) + + # Reads the source table with the source session and tile geometry. + postgis_reader.assert_called_once_with("dem", "dem", from_session, "POINT (0 0)") + + # The final writer targets the destination table/session with staging tables. + # (Overview writers also use RasterPostgisWriter, so assert on the last call.) + assert ( + mocker.call("user_data", "dem_preview", to_session, 2) + in postgis_writer.call_args_list + ) + + +def _assert_geometries_match(actual: object, expected: object) -> None: + # WKT round-tripping perturbs vertices slightly, so compare with tolerance. 
+ assert actual.symmetric_difference(expected).area < 1e-6 # type: ignore[attr-defined] diff --git a/components/processing/test_processing/pinta_processing/writer/test_postgis.py b/components/processing/test_processing/pinta_processing/writer/test_postgis.py index 6ba92a94..b6b9437d 100644 --- a/components/processing/test_processing/pinta_processing/writer/test_postgis.py +++ b/components/processing/test_processing/pinta_processing/writer/test_postgis.py @@ -11,6 +11,7 @@ from shapely.geometry import Point from pinta_processing import core, exceptions +from pinta_processing.filters import DownsampleOverview from pinta_processing.writer import RasterPostgisWriter, VectorPostgisWriter @@ -156,6 +157,118 @@ def test_resolve_partition_different_locations(dataset: core.RasterDataset): ) +def _mock_update_session(mocker: MockerFixture) -> tuple: + """Session whose raw connection exposes cursor/copy as context managers.""" + session = mocker.MagicMock() + raw_connection = session.connection.return_value.connection + cursor = raw_connection.cursor.return_value.__enter__.return_value + cursor.copy.return_value.__enter__.return_value = mocker.MagicMock() + return session, raw_connection, cursor + + +def test_generate_tiles_returns_empty_for_zero_sized_raster(): + """A degenerate raster (zero-sized dimension) yields no tiles.""" + empty = core.RasterDataset( + array=np.empty((0, 0), dtype=np.float32), + transform=Affine.identity(), + crs="EPSG:3067", + nodata=-9999.0, + ) + stage = RasterPostgisWriter("foo", "bar", None, tile_size=4) # type: ignore[arg-type] + + assert stage._generate_tiles(empty) == [] + + +def test_update_mode_skips_raster_downsampled_below_one_pixel( + dataset: core.RasterDataset, mocker: MockerFixture +): + """A small patch downsampled past its own size must not crash the writer. 
+ + Reproduces the overview edge case: a tiny update-area patch downsampled for a + high overview level (factor 128) collapses to a 0x0 raster, which previously + raised a rasterio WindowError while tiling. + """ + session, raw_connection, cursor = _mock_update_session(mocker) + downsampled = DownsampleOverview(factor=128).process(dataset) + assert downsampled is not None + assert downsampled.array.size == 0 # 2x2 // 128 -> 0x0 + + stage = RasterPostgisWriter( + "myschema", "mytable", session, staging_tables=0, mode="update" + ) + stage.process(downsampled) + + cursor.execute.assert_not_called() + raw_connection.commit.assert_not_called() + raw_connection.rollback.assert_not_called() + + +def test_update_mode_rejects_staging_tables(): + """Update mode is only supported with staging_tables=0.""" + with pytest.raises(ValueError, match="staging_tables"): + RasterPostgisWriter("foo", "bar", None, staging_tables=1, mode="update") # type: ignore[arg-type] + + +def test_update_mode_merges_and_commits( + dataset: core.RasterDataset, mocker: MockerFixture +): + """Update mode stages tiles, runs merge + insert, and commits once.""" + session, raw_connection, cursor = _mock_update_session(mocker) + + stage = RasterPostgisWriter( + "myschema", "mytable", session, staging_tables=0, tile_size=4, mode="update" + ) + stage.process(dataset) + + executed_sql = " ".join(str(call.args[0]) for call in cursor.execute.call_args_list) + assert "CREATE TEMP TABLE" in executed_sql + assert "ST_MapAlgebra" in executed_sql + assert "INSERT INTO myschema.mytable" in executed_sql + + raw_connection.commit.assert_called_once() + raw_connection.rollback.assert_not_called() + + +def test_update_mode_skips_all_nodata_data( + dataset: core.RasterDataset, mocker: MockerFixture +): + """A fully nodata input stages nothing and merges nothing.""" + session, raw_connection, cursor = _mock_update_session(mocker) + nodata_dataset = core.RasterDataset( + array=np.full_like(dataset.array, dataset.nodata), + 
transform=dataset.transform, + crs=dataset.crs, + nodata=dataset.nodata, + ) + + stage = RasterPostgisWriter( + "myschema", "mytable", session, staging_tables=0, tile_size=4, mode="update" + ) + stage.process(nodata_dataset) + + cursor.execute.assert_not_called() + raw_connection.commit.assert_not_called() + raw_connection.rollback.assert_not_called() + + +def test_update_mode_rolls_back_on_failure( + dataset: core.RasterDataset, mocker: MockerFixture +): + """A failure mid-write rolls back so no half-updated rows remain.""" + session, raw_connection, cursor = _mock_update_session(mocker) + cursor.execute.side_effect = [None, RuntimeError("merge failed")] + + stage = RasterPostgisWriter( + "myschema", "mytable", session, staging_tables=0, tile_size=4, mode="update" + ) + + with pytest.raises(RuntimeError, match="merge failed"): + stage.process(dataset) + + raw_connection.rollback.assert_called_once() + raw_connection.commit.assert_not_called() + + def test_vector_postgis_writer_writes_to_postgis( vector_dataset: core.VectorDataset, mocker: MockerFixture ): diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py index 59c40ab5..1e69b8c2 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/api/api_client.py @@ -106,6 +106,19 @@ def start_reference_dem_workflow(self, production_area_id: str) -> None: tr("Reference DEM workflow task created successfully"), success=True ) + @handle_api_errors(ApiEndpoint.workflows) + def start_dissolve_update_areas_workflow(self, production_area_id: str) -> None: + """Starts a dissolve update areas workflow for the given production area.""" + self._start_workflow( + constants.DAG_ID_DISSOLVE_UPDATE_AREAS, + {"id": production_area_id}, + production_area_id=production_area_id, + ) + MsgBar.info( + tr("Dissolve update areas workflow task created successfully"), + success=True, + ) + 
def _start_workflow( self, dag_tag: str, diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py b/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py index cf5be0bc..d274c580 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/layers/config.py @@ -74,6 +74,9 @@ class VectorLayerConfig(DatabaseLayerConfig): ) read_only: bool = False read_only_fields: list[str] = dataclasses.field(default_factory=list) + # Map of field name -> QGIS expression evaluated to the field's default value + # when a new feature is created (e.g. {"id": "uuid('WithoutBraces')"}). + default_expressions: dict[str, str] = dataclasses.field(default_factory=dict) subset_string: str | None = None @@ -97,6 +100,7 @@ def create( # noqa: PLR0913 style_path: Path | None = None, read_only: bool = False, # noqa: FBT001, FBT002 read_only_fields: list[str] | None = None, + default_expressions: dict[str, str] | None = None, value_maps: list[ValueMapConfig] | None = None, visible_initially: bool = True, # noqa: FBT001, FBT002 ) -> "ModelLayerConfig": @@ -122,6 +126,7 @@ def create( # noqa: PLR0913 read_only_fields=[key_column] if read_only_fields is None else read_only_fields, + default_expressions=default_expressions or {}, value_maps=value_maps, visible_initially=visible_initially, ) diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py b/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py index 1e17255f..52ac4491 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/layers/utils.py @@ -16,7 +16,12 @@ # You should have received a copy of the GNU General Public License # along with Pinta QGIS Plugin. If not, see <https://www.gnu.org/licenses/>. 
-from qgis.core import QgsEditorWidgetSetup, QgsMapLayer, QgsVectorLayer +from qgis.core import ( + QgsDefaultValue, + QgsEditorWidgetSetup, + QgsMapLayer, + QgsVectorLayer, +) from pinta_qgis_plugin.layers.config import BaseLayerConfig @@ -35,6 +40,24 @@ def set_read_only_fields(layer: QgsVectorLayer, field_names: list[str]) -> None: layer.setEditFormConfig(form_config) +def set_default_value_expressions( + layer: QgsVectorLayer, default_expressions: dict[str, str] +) -> None: + """Set QGIS default value expressions for the given fields. + + Each expression is evaluated when a new feature is created, e.g. to generate + a UUID for a not-null primary key that has no database-side default. + """ + if not default_expressions: + return + + fields = layer.fields() + for field_name, expression in default_expressions.items(): + field_index = fields.lookupField(field_name) + if field_index >= 0: + layer.setDefaultValueDefinition(field_index, QgsDefaultValue(expression)) + + def set_field_aliases(layer: QgsMapLayer, aliases: dict[str, str]) -> None: """Set field aliases for layer types that expose QGIS field alias APIs.""" if ( diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py b/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py index 6ee6cab5..7c96406a 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/layers/vector_layer.py @@ -59,6 +59,7 @@ def create_vector_layer( layer.setReadOnly(config.read_only) utils.set_field_aliases(layer, config.aliases) utils.set_read_only_fields(layer, [LAYER_ID_COLUMN, *config.read_only_fields]) + utils.set_default_value_expressions(layer, config.default_expressions) if config.style_path is not None: styles.apply_style(layer, config.style_path) diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py index 
a2d6ff4e..42411e58 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/project/config/job_layers.py @@ -103,6 +103,11 @@ aliases={ **config.COMMON_ALIASES, "elevation": tr("Elevation"), + "dirty": tr("Dirty"), }, + read_only_fields=["dirty"], + # The id primary key has no database-side default, so generate a UUID + # client-side when a new update area is digitised. + default_expressions={"id": "uuid('WithoutBraces')"}, ), ] diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py b/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py index aa241267..5d5bfde9 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py +++ b/components/qgis_plugin/src/pinta_qgis_plugin/project/groups/management_layer_collection.py @@ -36,6 +36,7 @@ ACTION_TITLE_OPEN_PRODUCTION_AREA_LAYERS = tr("Open production area") ACTION_TITLE_START_REFERENCE_DEM_WORKFLOW = tr("Start reference DEM workflow") +ACTION_TITLE_START_DISSOLVE_UPDATE_AREAS = tr("Dissolve update areas") class ManagementLayerCollection(BaseLayerCollection): @@ -53,6 +54,7 @@ def _add_to_project(self) -> None: if layer_config.layer_id == "production_area": _add_open_production_area_layers_action(layer) _add_start_reference_dem_workflow_action(layer) + _add_start_dissolve_update_areas_action(layer) def _add_open_production_area_layers_action(layer: QgsVectorLayer) -> None: @@ -86,3 +88,18 @@ def _add_start_reference_dem_workflow_action(layer: QgsVectorLayer) -> None: short_title=ACTION_TITLE_START_REFERENCE_DEM_WORKFLOW, command=command, ) + + +def _add_start_dissolve_update_areas_action(layer: QgsVectorLayer) -> None: + """Add start dissolve update areas action to the layer.""" + command = textwrap.dedent(""" + from pinta_qgis_plugin.workflows import update_area + job_id = \'[%id%]\' + 
update_area.start_dissolve_update_areas_workflow(job_id) + """) + layer_utils.add_action_to_vector_layer( + layer, + description=tr("Start dissolve update areas workflow for production area"), + short_title=ACTION_TITLE_START_DISSOLVE_UPDATE_AREAS, + command=command, + ) diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm index a6dc05bf..d5f9d58a 100644 Binary files a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm and b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.qm differ diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts index c37d2b4f..bdea0a2d 100644 --- a/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts +++ b/components/qgis_plugin/src/pinta_qgis_plugin/resources/i18n/fi.ts @@ -14,7 +14,7 @@ Tunniste - + Add production area related layers to map Lisää tuotantoalueen tasot karttalle @@ -139,17 +139,17 @@ Hallitsematon virhe tapahtui - + Could not start workflow Laskennan käynnistäminen epäonnistui - + Check log for more details Tarkista loki lisätietojen varalta - + Start reference DEM workflow for production area Käynnistä vertausmallin työnkulku tuotantoalueelle @@ -159,7 +159,7 @@ Käynnistä vertausmallin työnkulku - + Reference DEM workflow task created successfully Vertausmallin laskennan työnkulku käynnistettiin onnistuneesti @@ -208,5 +208,25 @@ DEM preview Tuloskorkeusmalli + + + Dissolve update areas workflow task created successfully + Tuloskorkeusmallin päivityksen työnkulku käynnistettiin onnistuneesti + + + + Dissolve update areas + Päivitä tuloskorkeusmalli + + + + Start dissolve update areas workflow for production area + Käynnistä tuloskorkeusmallin päivityksen työnkulku tuotantoalueelle + + + + Dirty + Päivittämättä + diff --git a/components/qgis_plugin/src/pinta_qgis_plugin/workflows/update_area.py 
b/components/qgis_plugin/src/pinta_qgis_plugin/workflows/update_area.py new file mode 100644 index 00000000..dfcca39a --- /dev/null +++ b/components/qgis_plugin/src/pinta_qgis_plugin/workflows/update_area.py @@ -0,0 +1,26 @@ +# Copyright (C) 2026 Pinta QGIS Plugin Contributors. +# +# +# This file is part of Pinta QGIS Plugin. +# +# Pinta QGIS Plugin is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# Pinta QGIS Plugin is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Pinta QGIS Plugin. If not, see <https://www.gnu.org/licenses/>. + +from pinta_qgis_plugin.api import api_client +from pinta_qgis_plugin.utils import messages + + +@messages.popup_if_fails +def start_dissolve_update_areas_workflow(production_area_id: str) -> None: + """Starts a dissolve update areas workflow for the given production area.""" + api_client.get_api_client().start_dissolve_update_areas_workflow(production_area_id) diff --git a/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py b/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py index 91274575..a98bc716 100644 --- a/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py +++ b/components/qgis_plugin/test_qgis/layers/collections/test_management_layer_collection.py @@ -93,11 +93,17 @@ def test_add_to_project_adds_actions_to_production_area_layer( "_add_start_reference_dem_workflow_action", autospec=True, ) + mock_add_dissolve_action = mocker.patch.object( + management_layer_collection, 
"_add_start_dissolve_update_areas_action", + autospec=True, + ) layer_collection.add_to_project() mock_add_open_action.assert_called_once_with(production_area_layer) mock_add_dem_update_action.assert_called_once_with(production_area_layer) + mock_add_dissolve_action.assert_called_once_with(production_area_layer) def test_add_open_production_area_layers_delegates_to_layer_utils( @@ -151,6 +157,32 @@ def test_add_start_reference_dem_processing_action_delegates_to_layer_utils( assert "[%id%]" in command +def test_add_start_dissolve_update_areas_action_delegates_to_layer_utils( + mocker: MockerFixture, +): + layer = mocker.MagicMock() + mock_add_action = mocker.patch.object( + management_layer_collection.layer_utils, + "add_action_to_vector_layer", + autospec=True, + ) + + management_layer_collection._add_start_dissolve_update_areas_action(layer) + + mock_add_action.assert_called_once() + call = mock_add_action.call_args + assert call.args == (layer,) + assert ( + call.kwargs["description"] + == "Start dissolve update areas workflow for production area" + ) + assert call.kwargs["short_title"] == "Dissolve update areas" + command = call.kwargs["command"] + assert "from pinta_qgis_plugin.workflows import update_area" in command + assert "update_area.start_dissolve_update_areas_workflow(job_id)" in command + assert "[%id%]" in command + + def test_remove_layers_removes_all_layers( production_area_layer: QgsVectorLayer, layer_collection: management_layer_collection.ManagementLayerCollection, diff --git a/components/qgis_plugin/test_qgis/layers/test_vector_layer.py b/components/qgis_plugin/test_qgis/layers/test_vector_layer.py index 1abdfcb9..28d94991 100644 --- a/components/qgis_plugin/test_qgis/layers/test_vector_layer.py +++ b/components/qgis_plugin/test_qgis/layers/test_vector_layer.py @@ -175,6 +175,34 @@ def test_create_layer_sets_value_maps(mocker: MockerFixture, mock_uri: MagicMock } +def test_create_layer_sets_default_value_expressions( + mocker: MockerFixture, 
mock_uri: MagicMock +): + layer = QgsVectorLayer( + "MultiPolygon?field=id:string&field=name:string", "", "memory" + ) + mocker.patch.object( + vector_layer, + "_create_qgs_vector_layer", + autospec=True, + return_value=layer, + ) + layer_config = config.VectorLayerConfig( + schema="user_data", + table_name="update_area", + layer_name="Update area", + layer_id="update_area", + key_column="id", + wkb_type=management_layers.PRODUCTION_AREA.wkb_type, + default_expressions={"id": "uuid('WithoutBraces')"}, + ) + + vector_layer.create_vector_layer(layer_config, PROVIDER, mock_uri) + + default_value = layer.defaultValueDefinition(layer.fields().lookupField("id")) + assert default_value.expression() == "uuid('WithoutBraces')" + + def test_create_layer_with_invalid_layer_raises_exception( mocker: MockerFixture, ): diff --git a/components/qgis_plugin/test_qgis/processing/test_api_client.py b/components/qgis_plugin/test_qgis/processing/test_api_client.py index 702eb18d..ffad406c 100644 --- a/components/qgis_plugin/test_qgis/processing/test_api_client.py +++ b/components/qgis_plugin/test_qgis/processing/test_api_client.py @@ -197,6 +197,42 @@ def test_start_reference_dem_workflow_http_error_without_response_uses_fallback_ assert exc_info.value.bar_msg["details"] == "Check log for more details" +def test_start_dissolve_update_areas_workflow_posts_workflow_payload( + client: api_client.PintaAPIClient, + mock_post: MagicMock, + mocker: MockerFixture, +) -> None: + mocker.patch.object(api_client, "MsgBar", autospec=True) + + client.start_dissolve_update_areas_workflow("area-1") + + expected_url = ( + f"http://example.test/workflows/{constants.DAG_ID_DISSOLVE_UPDATE_AREAS}" + ) + mock_post.assert_called_once_with( + expected_url, + json={ + "parameters": {"id": "area-1"}, + "production_area_id": "area-1", + }, + timeout=10, + ) + mock_post.return_value.raise_for_status.assert_called_once_with() + + +def test_start_dissolve_update_areas_workflow_shows_success_message( + client: 
api_client.PintaAPIClient, + mock_post: MagicMock, + mocker: MockerFixture, +) -> None: + mock_msg_bar = mocker.patch.object(api_client, "MsgBar", autospec=True) + + client.start_dissolve_update_areas_workflow("area-1") + + mock_msg_bar.info.assert_called_once() + assert mock_msg_bar.info.call_args.kwargs == {"success": True} + + def test_get_api_client_is_cached() -> None: api_client.get_api_client.cache_clear() try: diff --git a/whitelist.txt b/whitelist.txt index 4835c47d..62d918d7 100644 --- a/whitelist.txt +++ b/whitelist.txt @@ -181,3 +181,8 @@ pyqt rasters lte gt +datasets +griddata +rasterize +isclose +tol