From 6c006a2e40fa9f4e6a2e56823867e36435191d03 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Fri, 16 Jan 2026 11:19:04 -0600 Subject: [PATCH 01/11] Add PDF restriction methods Assisted-by: Codex --- src/pdfrest/client.py | 217 ++++++++++++++++++++++++++++++++ src/pdfrest/models/_internal.py | 97 ++++++++++++++ src/pdfrest/types/__init__.py | 4 + src/pdfrest/types/public.py | 17 +++ 4 files changed, 335 insertions(+) diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index 0035227c..ce0b47a8 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -92,12 +92,14 @@ PdfRedactionApplyPayload, PdfRedactionPreviewPayload, PdfRestRawFileResponse, + PdfRestrictPayload, PdfSplitPayload, PdfToExcelPayload, PdfToPdfaPayload, PdfToPdfxPayload, PdfToPowerpointPayload, PdfToWordPayload, + PdfUnrestrictPayload, PdfXfaToAcroformsPayload, PngPdfRestPayload, SummarizePdfTextPayload, @@ -120,6 +122,7 @@ PdfMergeInput, PdfPageSelection, PdfRedactionInstruction, + PdfRestriction, PdfRGBColor, PdfXType, PngColorModel, @@ -2684,6 +2687,113 @@ def flatten_pdf_forms( timeout=timeout, ) + def add_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Add a permissions password and optional restrictions to a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def change_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Rotate the permissions password and optionally update restrictions.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def remove_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Remove permissions restrictions from a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + } + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/unrestricted-pdf", + payload=payload, + payload_model=PdfUnrestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + def compress_pdf( self, file: PdfRestFile | Sequence[PdfRestFile], @@ -3721,6 +3831,113 @@ async def flatten_pdf_forms( timeout=timeout, ) + async def add_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously add a permissions password and optional restrictions to a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def change_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously rotate the permissions password and optionally update restrictions.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def remove_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously remove permissions restrictions from a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + } + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/unrestricted-pdf", + payload=payload, + payload_model=PdfUnrestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + async def compress_pdf( self, file: PdfRestFile | Sequence[PdfRestFile], diff --git a/src/pdfrest/models/_internal.py b/src/pdfrest/models/_internal.py index 1c654f0d..9c916e1c 100644 --- a/src/pdfrest/models/_internal.py +++ b/src/pdfrest/models/_internal.py @@ -26,6 +26,7 @@ OcrLanguage, PdfAType, PdfInfoQuery, + PdfRestriction, PdfXType, SummaryFormat, SummaryOutputFormat, @@ -1142,6 +1143,102 @@ class PdfFlattenAnnotationsPayload(BaseModel): ] = None +class PdfRestrictPayload(BaseModel): + """Adapt caller options into a pdfRest-ready restrict-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + new_permissions_password: Annotated[ + str, + Field( + serialization_alias="new_permissions_password", + min_length=6, + max_length=128, + ), + ] + restrictions: Annotated[ + list[PdfRestriction] | None, + Field(serialization_alias="restrictions", min_length=1, default=None), + BeforeValidator(_ensure_list), + ] = None + current_permissions_password: Annotated[ + str | None, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + current_open_password: Annotated[ + str | None, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + +class PdfUnrestrictPayload(BaseModel): + """Adapt caller options into a pdfRest-ready unrestrict-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + current_permissions_password: Annotated[ + str, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + ), + ] + current_open_password: Annotated[ + str | None, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + class BmpPdfRestPayload(BasePdfRestGraphicPayload[Literal["rgb", "gray"]]): """Adapt caller options into a pdfRest-ready BMP request payload.""" diff --git a/src/pdfrest/types/__init__.py b/src/pdfrest/types/__init__.py index 48f78b03..0d139641 100644 --- a/src/pdfrest/types/__init__.py +++ b/src/pdfrest/types/__init__.py @@ -3,6 +3,7 @@ from .public import ( ALL_OCR_LANGUAGES, ALL_PDF_INFO_QUERIES, + ALL_PDF_RESTRICTIONS, BmpColorModel, CompressionLevel, ExtractTextGranularity, @@ -19,6 +20,7 @@ PdfRedactionInstruction, PdfRedactionPreset, PdfRedactionType, + PdfRestriction, PdfRGBColor, PdfXType, PngColorModel, @@ -32,6 +34,7 @@ __all__ = [ "ALL_OCR_LANGUAGES", "ALL_PDF_INFO_QUERIES", + "ALL_PDF_RESTRICTIONS", "BmpColorModel", "CompressionLevel", "ExtractTextGranularity", @@ -49,6 +52,7 @@ "PdfRedactionInstruction", "PdfRedactionPreset", "PdfRedactionType", + "PdfRestriction", "PdfXType", "PngColorModel", "SummaryFormat", diff --git a/src/pdfrest/types/public.py b/src/pdfrest/types/public.py index 6472f2e7..020f012f 100644 --- a/src/pdfrest/types/public.py +++ b/src/pdfrest/types/public.py @@ -15,6 +15,7 @@ __all__ = ( "ALL_OCR_LANGUAGES", "ALL_PDF_INFO_QUERIES", + "ALL_PDF_RESTRICTIONS", "BmpColorModel", "CompressionLevel", "ExtractTextGranularity", @@ -32,6 +33,7 @@ "PdfRedactionInstruction", "PdfRedactionPreset", "PdfRedactionType", + "PdfRestriction", "PdfXType", "PngColorModel", "SummaryFormat", @@ -160,3 +162,18 @@ class PdfMergeSource(TypedDict, total=False): ALL_OCR_LANGUAGES: tuple[OcrLanguage, ...] = cast( tuple[OcrLanguage, ...], get_args(OcrLanguage) ) + +PdfRestriction = Literal[ + "print_low", + "print_high", + "edit_document_assembly", + "edit_fill_and_sign_form_fields", + "edit_annotations", + "edit_content", + "copy_content", + "accessibility_off", +] + +ALL_PDF_RESTRICTIONS: tuple[PdfRestriction, ...] = cast( + tuple[PdfRestriction, ...], get_args(PdfRestriction) +) From 8b4339228c0a7a156602873ad3147246d8ded110 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Fri, 16 Jan 2026 11:37:42 -0600 Subject: [PATCH 02/11] Add Restrict/Unrestrict PDF tests Assisted-by: Codex --- tests/live/test_live_permissions_password.py | 269 +++++++ tests/test_permissions_password.py | 695 +++++++++++++++++++ 2 files changed, 964 insertions(+) create mode 100644 tests/live/test_live_permissions_password.py create mode 100644 tests/test_permissions_password.py diff --git a/tests/live/test_live_permissions_password.py b/tests/live/test_live_permissions_password.py new file mode 100644 index 00000000..c99c0ad4 --- /dev/null +++ b/tests/live/test_live_permissions_password.py @@ -0,0 +1,269 @@ +from __future__ import annotations + +from typing import cast, get_args +from uuid import uuid4 + +import pytest + +from pdfrest import AsyncPdfRestClient, PdfRestApiError, PdfRestClient +from pdfrest.models import PdfRestFile +from pdfrest.types import PdfRestriction + +from ..resources import get_test_resource_path + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +PDF_RESTRICTIONS: tuple[PdfRestriction, ...] = cast( + tuple[PdfRestriction, ...], get_args(PdfRestriction) +) +RESTRICTION_PARAMS = [ + pytest.param(restriction, id=restriction) for restriction in PDF_RESTRICTIONS +] + + +@pytest.fixture(scope="module") +def uploaded_pdf_for_permissions( + pdfrest_api_key: str, + pdfrest_live_base_url: str, +) -> PdfRestFile: + resource = get_test_resource_path("report.pdf") + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + return client.files.create_from_paths([resource])[0] + + +@pytest.mark.parametrize("restriction", RESTRICTION_PARAMS) +def test_live_add_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, + restriction: PdfRestriction, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + new_password = make_password("live-perm") + response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=new_password, + restrictions=[restriction], + output="live-restrict", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith("live-restrict") + assert str(response.input_id) == str(uploaded_pdf_for_permissions.id) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("restriction", RESTRICTION_PARAMS) +async def test_live_async_add_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, + restriction: PdfRestriction, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + new_password = make_password("async-live-perm") + response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=new_password, + restrictions=[restriction], + output="async-restrict", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith("async-restrict") + assert str(response.input_id) == str(uploaded_pdf_for_permissions.id) + + +def test_live_change_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("old-live") + new_password = make_password("new-live") + restricted_response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["print_low"], + output="live-restrict-old", + ) + restricted_file = restricted_response.output_file + response = client.change_permissions_password( + restricted_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + restrictions=["copy_content"], + output="live-restrict-new", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.name.startswith("live-restrict-new") + assert output_file.type == "application/pdf" + assert str(response.input_id) == str(restricted_file.id) + + +@pytest.mark.asyncio +async def test_live_async_change_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-old") + new_password = make_password("async-new") + restricted_response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["edit_content"], + output="async-live-restrict-old", + ) + restricted_file = restricted_response.output_file + response = await client.change_permissions_password( + restricted_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + restrictions=["edit_annotations"], + output="async-live-restrict-new", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.name.startswith("async-live-restrict-new") + assert output_file.type == "application/pdf" + assert str(response.input_id) == str(restricted_file.id) + + +def test_live_remove_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("remove-live") + restricted_response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["print_high"], + output="live-to-remove", + ) + restricted_file = restricted_response.output_file + response = client.remove_permissions_password( + restricted_file, + current_permissions_password=current_password, + output="live-removed", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.name.startswith("live-removed") + assert output_file.type == "application/pdf" + assert str(response.input_id) == str(restricted_file.id) + + +@pytest.mark.asyncio +async def test_live_async_remove_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-remove") + restricted_response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["accessibility_off"], + output="async-live-to-remove", + ) + restricted_file = restricted_response.output_file + response = await client.remove_permissions_password( + restricted_file, + current_permissions_password=current_password, + output="async-live-removed", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.name.startswith("async-live-removed") + assert output_file.type == "application/pdf" + assert str(response.input_id) == str(restricted_file.id) + + +def test_live_remove_permissions_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("live-wrong") + wrong_password = make_password("incorrect") + restricted_response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=correct_password, + restrictions=["edit_annotations"], + output="live-invalid-remove", + ) + restricted_file = restricted_response.output_file + with pytest.raises(PdfRestApiError, match="permissions password"): + client.remove_permissions_password( + restricted_file, + current_permissions_password=wrong_password, + ) + + +@pytest.mark.asyncio +async def test_live_async_remove_permissions_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("async-live-wrong") + wrong_password = make_password("async-wrong") + restricted_response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=correct_password, + restrictions=["edit_content"], + output="async-live-invalid-remove", + ) + restricted_file = restricted_response.output_file + with pytest.raises(PdfRestApiError, match="permissions password"): + await client.remove_permissions_password( + restricted_file, + current_permissions_password=wrong_password, + ) diff --git a/tests/test_permissions_password.py b/tests/test_permissions_password.py new file mode 100644 index 00000000..866dd416 --- /dev/null +++ b/tests/test_permissions_password.py @@ -0,0 +1,695 @@ +from __future__ import annotations + +import json +from uuid import uuid4 + +import httpx +import pytest +from pydantic import ValidationError + +from pdfrest import AsyncPdfRestClient, PdfRestClient +from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse, PdfRestFileID +from pdfrest.models._internal import PdfRestrictPayload, PdfUnrestrictPayload +from pdfrest.types import PdfRestriction + +from .graphics_test_helpers import ( + ASYNC_API_KEY, + VALID_API_KEY, + build_file_info_payload, + make_pdf_file, +) + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +def make_non_pdf_file(file_id: str) -> PdfRestFile: + return PdfRestFile.model_validate( + build_file_info_payload( + file_id, + "example.png", + "image/png", + ) + ) + + +@pytest.mark.parametrize( + "restrictions", + [ + pytest.param(None, id="none"), + pytest.param(["print_low"], id="single"), + pytest.param(["print_low", "copy_content"], id="multiple"), + ], +) +def test_add_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, restrictions: list[PdfRestriction] | None +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + new_password = make_password("secure") + open_password = make_password("open") + payload_input: dict[str, object] = { + "files": [input_file], + "new_permissions_password": new_password, + "current_open_password": open_password, + "output": "restricted", + } + if restrictions is not None: + payload_input["restrictions"] = restrictions + payload_dump = PdfRestrictPayload.model_validate(payload_input).model_dump( + mode="json", by_alias=True, exclude_none=True, exclude_unset=True + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_permissions_password( + input_file, + new_permissions_password=new_password, + restrictions=restrictions, + current_open_password=open_password, + output="restricted", + ) + + assert seen == {"post": 1, "get": 1} + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "restricted.pdf" + assert response.output_file.type == "application/pdf" + assert str(response.input_id) == str(input_file.id) + + +def test_add_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("custom") + payload_dump = PdfRestrictPayload.model_validate( + { + "files": [input_file], + "new_permissions_password": new_password, + "restrictions": ["print_high"], + "output": "custom-restricted", + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "keep_open": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "custom-restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_permissions_password( + input_file, + new_permissions_password=new_password, + restrictions=["print_high"], + output="custom-restricted", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"keep_open": "yes"}, + timeout=0.55, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "custom-restricted.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.55) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.55) + + +def test_change_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("old") + new_password = make_password("new") + payload_dump = PdfRestrictPayload.model_validate( + { + "files": [input_file], + "current_permissions_password": current_password, + "new_permissions_password": new_password, + "restrictions": ["edit_content"], + "output": "rotated", + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "rotated.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.change_permissions_password( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + restrictions=["edit_content"], + output="rotated", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"diagnostics": "on"}, + timeout=0.77, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "rotated.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.77) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.77) + + +def test_remove_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("old") + payload_dump = PdfUnrestrictPayload.model_validate( + { + "files": [input_file], + "current_permissions_password": current_password, + "output": "clean", + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "clean.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + output="clean", + ) + + assert seen == {"post": 1, "get": 1} + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "clean.pdf" + assert response.output_file.type == "application/pdf" + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "restrictions", + [ + pytest.param(None, id="none"), + pytest.param(["print_low"], id="single"), + pytest.param(["print_low", "copy_content"], id="multiple"), + ], +) +async def test_async_add_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, restrictions: list[PdfRestriction] | None +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + new_password = make_password("secure") + payload_input: dict[str, object] = { + "files": [input_file], + "new_permissions_password": new_password, + "output": "restricted", + } + if restrictions is not None: + payload_input["restrictions"] = restrictions + payload_dump = PdfRestrictPayload.model_validate(payload_input).model_dump( + mode="json", by_alias=True, exclude_none=True, exclude_unset=True + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.add_permissions_password( + input_file, + new_permissions_password=new_password, + restrictions=restrictions, + output="restricted", + ) + + assert seen == {"post": 1, "get": 1} + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "restricted.pdf" + assert response.output_file.type == "application/pdf" + assert str(response.input_id) == str(input_file.id) + + +@pytest.mark.asyncio +async def test_async_add_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("async") + payload_dump = PdfRestrictPayload.model_validate( + { + "files": [input_file], + "new_permissions_password": new_password, + "restrictions": ["print_high"], + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "keep_open": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.add_permissions_password( + input_file, + new_permissions_password=new_password, + restrictions=["print_high"], + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"keep_open": "yes"}, + timeout=0.66, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-restricted.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.66) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.66) + + +@pytest.mark.asyncio +async def test_async_change_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("old-async") + new_password = make_password("new-async") + payload_dump = PdfRestrictPayload.model_validate( + { + "files": [input_file], + "current_permissions_password": current_password, + "new_permissions_password": new_password, + "restrictions": ["edit_content"], + "output": "async-rotated", + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-rotated.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.change_permissions_password( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + restrictions=["edit_content"], + output="async-rotated", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"diagnostics": "on"}, + timeout=0.88, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-rotated.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.88) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.88) + + +@pytest.mark.asyncio +async def test_async_remove_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("secret") + payload_dump = PdfUnrestrictPayload.model_validate( + { + "files": [input_file], + "current_permissions_password": current_password, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-clean.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-clean.pdf" + + +@pytest.mark.asyncio +async def test_async_permissions_password_validation( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure") + another_password = make_password("another") + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + with pytest.raises(ValidationError, match="Must be a PDF file"): + await client.add_permissions_password( + non_pdf_file, + new_permissions_password=secure_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.add_permissions_password( + [pdf_file, other_pdf], + new_permissions_password=secure_password, + ) + + with pytest.raises( + ValidationError, match="String should have at least 6 characters" + ): + await client.add_permissions_password( + pdf_file, + new_permissions_password="a" * 5, + ) + + with pytest.raises( + ValidationError, match="List should have at least 1 item after validation" + ): + await client.add_permissions_password( + pdf_file, + new_permissions_password=secure_password, + restrictions=[], + ) + + with pytest.raises( + ValidationError, match="String should have at least 1 character" + ): + await client.remove_permissions_password( + pdf_file, + current_permissions_password=empty_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.remove_permissions_password( + [pdf_file, other_pdf], + current_permissions_password=another_password, + ) + + +def test_permissions_password_validation(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure") + another_password = make_password("another") + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="Must be a PDF file"), + ): + client.add_permissions_password( + non_pdf_file, + new_permissions_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.add_permissions_password( + [pdf_file, other_pdf], + new_permissions_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises( + ValidationError, match="String should have at least 6 characters" + ), + ): + client.add_permissions_password( + pdf_file, + new_permissions_password="a" * 5, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises( + ValidationError, match="List should have at least 1 item after validation" + ), + ): + client.add_permissions_password( + pdf_file, + new_permissions_password=secure_password, + restrictions=[], + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="String should have at least 1 character"), + ): + client.remove_permissions_password( + pdf_file, + current_permissions_password=empty_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.remove_permissions_password( + [pdf_file, other_pdf], + current_permissions_password=another_password, + ) From efd3ede31431d0be77ce3d17cd297aa49da80ecc Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Fri, 16 Jan 2026 14:52:57 -0600 Subject: [PATCH 03/11] Add PDF encryption methods Assisted-by: Codex --- src/pdfrest/client.py | 204 ++++++++++++++++++++++++++++++++ src/pdfrest/models/_internal.py | 91 ++++++++++++++ 2 files changed, 295 insertions(+) diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index ce0b47a8..10d85479 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -82,6 +82,8 @@ JpegPdfRestPayload, OcrPdfPayload, PdfCompressPayload, + PdfDecryptPayload, + PdfEncryptPayload, PdfFlattenAnnotationsPayload, PdfFlattenFormsPayload, PdfFlattenTransparenciesPayload, @@ -2761,6 +2763,107 @@ def change_permissions_password( timeout=timeout, ) + def add_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Encrypt a PDF with a new open password.""" + + payload: dict[str, Any] = { + "files": file, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def change_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Rotate the open password for an encrypted PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def remove_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Decrypt a PDF by removing its open password.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/decrypted-pdf", + payload=payload, + payload_model=PdfDecryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + def remove_permissions_password( self, file: PdfRestFile | Sequence[PdfRestFile], @@ -3938,6 +4041,107 @@ async def remove_permissions_password( timeout=timeout, ) + async def add_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously encrypt a PDF with a new open password.""" + + payload: dict[str, Any] = { + "files": file, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def change_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously rotate the open password for an encrypted PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def remove_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously decrypt a PDF by removing its open password.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/decrypted-pdf", + payload=payload, + payload_model=PdfDecryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + async def compress_pdf( self, file: PdfRestFile | Sequence[PdfRestFile], diff --git a/src/pdfrest/models/_internal.py b/src/pdfrest/models/_internal.py index 9c916e1c..01202eda 100644 --- a/src/pdfrest/models/_internal.py +++ b/src/pdfrest/models/_internal.py @@ -1239,6 +1239,97 @@ class PdfUnrestrictPayload(BaseModel): ] = None +class PdfEncryptPayload(BaseModel): + """Adapt caller options into a pdfRest-ready encrypt-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + new_open_password: Annotated[ + str, + Field( + serialization_alias="new_open_password", + min_length=6, + max_length=128, + ), + ] + current_open_password: Annotated[ + str | None, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + current_permissions_password: Annotated[ + str | None, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + +class PdfDecryptPayload(BaseModel): + """Adapt caller options into a pdfRest-ready decrypt-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + current_open_password: Annotated[ + str, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + ), + ] + current_permissions_password: Annotated[ + str | None, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + class BmpPdfRestPayload(BasePdfRestGraphicPayload[Literal["rgb", "gray"]]): """Adapt caller options into a pdfRest-ready BMP request payload.""" From d94aa967cdcf097e40af73ab592d6b62d6688e29 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Fri, 16 Jan 2026 15:39:16 -0600 Subject: [PATCH 04/11] Add Encrypt/Decrypt PDF tests Assisted-by: Codex --- tests/live/test_live_encrypt_pdf.py | 247 +++++++++++++++++ tests/test_encrypt_pdf.py | 406 ++++++++++++++++++++++++++++ 2 files changed, 653 insertions(+) create mode 100644 tests/live/test_live_encrypt_pdf.py create mode 100644 tests/test_encrypt_pdf.py diff --git a/tests/live/test_live_encrypt_pdf.py b/tests/live/test_live_encrypt_pdf.py new file mode 100644 index 00000000..535892f8 --- /dev/null +++ b/tests/live/test_live_encrypt_pdf.py @@ -0,0 +1,247 @@ +from __future__ import annotations + +from typing import cast, get_args +from uuid import uuid4 + +import pytest + +from pdfrest import AsyncPdfRestClient, PdfRestApiError, PdfRestClient +from pdfrest.models import PdfRestFile +from pdfrest.types import PdfRestriction + +from ..resources import get_test_resource_path + +PDF_RESTRICTIONS: tuple[PdfRestriction, ...] = cast( + tuple[PdfRestriction, ...], get_args(PdfRestriction) +) + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +@pytest.fixture(scope="module") +def uploaded_pdf_for_encrypt( + pdfrest_api_key: str, + pdfrest_live_base_url: str, +) -> PdfRestFile: + resource = get_test_resource_path("report.pdf") + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + return client.files.create_from_paths([resource])[0] + + +def test_live_add_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + response = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=make_password("live-open"), + output="live-encrypted", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith("live-encrypted") + assert str(response.input_id) == str(uploaded_pdf_for_encrypt.id) + + +@pytest.mark.asyncio +async def test_live_async_add_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + response = await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=make_password("async-live-open"), + output="async-live-encrypted", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith("async-live-encrypted") + assert str(response.input_id) == str(uploaded_pdf_for_encrypt.id) + + +def test_live_change_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("live-open-current") + restricted = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="live-open-old", + ).output_file + response = client.change_open_password( + restricted, + current_open_password=current_password, + new_open_password=make_password("live-open-new"), + output="live-open-new", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.name.startswith("live-open-new") + assert output_file.type == "application/pdf" + assert str(response.input_id) == str(restricted.id) + + +@pytest.mark.asyncio +async def test_live_async_change_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-live-open-current") + restricted = ( + await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="async-live-open-old", + ) + ).output_file + response = await client.change_open_password( + restricted, + current_open_password=current_password, + new_open_password=make_password("async-live-open-new"), + output="async-live-open-new", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.name.startswith("async-live-open-new") + assert output_file.type == "application/pdf" + assert str(response.input_id) == str(restricted.id) + + +def test_live_remove_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("live-open-remove") + restricted = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="live-open-to-remove", + ).output_file + response = client.remove_open_password( + restricted, + current_open_password=current_password, + output="live-open-removed", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith("live-open-removed") + assert str(response.input_id) == str(restricted.id) + + +@pytest.mark.asyncio +async def test_live_async_remove_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-live-open-remove") + restricted = ( + await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="async-live-open-to-remove", + ) + ).output_file + response = await client.remove_open_password( + restricted, + current_open_password=current_password, + output="async-live-open-removed", + ) + + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith("async-live-open-removed") + assert str(response.input_id) == str(restricted.id) + + +def test_live_remove_open_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("live-open-correct") + wrong_password = make_password("live-open-wrong") + restricted = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=correct_password, + output="live-open-invalid", + ).output_file + with pytest.raises(PdfRestApiError, match="open password"): + client.remove_open_password( + restricted, + current_open_password=wrong_password, + ) + + +@pytest.mark.asyncio +async def test_live_async_remove_open_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("async-live-open-correct") + wrong_password = make_password("async-live-open-wrong") + restricted = ( + await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=correct_password, + output="async-live-open-invalid", + ) + ).output_file + with pytest.raises(PdfRestApiError, match="open password"): + await client.remove_open_password( + restricted, + current_open_password=wrong_password, + ) diff --git a/tests/test_encrypt_pdf.py b/tests/test_encrypt_pdf.py new file mode 100644 index 00000000..8bfbcf1c --- /dev/null +++ b/tests/test_encrypt_pdf.py @@ -0,0 +1,406 @@ +from __future__ import annotations + +import json +from uuid import uuid4 + +import httpx +import pytest +from pydantic import ValidationError + +from pdfrest import AsyncPdfRestClient, PdfRestClient +from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse, PdfRestFileID +from pdfrest.models._internal import PdfDecryptPayload, PdfEncryptPayload + +from .graphics_test_helpers import ( + ASYNC_API_KEY, + VALID_API_KEY, + build_file_info_payload, + make_pdf_file, +) + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +def make_non_pdf_file(file_id: str) -> PdfRestFile: + return PdfRestFile.model_validate( + build_file_info_payload( + file_id, + "example.png", + "image/png", + ) + ) + + +@pytest.mark.parametrize( + "permissions_password", + [ + pytest.param(None, id="no-permissions"), + pytest.param(make_password("perm"), id="with-permissions"), + ], +) +def test_add_open_password_success( + monkeypatch: pytest.MonkeyPatch, permissions_password: str | None +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + new_password = make_password("open") + payload_input: dict[str, object] = { + "files": [input_file], + "new_open_password": new_password, + "output": "encrypted", + } + if permissions_password is not None: + payload_input["current_permissions_password"] = permissions_password + payload_dump = PdfEncryptPayload.model_validate(payload_input).model_dump( + mode="json", by_alias=True, exclude_none=True, exclude_unset=True + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "encrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_open_password( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted", + ) + + assert seen == {"post": 1, "get": 1} + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "encrypted.pdf" + assert response.output_file.type == "application/pdf" + assert str(response.input_id) == str(input_file.id) + + +def test_change_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("old-open") + new_password = make_password("new-open") + payload_dump = PdfEncryptPayload.model_validate( + { + "files": [input_file], + "current_open_password": current_password, + "new_open_password": new_password, + "current_permissions_password": make_password("perm"), + "output": "rotated-open", + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "rotated-open.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.change_open_password( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=payload_dump.get( + "current_permissions_password" + ), + output="rotated-open", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"diagnostics": "on"}, + timeout=0.77, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "rotated-open.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.77) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.77) + + +def test_remove_open_password_success(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("open-current") + payload_dump = PdfDecryptPayload.model_validate( + { + "files": [input_file], + "current_open_password": current_password, + "output": "decrypted", + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "decrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_open_password( + input_file, + current_open_password=current_password, + output="decrypted", + ) + + assert seen == {"post": 1, "get": 1} + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "decrypted.pdf" + assert response.output_file.type == "application/pdf" + + +@pytest.mark.asyncio +async def test_async_add_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("async-open") + payload_dump = PdfEncryptPayload.model_validate( + { + "files": [input_file], + "new_open_password": new_password, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "keep_permissions": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-encrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.add_open_password( + input_file, + new_open_password=new_password, + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"keep_permissions": "yes"}, + timeout=0.66, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-encrypted.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.66) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.66) + + +@pytest.mark.asyncio +async def test_async_remove_open_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("async-open-current") + payload_dump = PdfDecryptPayload.model_validate( + { + "files": [input_file], + "current_open_password": current_password, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-decrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_open_password( + input_file, + current_open_password=current_password, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-decrypted.pdf" + + +def test_encrypt_decrypt_validation(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure") + short_password = "a" * 3 + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="Must be a PDF file"), + ): + client.add_open_password( + non_pdf_file, + new_open_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.add_open_password( + [pdf_file, other_pdf], + new_open_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises( + ValidationError, match="String should have at least 6 characters" + ), + ): + client.add_open_password( + pdf_file, + new_open_password=short_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="String should have at least 1 character"), + ): + client.change_open_password( + pdf_file, + current_open_password=empty_password, + new_open_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.remove_open_password( + [pdf_file, other_pdf], + current_open_password=secure_password, + ) From c727f3e9c2b5fdaaee6e963158af781037c63cd8 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Fri, 6 Feb 2026 12:40:43 -0600 Subject: [PATCH 05/11] Add missing encryption and restriction test coverage Assisted-by: Codex --- tests/live/test_live_encrypt_pdf.py | 6 - tests/test_encrypt_pdf.py | 355 ++++++++++++++++++++++++---- tests/test_permissions_password.py | 299 +++++++++++++++++------ 3 files changed, 547 insertions(+), 113 deletions(-) diff --git a/tests/live/test_live_encrypt_pdf.py b/tests/live/test_live_encrypt_pdf.py index 535892f8..8b131921 100644 --- a/tests/live/test_live_encrypt_pdf.py +++ b/tests/live/test_live_encrypt_pdf.py @@ -1,20 +1,14 @@ from __future__ import annotations -from typing import cast, get_args from uuid import uuid4 import pytest from pdfrest import AsyncPdfRestClient, PdfRestApiError, PdfRestClient from pdfrest.models import PdfRestFile -from pdfrest.types import PdfRestriction from ..resources import get_test_resource_path -PDF_RESTRICTIONS: tuple[PdfRestriction, ...] = cast( - tuple[PdfRestriction, ...], get_args(PdfRestriction) -) - def make_password(label: str) -> str: return f"{label}-{uuid4().hex}" diff --git a/tests/test_encrypt_pdf.py b/tests/test_encrypt_pdf.py index 8bfbcf1c..a752408b 100644 --- a/tests/test_encrypt_pdf.py +++ b/tests/test_encrypt_pdf.py @@ -9,7 +9,6 @@ from pdfrest import AsyncPdfRestClient, PdfRestClient from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse, PdfRestFileID -from pdfrest.models._internal import PdfDecryptPayload, PdfEncryptPayload from .graphics_test_helpers import ( ASYNC_API_KEY, @@ -33,6 +32,45 @@ def make_non_pdf_file(file_id: str) -> PdfRestFile: ) +def build_encrypt_payload( + input_file: PdfRestFile, + *, + new_open_password: str, + current_open_password: str | None = None, + current_permissions_password: str | None = None, + output: str | None = None, +) -> dict[str, str]: + payload: dict[str, str] = { + "id": str(input_file.id), + "new_open_password": new_open_password, + } + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + return payload + + +def build_decrypt_payload( + input_file: PdfRestFile, + *, + current_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, +) -> dict[str, str]: + payload: dict[str, str] = { + "id": str(input_file.id), + "current_open_password": current_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + return payload + + @pytest.mark.parametrize( "permissions_password", [ @@ -47,15 +85,11 @@ def test_add_open_password_success( input_file = make_pdf_file(PdfRestFileID.generate(1)) output_id = str(PdfRestFileID.generate()) new_password = make_password("open") - payload_input: dict[str, object] = { - "files": [input_file], - "new_open_password": new_password, - "output": "encrypted", - } - if permissions_password is not None: - payload_input["current_permissions_password"] = permissions_password - payload_dump = PdfEncryptPayload.model_validate(payload_input).model_dump( - mode="json", by_alias=True, exclude_none=True, exclude_unset=True + payload_dump = build_encrypt_payload( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted", ) seen: dict[str, int] = {"post": 0, "get": 0} @@ -111,15 +145,14 @@ def test_change_open_password_request_customization( captured_timeout: dict[str, float | dict[str, float] | None] = {} current_password = make_password("old-open") new_password = make_password("new-open") - payload_dump = PdfEncryptPayload.model_validate( - { - "files": [input_file], - "current_open_password": current_password, - "new_open_password": new_password, - "current_permissions_password": make_password("perm"), - "output": "rotated-open", - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + permissions_password = make_password("perm") + payload_dump = build_encrypt_payload( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="rotated-open", + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/encrypted-pdf": @@ -156,9 +189,7 @@ def handler(request: httpx.Request) -> httpx.Response: input_file, current_open_password=current_password, new_open_password=new_password, - current_permissions_password=payload_dump.get( - "current_permissions_password" - ), + current_permissions_password=permissions_password, output="rotated-open", extra_query={"trace": "sync"}, extra_headers={"X-Debug": "sync"}, @@ -181,13 +212,11 @@ def test_remove_open_password_success(monkeypatch: pytest.MonkeyPatch) -> None: input_file = make_pdf_file(PdfRestFileID.generate(1)) output_id = str(PdfRestFileID.generate()) current_password = make_password("open-current") - payload_dump = PdfDecryptPayload.model_validate( - { - "files": [input_file], - "current_open_password": current_password, - "output": "decrypted", - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + output="decrypted", + ) seen: dict[str, int] = {"post": 0, "get": 0} @@ -231,6 +260,74 @@ def handler(request: httpx.Request) -> httpx.Response: assert response.output_file.type == "application/pdf" +def test_remove_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("open-custom") + permissions_password = make_password("perm-custom") + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="decrypted-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "decrypted-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_open_password( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="decrypted-custom", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"audit": "yes"}, + timeout=0.59, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "decrypted-custom.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.59) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.59) + + @pytest.mark.asyncio async def test_async_add_open_password_request_customization( monkeypatch: pytest.MonkeyPatch, @@ -240,12 +337,13 @@ async def test_async_add_open_password_request_customization( output_id = str(PdfRestFileID.generate()) captured_timeout: dict[str, float | dict[str, float] | None] = {} new_password = make_password("async-open") - payload_dump = PdfEncryptPayload.model_validate( - { - "files": [input_file], - "new_open_password": new_password, - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + permissions_password = make_password("async-perm") + payload_dump = build_encrypt_payload( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-encrypted", + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/encrypted-pdf": @@ -281,6 +379,8 @@ def handler(request: httpx.Request) -> httpx.Response: response = await client.add_open_password( input_file, new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-encrypted", extra_query={"trace": "async"}, extra_headers={"X-Debug": "async"}, extra_body={"keep_permissions": "yes"}, @@ -297,6 +397,62 @@ def handler(request: httpx.Request) -> httpx.Response: assert timeout_value == pytest.approx(0.66) +@pytest.mark.asyncio +async def test_async_change_open_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("async-open-current") + new_password = make_password("async-open-next") + permissions_password = make_password("async-open-perm") + payload_dump = build_encrypt_payload( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-rotated-open.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.change_open_password( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open", + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-rotated-open.pdf" + + @pytest.mark.asyncio async def test_async_remove_open_password_success( monkeypatch: pytest.MonkeyPatch, @@ -305,12 +461,10 @@ async def test_async_remove_open_password_success( input_file = make_pdf_file(PdfRestFileID.generate(2)) output_id = str(PdfRestFileID.generate()) current_password = make_password("async-open-current") - payload_dump = PdfDecryptPayload.model_validate( - { - "files": [input_file], - "current_open_password": current_password, - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/decrypted-pdf": @@ -347,6 +501,75 @@ def handler(request: httpx.Request) -> httpx.Response: assert response.output_file.name == "async-decrypted.pdf" +@pytest.mark.asyncio +async def test_async_remove_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("async-open-custom") + permissions_password = make_password("async-perm-custom") + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="async-decrypted-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-decrypted-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_open_password( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="async-decrypted-custom", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"audit": "yes"}, + timeout=0.63, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-decrypted-custom.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.63) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.63) + + def test_encrypt_decrypt_validation(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("PDFREST_API_KEY", raising=False) pdf_file = make_pdf_file(PdfRestFileID.generate(1)) @@ -404,3 +627,53 @@ def test_encrypt_decrypt_validation(monkeypatch: pytest.MonkeyPatch) -> None: [pdf_file, other_pdf], current_open_password=secure_password, ) + + +@pytest.mark.asyncio +async def test_async_encrypt_decrypt_validation( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure-async") + short_password = "a" * 3 + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + with pytest.raises(ValidationError, match="Must be a PDF file"): + await client.add_open_password( + non_pdf_file, + new_open_password=secure_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.add_open_password( + [pdf_file, other_pdf], + new_open_password=secure_password, + ) + + with pytest.raises( + ValidationError, match="String should have at least 6 characters" + ): + await client.add_open_password( + pdf_file, + new_open_password=short_password, + ) + + with pytest.raises( + ValidationError, match="String should have at least 1 character" + ): + await client.change_open_password( + pdf_file, + current_open_password=empty_password, + new_open_password=secure_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.remove_open_password( + [pdf_file, other_pdf], + current_open_password=secure_password, + ) diff --git a/tests/test_permissions_password.py b/tests/test_permissions_password.py index 866dd416..a45af37c 100644 --- a/tests/test_permissions_password.py +++ b/tests/test_permissions_password.py @@ -9,7 +9,6 @@ from pdfrest import AsyncPdfRestClient, PdfRestClient from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse, PdfRestFileID -from pdfrest.models._internal import PdfRestrictPayload, PdfUnrestrictPayload from pdfrest.types import PdfRestriction from .graphics_test_helpers import ( @@ -34,6 +33,48 @@ def make_non_pdf_file(file_id: str) -> PdfRestFile: ) +def build_restrict_payload( + input_file: PdfRestFile, + *, + new_permissions_password: str, + current_permissions_password: str | None = None, + current_open_password: str | None = None, + restrictions: list[PdfRestriction] | None = None, + output: str | None = None, +) -> dict[str, str | list[PdfRestriction]]: + payload: dict[str, str | list[PdfRestriction]] = { + "id": str(input_file.id), + "new_permissions_password": new_permissions_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if restrictions is not None: + payload["restrictions"] = restrictions + if output is not None: + payload["output"] = output + return payload + + +def build_unrestrict_payload( + input_file: PdfRestFile, + *, + current_permissions_password: str, + current_open_password: str | None = None, + output: str | None = None, +) -> dict[str, str]: + payload: dict[str, str] = { + "id": str(input_file.id), + "current_permissions_password": current_permissions_password, + } + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + return payload + + @pytest.mark.parametrize( "restrictions", [ @@ -50,16 +91,12 @@ def test_add_permissions_password_success( output_id = str(PdfRestFileID.generate()) new_password = make_password("secure") open_password = make_password("open") - payload_input: dict[str, object] = { - "files": [input_file], - "new_permissions_password": new_password, - "current_open_password": open_password, - "output": "restricted", - } - if restrictions is not None: - payload_input["restrictions"] = restrictions - payload_dump = PdfRestrictPayload.model_validate(payload_input).model_dump( - mode="json", by_alias=True, exclude_none=True, exclude_unset=True + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + current_open_password=open_password, + restrictions=restrictions, + output="restricted", ) seen: dict[str, int] = {"post": 0, "get": 0} @@ -115,14 +152,12 @@ def test_add_permissions_password_request_customization( output_id = str(PdfRestFileID.generate()) captured_timeout: dict[str, float | dict[str, float] | None] = {} new_password = make_password("custom") - payload_dump = PdfRestrictPayload.model_validate( - { - "files": [input_file], - "new_permissions_password": new_password, - "restrictions": ["print_high"], - "output": "custom-restricted", - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + restrictions=["print_high"], + output="custom-restricted", + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/restricted-pdf": @@ -185,15 +220,15 @@ def test_change_permissions_password_request_customization( captured_timeout: dict[str, float | dict[str, float] | None] = {} current_password = make_password("old") new_password = make_password("new") - payload_dump = PdfRestrictPayload.model_validate( - { - "files": [input_file], - "current_permissions_password": current_password, - "new_permissions_password": new_password, - "restrictions": ["edit_content"], - "output": "rotated", - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + current_open_password = make_password("open") + payload_dump = build_restrict_payload( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["edit_content"], + output="rotated", + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/restricted-pdf": @@ -230,6 +265,7 @@ def handler(request: httpx.Request) -> httpx.Response: input_file, current_permissions_password=current_password, new_permissions_password=new_password, + current_open_password=current_open_password, restrictions=["edit_content"], output="rotated", extra_query={"trace": "sync"}, @@ -255,13 +291,11 @@ def test_remove_permissions_password_success( input_file = make_pdf_file(PdfRestFileID.generate(1)) output_id = str(PdfRestFileID.generate()) current_password = make_password("old") - payload_dump = PdfUnrestrictPayload.model_validate( - { - "files": [input_file], - "current_permissions_password": current_password, - "output": "clean", - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + output="clean", + ) seen: dict[str, int] = {"post": 0, "get": 0} @@ -305,6 +339,74 @@ def handler(request: httpx.Request) -> httpx.Response: assert response.output_file.type == "application/pdf" +def test_remove_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("remove-custom") + open_password = make_password("open-custom") + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="clean-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "enabled"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "clean-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="clean-custom", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"audit": "enabled"}, + timeout=0.69, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "clean-custom.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.69) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.69) + + @pytest.mark.asyncio @pytest.mark.parametrize( "restrictions", @@ -321,15 +423,11 @@ async def test_async_add_permissions_password_success( input_file = make_pdf_file(PdfRestFileID.generate(1)) output_id = str(PdfRestFileID.generate()) new_password = make_password("secure") - payload_input: dict[str, object] = { - "files": [input_file], - "new_permissions_password": new_password, - "output": "restricted", - } - if restrictions is not None: - payload_input["restrictions"] = restrictions - payload_dump = PdfRestrictPayload.model_validate(payload_input).model_dump( - mode="json", by_alias=True, exclude_none=True, exclude_unset=True + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + restrictions=restrictions, + output="restricted", ) seen: dict[str, int] = {"post": 0, "get": 0} @@ -385,13 +483,13 @@ async def test_async_add_permissions_password_request_customization( output_id = str(PdfRestFileID.generate()) captured_timeout: dict[str, float | dict[str, float] | None] = {} new_password = make_password("async") - payload_dump = PdfRestrictPayload.model_validate( - { - "files": [input_file], - "new_permissions_password": new_password, - "restrictions": ["print_high"], - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + current_open_password = make_password("async-open") + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["print_high"], + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/restricted-pdf": @@ -427,6 +525,7 @@ def handler(request: httpx.Request) -> httpx.Response: response = await client.add_permissions_password( input_file, new_permissions_password=new_password, + current_open_password=current_open_password, restrictions=["print_high"], extra_query={"trace": "async"}, extra_headers={"X-Debug": "async"}, @@ -454,15 +553,15 @@ async def test_async_change_permissions_password_request_customization( captured_timeout: dict[str, float | dict[str, float] | None] = {} current_password = make_password("old-async") new_password = make_password("new-async") - payload_dump = PdfRestrictPayload.model_validate( - { - "files": [input_file], - "current_permissions_password": current_password, - "new_permissions_password": new_password, - "restrictions": ["edit_content"], - "output": "async-rotated", - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + current_open_password = make_password("open-async") + payload_dump = build_restrict_payload( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["edit_content"], + output="async-rotated", + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/restricted-pdf": @@ -499,6 +598,7 @@ def handler(request: httpx.Request) -> httpx.Response: input_file, current_permissions_password=current_password, new_permissions_password=new_password, + current_open_password=current_open_password, restrictions=["edit_content"], output="async-rotated", extra_query={"trace": "async"}, @@ -525,12 +625,10 @@ async def test_async_remove_permissions_password_success( input_file = make_pdf_file(PdfRestFileID.generate(2)) output_id = str(PdfRestFileID.generate()) current_password = make_password("secret") - payload_dump = PdfUnrestrictPayload.model_validate( - { - "files": [input_file], - "current_permissions_password": current_password, - } - ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + ) def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/unrestricted-pdf": @@ -567,6 +665,75 @@ def handler(request: httpx.Request) -> httpx.Response: assert response.output_file.name == "async-clean.pdf" +@pytest.mark.asyncio +async def test_async_remove_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("async-remove-custom") + open_password = make_password("async-open-custom") + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="async-clean-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "enabled"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-clean-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="async-clean-custom", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"audit": "enabled"}, + timeout=0.73, + ) + + assert isinstance(response, PdfRestFileBasedResponse) + assert response.output_file.name == "async-clean-custom.pdf" + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.73) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.73) + + @pytest.mark.asyncio async def test_async_permissions_password_validation( monkeypatch: pytest.MonkeyPatch, From 3ecf44c48afcedb84774c11d81eb86304950b2f2 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Fri, 6 Feb 2026 13:07:42 -0600 Subject: [PATCH 06/11] Encryption live tests: Fix incorrect regex check of error message --- tests/live/test_live_encrypt_pdf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/live/test_live_encrypt_pdf.py b/tests/live/test_live_encrypt_pdf.py index 8b131921..9bfdd683 100644 --- a/tests/live/test_live_encrypt_pdf.py +++ b/tests/live/test_live_encrypt_pdf.py @@ -208,7 +208,7 @@ def test_live_remove_open_password_invalid_password( new_open_password=correct_password, output="live-open-invalid", ).output_file - with pytest.raises(PdfRestApiError, match="open password"): + with pytest.raises(PdfRestApiError, match="password-protected"): client.remove_open_password( restricted, current_open_password=wrong_password, @@ -234,7 +234,7 @@ async def test_live_async_remove_open_password_invalid_password( output="async-live-open-invalid", ) ).output_file - with pytest.raises(PdfRestApiError, match="open password"): + with pytest.raises(PdfRestApiError, match="password-protected"): await client.remove_open_password( restricted, current_open_password=wrong_password, From eff7194e39d35bec0b6f3b3bfdee23af278d71c7 Mon Sep 17 00:00:00 2001 From: "Kevin A. Mitchell" Date: Mon, 9 Feb 2026 15:13:49 -0600 Subject: [PATCH 07/11] tests/live: Add tests for invalid restriction handling in password permissions - Added `test_live_add_permissions_password_invalid_restriction` to verify that invalid restrictions raise `PdfRestApiError` in sync client. - Added `test_live_async_add_permissions_password_invalid_restriction` to ensure async client correctly handles invalid restrictions. Assisted-by: Codex --- tests/live/test_live_permissions_password.py | 43 ++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/live/test_live_permissions_password.py b/tests/live/test_live_permissions_password.py index c99c0ad4..829ada17 100644 --- a/tests/live/test_live_permissions_password.py +++ b/tests/live/test_live_permissions_password.py @@ -267,3 +267,46 @@ async def test_live_async_remove_permissions_password_invalid_password( restricted_file, current_permissions_password=wrong_password, ) + + +def test_live_add_permissions_password_invalid_restriction( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + invalid_restriction = "totally-invalid-restriction" + with ( + PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client, + pytest.raises(PdfRestApiError, match=r"(?i)restriction|invalid"), + ): + client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=make_password("live-invalid-restriction"), + restrictions=["print_low"], + extra_body={"restrictions": [invalid_restriction]}, + ) + + +@pytest.mark.asyncio +async def test_live_async_add_permissions_password_invalid_restriction( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + invalid_restriction = "totally-invalid-restriction" + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + with pytest.raises(PdfRestApiError, match=r"(?i)restriction|invalid"): + await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=make_password( + "async-live-invalid-restriction" + ), + restrictions=["print_low"], + extra_body={"restrictions": [invalid_restriction]}, + ) From ebaad21d18c9ff0a7ca3fc753ed044fd64519a82 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Mon, 9 Feb 2026 16:52:16 -0600 Subject: [PATCH 08/11] models: Resolve payload-model test serialization drift and import cycle - import `PdfRestFile` from `models.public` inside `_internal.py` to avoid circular imports - update encrypt/restrict unit tests to build expected request payloads through `PdfEncryptPayload`/`PdfDecryptPayload`/`PdfRestrictPayload`/`PdfUnrestrictPayload` - replace hand-built payload dicts with `model_validate(...).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True)` Assisted-by: Codex --- src/pdfrest/models/_internal.py | 3 +- tests/test_encrypt_pdf.py | 48 ++++++++++++++-------------- tests/test_permissions_password.py | 51 +++++++++++++++--------------- 3 files changed, 52 insertions(+), 50 deletions(-) diff --git a/src/pdfrest/models/_internal.py b/src/pdfrest/models/_internal.py index 01202eda..121e7eab 100644 --- a/src/pdfrest/models/_internal.py +++ b/src/pdfrest/models/_internal.py @@ -33,8 +33,7 @@ SummaryOutputType, TranslateOutputFormat, ) -from . import PdfRestFile -from .public import PdfRestFileID +from .public import PdfRestFile, PdfRestFileID def _ensure_list(value: Any) -> Any: diff --git a/tests/test_encrypt_pdf.py b/tests/test_encrypt_pdf.py index a752408b..24f1daa9 100644 --- a/tests/test_encrypt_pdf.py +++ b/tests/test_encrypt_pdf.py @@ -8,7 +8,12 @@ from pydantic import ValidationError from pdfrest import AsyncPdfRestClient, PdfRestClient -from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse, PdfRestFileID +from pdfrest.models import ( + PdfRestFile, + PdfRestFileBasedResponse, + PdfRestFileID, +) +from pdfrest.models._internal import PdfDecryptPayload, PdfEncryptPayload from .graphics_test_helpers import ( ASYNC_API_KEY, @@ -39,18 +44,16 @@ def build_encrypt_payload( current_open_password: str | None = None, current_permissions_password: str | None = None, output: str | None = None, -) -> dict[str, str]: - payload: dict[str, str] = { - "id": str(input_file.id), - "new_open_password": new_open_password, - } - if current_open_password is not None: - payload["current_open_password"] = current_open_password - if current_permissions_password is not None: - payload["current_permissions_password"] = current_permissions_password - if output is not None: - payload["output"] = output - return payload +) -> dict[str, object]: + return PdfEncryptPayload.model_validate( + { + "files": [input_file], + "new_open_password": new_open_password, + "current_open_password": current_open_password, + "current_permissions_password": current_permissions_password, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) def build_decrypt_payload( @@ -59,16 +62,15 @@ def build_decrypt_payload( current_open_password: str, current_permissions_password: str | None = None, output: str | None = None, -) -> dict[str, str]: - payload: dict[str, str] = { - "id": str(input_file.id), - "current_open_password": current_open_password, - } - if current_permissions_password is not None: - payload["current_permissions_password"] = current_permissions_password - if output is not None: - payload["output"] = output - return payload +) -> dict[str, object]: + return PdfDecryptPayload.model_validate( + { + "files": [input_file], + "current_open_password": current_open_password, + "current_permissions_password": current_permissions_password, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) @pytest.mark.parametrize( diff --git a/tests/test_permissions_password.py b/tests/test_permissions_password.py index a45af37c..6406e126 100644 --- a/tests/test_permissions_password.py +++ b/tests/test_permissions_password.py @@ -8,7 +8,12 @@ from pydantic import ValidationError from pdfrest import AsyncPdfRestClient, PdfRestClient -from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse, PdfRestFileID +from pdfrest.models import ( + PdfRestFile, + PdfRestFileBasedResponse, + PdfRestFileID, +) +from pdfrest.models._internal import PdfRestrictPayload, PdfUnrestrictPayload from pdfrest.types import PdfRestriction from .graphics_test_helpers import ( @@ -41,20 +46,17 @@ def build_restrict_payload( current_open_password: str | None = None, restrictions: list[PdfRestriction] | None = None, output: str | None = None, -) -> dict[str, str | list[PdfRestriction]]: - payload: dict[str, str | list[PdfRestriction]] = { - "id": str(input_file.id), - "new_permissions_password": new_permissions_password, - } - if current_permissions_password is not None: - payload["current_permissions_password"] = current_permissions_password - if current_open_password is not None: - payload["current_open_password"] = current_open_password - if restrictions is not None: - payload["restrictions"] = restrictions - if output is not None: - payload["output"] = output - return payload +) -> dict[str, object]: + return PdfRestrictPayload.model_validate( + { + "files": [input_file], + "new_permissions_password": new_permissions_password, + "current_permissions_password": current_permissions_password, + "current_open_password": current_open_password, + "restrictions": restrictions, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) def build_unrestrict_payload( @@ -63,16 +65,15 @@ def build_unrestrict_payload( current_permissions_password: str, current_open_password: str | None = None, output: str | None = None, -) -> dict[str, str]: - payload: dict[str, str] = { - "id": str(input_file.id), - "current_permissions_password": current_permissions_password, - } - if current_open_password is not None: - payload["current_open_password"] = current_open_password - if output is not None: - payload["output"] = output - return payload +) -> dict[str, object]: + return PdfUnrestrictPayload.model_validate( + { + "files": [input_file], + "current_permissions_password": current_permissions_password, + "current_open_password": current_open_password, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) @pytest.mark.parametrize( From 185dbd98c19d127a520fe9e759b6627e139420b0 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Mon, 9 Feb 2026 17:15:01 -0600 Subject: [PATCH 09/11] tests: Strengthen live password-suite response assertions - add shared `assert_pdf_file_response` helpers to live encryption and permissions-password suites - assert additional response attributes on success paths, including file size, `.pdf` suffix, and `response.warning is None` - keep existing MIME type, output-prefix, and input-id checks while reducing repeated assertion blocks - align live assertions with repository testing guidelines for richer server-response verification Assisted-by: Codex --- tests/live/test_live_encrypt_pdf.py | 78 ++++++++++++-------- tests/live/test_live_permissions_password.py | 78 ++++++++++++-------- 2 files changed, 94 insertions(+), 62 deletions(-) diff --git a/tests/live/test_live_encrypt_pdf.py b/tests/live/test_live_encrypt_pdf.py index 9bfdd683..8da58220 100644 --- a/tests/live/test_live_encrypt_pdf.py +++ b/tests/live/test_live_encrypt_pdf.py @@ -5,7 +5,7 @@ import pytest from pdfrest import AsyncPdfRestClient, PdfRestApiError, PdfRestClient -from pdfrest.models import PdfRestFile +from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse from ..resources import get_test_resource_path @@ -14,6 +14,22 @@ def make_password(label: str) -> str: return f"{label}-{uuid4().hex}" +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, + *, + output_prefix: str, + input_file: PdfRestFile, +) -> None: + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith(output_prefix) + assert output_file.name.endswith(".pdf") + assert output_file.size > 0 + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + @pytest.fixture(scope="module") def uploaded_pdf_for_encrypt( pdfrest_api_key: str, @@ -42,11 +58,11 @@ def test_live_add_open_password( output="live-encrypted", ) - assert response.output_files - output_file = response.output_file - assert output_file.type == "application/pdf" - assert output_file.name.startswith("live-encrypted") - assert str(response.input_id) == str(uploaded_pdf_for_encrypt.id) + assert_pdf_file_response( + response, + output_prefix="live-encrypted", + input_file=uploaded_pdf_for_encrypt, + ) @pytest.mark.asyncio @@ -65,11 +81,11 @@ async def test_live_async_add_open_password( output="async-live-encrypted", ) - assert response.output_files - output_file = response.output_file - assert output_file.type == "application/pdf" - assert output_file.name.startswith("async-live-encrypted") - assert str(response.input_id) == str(uploaded_pdf_for_encrypt.id) + assert_pdf_file_response( + response, + output_prefix="async-live-encrypted", + input_file=uploaded_pdf_for_encrypt, + ) def test_live_change_open_password( @@ -94,11 +110,11 @@ def test_live_change_open_password( output="live-open-new", ) - assert response.output_files - output_file = response.output_file - assert output_file.name.startswith("live-open-new") - assert output_file.type == "application/pdf" - assert str(response.input_id) == str(restricted.id) + assert_pdf_file_response( + response, + output_prefix="live-open-new", + input_file=restricted, + ) @pytest.mark.asyncio @@ -126,11 +142,11 @@ async def test_live_async_change_open_password( output="async-live-open-new", ) - assert response.output_files - output_file = response.output_file - assert output_file.name.startswith("async-live-open-new") - assert output_file.type == "application/pdf" - assert str(response.input_id) == str(restricted.id) + assert_pdf_file_response( + response, + output_prefix="async-live-open-new", + input_file=restricted, + ) def test_live_remove_open_password( @@ -154,11 +170,11 @@ def test_live_remove_open_password( output="live-open-removed", ) - assert response.output_files - output_file = response.output_file - assert output_file.type == "application/pdf" - assert output_file.name.startswith("live-open-removed") - assert str(response.input_id) == str(restricted.id) + assert_pdf_file_response( + response, + output_prefix="live-open-removed", + input_file=restricted, + ) @pytest.mark.asyncio @@ -185,11 +201,11 @@ async def test_live_async_remove_open_password( output="async-live-open-removed", ) - assert response.output_files - output_file = response.output_file - assert output_file.type == "application/pdf" - assert output_file.name.startswith("async-live-open-removed") - assert str(response.input_id) == str(restricted.id) + assert_pdf_file_response( + response, + output_prefix="async-live-open-removed", + input_file=restricted, + ) def test_live_remove_open_password_invalid_password( diff --git a/tests/live/test_live_permissions_password.py b/tests/live/test_live_permissions_password.py index 829ada17..3d78a6b3 100644 --- a/tests/live/test_live_permissions_password.py +++ b/tests/live/test_live_permissions_password.py @@ -6,7 +6,7 @@ import pytest from pdfrest import AsyncPdfRestClient, PdfRestApiError, PdfRestClient -from pdfrest.models import PdfRestFile +from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse from pdfrest.types import PdfRestriction from ..resources import get_test_resource_path @@ -16,6 +16,22 @@ def make_password(label: str) -> str: return f"{label}-{uuid4().hex}" +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, + *, + output_prefix: str, + input_file: PdfRestFile, +) -> None: + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith(output_prefix) + assert output_file.name.endswith(".pdf") + assert output_file.size > 0 + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + PDF_RESTRICTIONS: tuple[PdfRestriction, ...] = cast( tuple[PdfRestriction, ...], get_args(PdfRestriction) ) @@ -56,11 +72,11 @@ def test_live_add_permissions_password( output="live-restrict", ) - assert response.output_files - output_file = response.output_file - assert output_file.type == "application/pdf" - assert output_file.name.startswith("live-restrict") - assert str(response.input_id) == str(uploaded_pdf_for_permissions.id) + assert_pdf_file_response( + response, + output_prefix="live-restrict", + input_file=uploaded_pdf_for_permissions, + ) @pytest.mark.asyncio @@ -83,11 +99,11 @@ async def test_live_async_add_permissions_password( output="async-restrict", ) - assert response.output_files - output_file = response.output_file - assert output_file.type == "application/pdf" - assert output_file.name.startswith("async-restrict") - assert str(response.input_id) == str(uploaded_pdf_for_permissions.id) + assert_pdf_file_response( + response, + output_prefix="async-restrict", + input_file=uploaded_pdf_for_permissions, + ) def test_live_change_permissions_password( @@ -116,11 +132,11 @@ def test_live_change_permissions_password( output="live-restrict-new", ) - assert response.output_files - output_file = response.output_file - assert output_file.name.startswith("live-restrict-new") - assert output_file.type == "application/pdf" - assert str(response.input_id) == str(restricted_file.id) + assert_pdf_file_response( + response, + output_prefix="live-restrict-new", + input_file=restricted_file, + ) @pytest.mark.asyncio @@ -150,11 +166,11 @@ async def test_live_async_change_permissions_password( output="async-live-restrict-new", ) - assert response.output_files - output_file = response.output_file - assert output_file.name.startswith("async-live-restrict-new") - assert output_file.type == "application/pdf" - assert str(response.input_id) == str(restricted_file.id) + assert_pdf_file_response( + response, + output_prefix="async-live-restrict-new", + input_file=restricted_file, + ) def test_live_remove_permissions_password( @@ -180,11 +196,11 @@ def test_live_remove_permissions_password( output="live-removed", ) - assert response.output_files - output_file = response.output_file - assert output_file.name.startswith("live-removed") - assert output_file.type == "application/pdf" - assert str(response.input_id) == str(restricted_file.id) + assert_pdf_file_response( + response, + output_prefix="live-removed", + input_file=restricted_file, + ) @pytest.mark.asyncio @@ -211,11 +227,11 @@ async def test_live_async_remove_permissions_password( output="async-live-removed", ) - assert response.output_files - output_file = response.output_file - assert output_file.name.startswith("async-live-removed") - assert output_file.type == "application/pdf" - assert str(response.input_id) == str(restricted_file.id) + assert_pdf_file_response( + response, + output_prefix="async-live-removed", + input_file=restricted_file, + ) def test_live_remove_permissions_password_invalid_password( From 988817db0f683bc7194cd8804069583298b9ab72 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Tue, 10 Feb 2026 11:12:54 -0600 Subject: [PATCH 10/11] tests: Strengthen password-suite response assertions with URL checks Assisted-by: Codex --- tests/live/test_live_encrypt_pdf.py | 2 + tests/live/test_live_permissions_password.py | 2 + tests/test_encrypt_pdf.py | 73 +++++++++++----- tests/test_permissions_password.py | 89 ++++++++++++++------ 4 files changed, 122 insertions(+), 44 deletions(-) diff --git a/tests/live/test_live_encrypt_pdf.py b/tests/live/test_live_encrypt_pdf.py index 8da58220..a8b45526 100644 --- a/tests/live/test_live_encrypt_pdf.py +++ b/tests/live/test_live_encrypt_pdf.py @@ -26,6 +26,8 @@ def assert_pdf_file_response( assert output_file.name.startswith(output_prefix) assert output_file.name.endswith(".pdf") assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url assert response.warning is None assert str(response.input_id) == str(input_file.id) diff --git a/tests/live/test_live_permissions_password.py b/tests/live/test_live_permissions_password.py index 3d78a6b3..7a4ad178 100644 --- a/tests/live/test_live_permissions_password.py +++ b/tests/live/test_live_permissions_password.py @@ -28,6 +28,8 @@ def assert_pdf_file_response( assert output_file.name.startswith(output_prefix) assert output_file.name.endswith(".pdf") assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url assert response.warning is None assert str(response.input_id) == str(input_file.id) diff --git a/tests/test_encrypt_pdf.py b/tests/test_encrypt_pdf.py index 24f1daa9..175fd79f 100644 --- a/tests/test_encrypt_pdf.py +++ b/tests/test_encrypt_pdf.py @@ -73,6 +73,20 @@ def build_decrypt_payload( ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, *, expected_name: str, input_file: PdfRestFile +) -> None: + assert isinstance(response, PdfRestFileBasedResponse) + output_file = response.output_file + assert output_file.name == expected_name + assert output_file.type == "application/pdf" + assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + @pytest.mark.parametrize( "permissions_password", [ @@ -132,10 +146,11 @@ def handler(request: httpx.Request) -> httpx.Response: ) assert seen == {"post": 1, "get": 1} - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "encrypted.pdf" - assert response.output_file.type == "application/pdf" - assert str(response.input_id) == str(input_file.id) + assert_pdf_file_response( + response, + expected_name="encrypted.pdf", + input_file=input_file, + ) def test_change_open_password_request_customization( @@ -199,8 +214,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.77, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "rotated-open.pdf" + assert_pdf_file_response( + response, + expected_name="rotated-open.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -257,9 +275,11 @@ def handler(request: httpx.Request) -> httpx.Response: ) assert seen == {"post": 1, "get": 1} - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "decrypted.pdf" - assert response.output_file.type == "application/pdf" + assert_pdf_file_response( + response, + expected_name="decrypted.pdf", + input_file=input_file, + ) def test_remove_open_password_request_customization( @@ -320,8 +340,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.59, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "decrypted-custom.pdf" + assert_pdf_file_response( + response, + expected_name="decrypted-custom.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -389,8 +412,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.66, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-encrypted.pdf" + assert_pdf_file_response( + response, + expected_name="async-encrypted.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -451,8 +477,11 @@ def handler(request: httpx.Request) -> httpx.Response: output="async-rotated-open", ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-rotated-open.pdf" + assert_pdf_file_response( + response, + expected_name="async-rotated-open.pdf", + input_file=input_file, + ) @pytest.mark.asyncio @@ -499,8 +528,11 @@ def handler(request: httpx.Request) -> httpx.Response: current_open_password=current_password, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-decrypted.pdf" + assert_pdf_file_response( + response, + expected_name="async-decrypted.pdf", + input_file=input_file, + ) @pytest.mark.asyncio @@ -562,8 +594,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.63, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-decrypted-custom.pdf" + assert_pdf_file_response( + response, + expected_name="async-decrypted-custom.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): diff --git a/tests/test_permissions_password.py b/tests/test_permissions_password.py index 6406e126..704c7a9a 100644 --- a/tests/test_permissions_password.py +++ b/tests/test_permissions_password.py @@ -76,6 +76,20 @@ def build_unrestrict_payload( ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, *, expected_name: str, input_file: PdfRestFile +) -> None: + assert isinstance(response, PdfRestFileBasedResponse) + output_file = response.output_file + assert output_file.name == expected_name + assert output_file.type == "application/pdf" + assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + @pytest.mark.parametrize( "restrictions", [ @@ -139,10 +153,11 @@ def handler(request: httpx.Request) -> httpx.Response: ) assert seen == {"post": 1, "get": 1} - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "restricted.pdf" - assert response.output_file.type == "application/pdf" - assert str(response.input_id) == str(input_file.id) + assert_pdf_file_response( + response, + expected_name="restricted.pdf", + input_file=input_file, + ) def test_add_permissions_password_request_customization( @@ -202,8 +217,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.55, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "custom-restricted.pdf" + assert_pdf_file_response( + response, + expected_name="custom-restricted.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -275,8 +293,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.77, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "rotated.pdf" + assert_pdf_file_response( + response, + expected_name="rotated.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -335,9 +356,11 @@ def handler(request: httpx.Request) -> httpx.Response: ) assert seen == {"post": 1, "get": 1} - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "clean.pdf" - assert response.output_file.type == "application/pdf" + assert_pdf_file_response( + response, + expected_name="clean.pdf", + input_file=input_file, + ) def test_remove_permissions_password_request_customization( @@ -398,8 +421,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.69, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "clean-custom.pdf" + assert_pdf_file_response( + response, + expected_name="clean-custom.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -469,10 +495,11 @@ def handler(request: httpx.Request) -> httpx.Response: ) assert seen == {"post": 1, "get": 1} - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "restricted.pdf" - assert response.output_file.type == "application/pdf" - assert str(response.input_id) == str(input_file.id) + assert_pdf_file_response( + response, + expected_name="restricted.pdf", + input_file=input_file, + ) @pytest.mark.asyncio @@ -534,8 +561,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.66, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-restricted.pdf" + assert_pdf_file_response( + response, + expected_name="async-restricted.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -608,8 +638,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.88, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-rotated.pdf" + assert_pdf_file_response( + response, + expected_name="async-rotated.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): @@ -662,8 +695,11 @@ def handler(request: httpx.Request) -> httpx.Response: current_permissions_password=current_password, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-clean.pdf" + assert_pdf_file_response( + response, + expected_name="async-clean.pdf", + input_file=input_file, + ) @pytest.mark.asyncio @@ -725,8 +761,11 @@ def handler(request: httpx.Request) -> httpx.Response: timeout=0.73, ) - assert isinstance(response, PdfRestFileBasedResponse) - assert response.output_file.name == "async-clean-custom.pdf" + assert_pdf_file_response( + response, + expected_name="async-clean-custom.pdf", + input_file=input_file, + ) timeout_value = captured_timeout["value"] assert timeout_value is not None if isinstance(timeout_value, dict): From 85ff4317dd83938190a42fb54fead33cd2a92162 Mon Sep 17 00:00:00 2001 From: Christopher Green Date: Tue, 10 Feb 2026 11:35:44 -0600 Subject: [PATCH 11/11] tests: Complete encrypt customization coverage Assisted-by: Codex --- tests/test_encrypt_pdf.py | 146 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/tests/test_encrypt_pdf.py b/tests/test_encrypt_pdf.py index 175fd79f..f4fb0bd0 100644 --- a/tests/test_encrypt_pdf.py +++ b/tests/test_encrypt_pdf.py @@ -153,6 +153,77 @@ def handler(request: httpx.Request) -> httpx.Response: ) +def test_add_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("open-custom") + permissions_password = make_password("perm-custom") + payload_dump = build_encrypt_payload( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "encrypted-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_open_password( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted-custom", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"diagnostics": "on"}, + timeout=0.71, + ) + + assert_pdf_file_response( + response, + expected_name="encrypted-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.71) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.71) + + def test_change_open_password_request_customization( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -484,6 +555,81 @@ def handler(request: httpx.Request) -> httpx.Response: ) +@pytest.mark.asyncio +async def test_async_change_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("async-open-current-custom") + new_password = make_password("async-open-next-custom") + permissions_password = make_password("async-open-perm-custom") + payload_dump = build_encrypt_payload( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-rotated-open-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.change_open_password( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open-custom", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"audit": "yes"}, + timeout=0.74, + ) + + assert_pdf_file_response( + response, + expected_name="async-rotated-open-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.74) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.74) + + @pytest.mark.asyncio async def test_async_remove_open_password_success( monkeypatch: pytest.MonkeyPatch,