From 32c417a7b124a07a6d2ceb83f702a0565c14f212 Mon Sep 17 00:00:00 2001 From: sijandh35 Date: Tue, 21 Apr 2026 06:25:29 +0000 Subject: [PATCH 1/2] [Fixes #14153] Validation of Uploads --- geonode/documents/api/tests.py | 39 +++++++++ geonode/documents/api/views.py | 27 +++--- geonode/documents/enumerations.py | 30 +++++++ geonode/documents/forms.py | 8 +- geonode/documents/tests.py | 13 +++ geonode/upload/validators.py | 138 ++++++++++++++++++++++++++++++ pyproject.toml | 2 + 7 files changed, 242 insertions(+), 15 deletions(-) create mode 100644 geonode/upload/validators.py diff --git a/geonode/documents/api/tests.py b/geonode/documents/api/tests.py index 42f677a189e..733b945543b 100644 --- a/geonode/documents/api/tests.py +++ b/geonode/documents/api/tests.py @@ -17,6 +17,8 @@ # ######################################################################### import logging +from tempfile import NamedTemporaryFile +from unittest.mock import patch from django.contrib.auth import get_user_model from urllib.parse import urljoin @@ -136,6 +138,43 @@ def test_creation_should_rase_exec_for_unsupported_files(self): self.assertEqual(400, actual.status_code) self.assertDictEqual(expected, actual.json()) + @patch("geonode.upload.validators.FileValidator._detect_mime", return_value="application/x-dosexec") + def test_creation_rejects_file_with_mismatched_content_type(self, _mock_detect_mime): + self.client.force_login(self.admin) + with NamedTemporaryFile(suffix=".pdf") as test_file: + test_file.write(b"MZ executable content") + test_file.flush() + payload = { + "document": { + "title": "Fake PDF document", + "metadata_only": True, + "file_path": test_file.name, + } + } + actual = self.client.post(self.url, data=payload, format="json") + + self.assertEqual(400, actual.status_code) + self.assertEqual("document_exception", actual.json()["code"]) + self.assertEqual( + ["The uploaded file content does not match the expected file type for .pdf files."], + actual.json()["errors"], + ) + + @patch("geonode.upload.validators.FileValidator._get_max_size", return_value=1) + def test_creation_rejects_file_exceeding_document_size_limit(self, _mock_get_max_size): + self.client.force_login(self.admin) + payload = { + "document": { + "title": "Oversized document", + "metadata_only": True, + "file_path": self.valid_file_path, + } + } + actual = self.client.post(self.url, data=payload, format="json") + + self.assertEqual(400, actual.status_code) + self.assertEqual("total_upload_size_exceeded", actual.json()["code"]) + def test_document_listing_advertised(self): document = Document.objects.first() document.advertised = False diff --git a/geonode/documents/api/views.py b/geonode/documents/api/views.py index 5d6d139f28f..ba51bcb4a77 100644 --- a/geonode/documents/api/views.py +++ b/geonode/documents/api/views.py @@ -17,15 +17,18 @@ # ######################################################################### -from drf_spectacular.utils import extend_schema +import logging from pathlib import Path from uuid import uuid4 + +from django.conf import settings +from django.core.exceptions import ValidationError +from drf_spectacular.utils import extend_schema from dynamic_rest.viewsets import DynamicModelViewSet from dynamic_rest.filters import DynamicFilterBackend, DynamicSortingFilter from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticatedOrReadOnly -from geonode import settings from geonode.base.api.filters import DynamicSearchFilter, ExtentFilter, AdvertisedFilter from geonode.base.api.pagination import GeoNodeApiPagination @@ -37,12 +40,11 @@ from geonode.documents.models import Document from geonode.metadata.multilang.views import MultiLangViewMixin from geonode.resource.registry import document_manager +from geonode.upload.validators import FileValidator from .serializers import DocumentSerializer from .permissions import DocumentPermissionsFilter -import logging - logger = logging.getLogger(__name__) @@ -102,12 +104,17 @@ def perform_create(self, serializer): if file and doc_url: raise DocumentException(detail="Either a file or a URL must be specified, not both") - if not extension and file: - filename = file if isinstance(file, str) else file.name - extension = Path(filename).suffix.replace(".", "") - - if extension not in settings.ALLOWED_DOCUMENT_TYPES: - raise DocumentException("The file provided is not in the supported extensions list") + if file: + try: + file_validator = FileValidator(file, context="document") + file_validator.validate() + except ValidationError as exc: + raise DocumentException(detail=exc.messages[0]) + extension = file_validator.extension + else: + extension = (extension or Path(doc_url).suffix.replace(".", "")).lower() + if extension not in settings.ALLOWED_DOCUMENT_TYPES: + raise DocumentException("The file provided is not in the supported extensions list") try: request_user = self.request.user diff --git a/geonode/documents/enumerations.py b/geonode/documents/enumerations.py index d462770f5ab..ddf33c858de 100644 --- a/geonode/documents/enumerations.py +++ b/geonode/documents/enumerations.py @@ -124,3 +124,33 @@ "flv": "video/flv", "vdo": "video/vdo", } + +DOCUMENT_MAGIC_MIMETYPE_MAP = {extension: {mime_type} for extension, mime_type in DOCUMENT_MIMETYPE_MAP.items()} + +DOCUMENT_MAGIC_MIMETYPE_MAP.update( + { + "log": {"text/plain", "text/csv"}, + "xml": {"application/xml", "text/xml"}, + "sld": {"text/plain", "application/xml", "text/xml"}, + "qml": {"text/plain", "application/xml", "text/xml"}, + "doc": {"application/msword", "application/x-ole-storage"}, + "ods": {"text/plain", "application/vnd.oasis.opendocument.spreadsheet"}, + "odt": {"text/plain", "application/vnd.oasis.opendocument.text"}, + "xls": {"application/excel", "application/vnd.ms-excel", "application/x-ole-storage"}, + "ppt": {"application/powerpoint", "application/vnd.ms-powerpoint", "application/x-ole-storage"}, + "docx": { + "application/msword", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + }, + "xlsx": { + "application/vnd.ms-excel", + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + }, + "pptx": { + "application/x-mspowerpoint", + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + }, + "odp": {"application/odp", "application/vnd.oasis.opendocument.presentation"}, + "csv": {"text/plain", "text/csv"}, + } +) diff --git a/geonode/documents/forms.py b/geonode/documents/forms.py index 62b27562638..16c0dd0d82a 100644 --- a/geonode/documents/forms.py +++ b/geonode/documents/forms.py @@ -17,12 +17,10 @@ # ######################################################################### -import os import json import logging from django import forms -from django.conf import settings from django.forms import HiddenInput from django.utils.translation import gettext_lazy as _ from django.template.defaultfilters import filesizeformat @@ -30,6 +28,7 @@ from geonode.documents.models import Document from geonode.upload.models import UploadSizeLimit from geonode.upload.api.exceptions import FileUploadLimitException +from geonode.upload.validators import FileValidator logger = logging.getLogger(__name__) @@ -137,8 +136,7 @@ def clean_doc_file(self): """ doc_file = self.cleaned_data.get("doc_file") - if doc_file and not os.path.splitext(doc_file.name)[1].lower()[1:] in settings.ALLOWED_DOCUMENT_TYPES: - logger.debug("This file type is not allowed") - raise forms.ValidationError(_("This file type is not allowed")) + if doc_file: + FileValidator(doc_file, context="document").validate() return doc_file diff --git a/geonode/documents/tests.py b/geonode/documents/tests.py index 9a12da25f18..992993e734e 100644 --- a/geonode/documents/tests.py +++ b/geonode/documents/tests.py @@ -338,6 +338,19 @@ def test_upload_document_form_size_limit(self): ) self.assertEqual(form.errors, {"doc_file": [expected_error]}) + @patch("geonode.upload.validators.FileValidator._detect_mime", return_value="application/x-dosexec") + def test_upload_document_form_rejects_file_with_mismatched_content_type(self, _mock_detect_mime): + form_data = { + "title": "GeoNode Map", + "permissions": '{"anonymous":"document_readonly","authenticated":"resourcebase_readwrite","users":[]}', + } + file_data = {"doc_file": SimpleUploadedFile("fake.pdf", b"MZ executable content", "application/pdf")} + + form = DocumentCreateForm(form_data, file_data) + + self.assertFalse(form.is_valid()) + self.assertIn("doc_file", form.errors) + def test_document_embed(self): """/documents/1 -> Test accessing the embed view of a document""" d = Document.objects.all().first() diff --git a/geonode/upload/validators.py b/geonode/upload/validators.py new file mode 100644 index 00000000000..54143e15ed5 --- /dev/null +++ b/geonode/upload/validators.py @@ -0,0 +1,138 @@ +######################################################################### +# +# Copyright (C) 2026 OSGeo +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +######################################################################### + +from pathlib import Path +import magic + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.template.defaultfilters import filesizeformat +from django.utils.translation import gettext_lazy as _ + +from geonode.documents.enumerations import DOCUMENT_MAGIC_MIMETYPE_MAP +from geonode.upload.api.exceptions import FileUploadLimitException +from geonode.upload.models import UploadSizeLimit + + +DOCUMENT_UPLOAD_SIZE_SLUG = "document_upload_size" +DEFAULT_SAMPLE_SIZE = 4096 + + +class FileValidator: + def __init__(self, file, context="document"): + self.file = file + self.context = context + self.extension = None + self.detected_mime = None + + def validate(self): + self.validate_extension() + self.validate_size() + self.validate_magic_mime() + return True + + def validate_extension(self): + extension = self._get_extension() + if self.context == "document" and extension not in settings.ALLOWED_DOCUMENT_TYPES: + raise ValidationError(_("The file provided is not in the supported extensions list")) + self.extension = extension + + def validate_size(self): + file_size = self._get_file_size() + if file_size is None: + return + + max_size = self._get_max_size() + if file_size > max_size: + raise FileUploadLimitException( + _(f"File size size exceeds {filesizeformat(max_size)}. Please try again with a smaller file.") + ) + + def validate_magic_mime(self): + sample = self._read_sample() + if not sample: + raise ValidationError(_("The uploaded file is empty or could not be read.")) + + detected_mime = self._detect_mime(sample) + self.detected_mime = detected_mime + if not detected_mime: + raise ValidationError(_("The uploaded file type could not be detected.")) + + expected_mimes = DOCUMENT_MAGIC_MIMETYPE_MAP.get(self.extension, set()) + + if not expected_mimes: + raise ValidationError(_(f"File type .{self.extension} cannot be verified by MIME detection.")) + + if detected_mime not in expected_mimes: + raise ValidationError( + _("The uploaded file content does not match the expected file type " f"for .{self.extension} files.") + ) + + def _get_extension(self): + filename = self.file if isinstance(self.file, str) else getattr(self.file, "name", "") + extension = Path(filename).suffix.replace(".", "").lower() + if not extension: + raise ValidationError(_("The uploaded file has no extension.")) + return extension + + def _get_file_size(self): + if isinstance(self.file, str): + try: + return Path(self.file).stat().st_size + except OSError: + return None + return getattr(self.file, "size", None) + + def _get_max_size(self): + slug = DOCUMENT_UPLOAD_SIZE_SLUG if self.context == "document" else f"{self.context}_upload_size" + try: + max_size_db_obj = UploadSizeLimit.objects.get(slug=slug) + except UploadSizeLimit.DoesNotExist: + max_size_db_obj = UploadSizeLimit.objects.create_default_limit_with_slug(slug=slug) + return max_size_db_obj.max_size + + def _read_sample(self): + if isinstance(self.file, str): + try: + with open(self.file, "rb") as file_pointer: + return file_pointer.read(DEFAULT_SAMPLE_SIZE) + except OSError: + raise ValidationError(_("The uploaded file could not be read.")) + + position = None + try: + position = self.file.tell() + except (AttributeError, OSError): + pass + + sample = self.file.read(DEFAULT_SAMPLE_SIZE) + + if position is not None: + try: + self.file.seek(position) + except (AttributeError, OSError): + pass + + return sample + + def _detect_mime(self, sample): + try: + return magic.from_buffer(sample, mime=True) + except Exception: + raise ValidationError(_("File type validation could not inspect the uploaded file.")) diff --git a/pyproject.toml b/pyproject.toml index 9fb297a4990..ef5ce9f968f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -161,6 +161,8 @@ dependencies = [ "jwcrypto>=1.5.7", # dependency for XLSX handler "python-calamine==0.6.2", + # dependency for file validation + "python-magic==0.4.27", ] [project.optional-dependencies] From bd17bffef2aa500998e4b01dd8da4f2b4a736d51 Mon Sep 17 00:00:00 2001 From: sijandh35 Date: Tue, 21 Apr 2026 07:42:26 +0000 Subject: [PATCH 2/2] [Fixes #14153] update gemni reviews --- geonode/documents/api/views.py | 3 ++- geonode/upload/validators.py | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/geonode/documents/api/views.py b/geonode/documents/api/views.py index ba51bcb4a77..dd3bc4339a0 100644 --- a/geonode/documents/api/views.py +++ b/geonode/documents/api/views.py @@ -20,6 +20,7 @@ import logging from pathlib import Path from uuid import uuid4 +from urllib.parse import urlparse from django.conf import settings from django.core.exceptions import ValidationError @@ -112,7 +113,7 @@ def perform_create(self, serializer): raise DocumentException(detail=exc.messages[0]) extension = file_validator.extension else: - extension = (extension or Path(doc_url).suffix.replace(".", "")).lower() + extension = (extension or Path(urlparse(doc_url).path).suffix.replace(".", "")).lower() if extension not in settings.ALLOWED_DOCUMENT_TYPES: raise DocumentException("The file provided is not in the supported extensions list") diff --git a/geonode/upload/validators.py b/geonode/upload/validators.py index 54143e15ed5..2255e644196 100644 --- a/geonode/upload/validators.py +++ b/geonode/upload/validators.py @@ -102,10 +102,9 @@ def _get_file_size(self): def _get_max_size(self): slug = DOCUMENT_UPLOAD_SIZE_SLUG if self.context == "document" else f"{self.context}_upload_size" try: - max_size_db_obj = UploadSizeLimit.objects.get(slug=slug) + return UploadSizeLimit.objects.get(slug=slug).max_size except UploadSizeLimit.DoesNotExist: - max_size_db_obj = UploadSizeLimit.objects.create_default_limit_with_slug(slug=slug) - return max_size_db_obj.max_size + return settings.DEFAULT_MAX_UPLOAD_SIZE def _read_sample(self): if isinstance(self.file, str):