diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index 0035227c..10d85479 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -82,6 +82,8 @@ JpegPdfRestPayload, OcrPdfPayload, PdfCompressPayload, + PdfDecryptPayload, + PdfEncryptPayload, PdfFlattenAnnotationsPayload, PdfFlattenFormsPayload, PdfFlattenTransparenciesPayload, @@ -92,12 +94,14 @@ PdfRedactionApplyPayload, PdfRedactionPreviewPayload, PdfRestRawFileResponse, + PdfRestrictPayload, PdfSplitPayload, PdfToExcelPayload, PdfToPdfaPayload, PdfToPdfxPayload, PdfToPowerpointPayload, PdfToWordPayload, + PdfUnrestrictPayload, PdfXfaToAcroformsPayload, PngPdfRestPayload, SummarizePdfTextPayload, @@ -120,6 +124,7 @@ PdfMergeInput, PdfPageSelection, PdfRedactionInstruction, + PdfRestriction, PdfRGBColor, PdfXType, PngColorModel, @@ -2684,6 +2689,214 @@ def flatten_pdf_forms( timeout=timeout, ) + def add_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Add a permissions password and optional restrictions to a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def change_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Rotate the permissions password and optionally update restrictions.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def add_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Encrypt a PDF with a new open password.""" + + payload: dict[str, Any] = { + "files": file, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def change_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Rotate the open password for an encrypted PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def remove_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Decrypt a PDF by removing its open password.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/decrypted-pdf", + payload=payload, + payload_model=PdfDecryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + def remove_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Remove permissions restrictions from a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + } + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return self._post_file_operation( + endpoint="/unrestricted-pdf", + payload=payload, + payload_model=PdfUnrestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + def compress_pdf( self, file: PdfRestFile | Sequence[PdfRestFile], @@ -3721,6 +3934,214 @@ async def flatten_pdf_forms( timeout=timeout, ) + async def add_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously add a permissions password and optional restrictions to a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def change_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + new_permissions_password: str, + restrictions: Sequence[PdfRestriction] | None = None, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously rotate the permissions password and optionally update restrictions.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + "new_permissions_password": new_permissions_password, + } + if restrictions is not None: + payload["restrictions"] = restrictions + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/restricted-pdf", + payload=payload, + payload_model=PdfRestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def remove_permissions_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_permissions_password: str, + current_open_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously remove permissions restrictions from a PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_permissions_password": current_permissions_password, + } + if current_open_password is not None: + payload["current_open_password"] = current_open_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/unrestricted-pdf", + payload=payload, + payload_model=PdfUnrestrictPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def add_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously encrypt a PDF with a new open password.""" + + payload: dict[str, Any] = { + "files": file, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def change_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + new_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously rotate the open password for an encrypted PDF.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + "new_open_password": new_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/encrypted-pdf", + payload=payload, + payload_model=PdfEncryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + + async def remove_open_password( + self, + file: PdfRestFile | Sequence[PdfRestFile], + *, + current_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, + extra_query: Query | None = None, + extra_headers: AnyMapping | None = None, + extra_body: Body | None = None, + timeout: TimeoutTypes | None = None, + ) -> PdfRestFileBasedResponse: + """Asynchronously decrypt a PDF by removing its open password.""" + + payload: dict[str, Any] = { + "files": file, + "current_open_password": current_open_password, + } + if current_permissions_password is not None: + payload["current_permissions_password"] = current_permissions_password + if output is not None: + payload["output"] = output + + return await self._post_file_operation( + endpoint="/decrypted-pdf", + payload=payload, + payload_model=PdfDecryptPayload, + extra_query=extra_query, + extra_headers=extra_headers, + extra_body=extra_body, + timeout=timeout, + ) + async def compress_pdf( self, file: PdfRestFile | Sequence[PdfRestFile], diff --git a/src/pdfrest/models/_internal.py b/src/pdfrest/models/_internal.py index 1c654f0d..121e7eab 100644 --- a/src/pdfrest/models/_internal.py +++ b/src/pdfrest/models/_internal.py @@ -26,14 +26,14 @@ OcrLanguage, PdfAType, PdfInfoQuery, + PdfRestriction, PdfXType, SummaryFormat, SummaryOutputFormat, SummaryOutputType, TranslateOutputFormat, ) -from . import PdfRestFile -from .public import PdfRestFileID +from .public import PdfRestFile, PdfRestFileID def _ensure_list(value: Any) -> Any: @@ -1142,6 +1142,193 @@ class PdfFlattenAnnotationsPayload(BaseModel): ] = None +class PdfRestrictPayload(BaseModel): + """Adapt caller options into a pdfRest-ready restrict-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + new_permissions_password: Annotated[ + str, + Field( + serialization_alias="new_permissions_password", + min_length=6, + max_length=128, + ), + ] + restrictions: Annotated[ + list[PdfRestriction] | None, + Field(serialization_alias="restrictions", min_length=1, default=None), + BeforeValidator(_ensure_list), + ] = None + current_permissions_password: Annotated[ + str | None, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + current_open_password: Annotated[ + str | None, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + +class PdfUnrestrictPayload(BaseModel): + """Adapt caller options into a pdfRest-ready unrestrict-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + current_permissions_password: Annotated[ + str, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + ), + ] + current_open_password: Annotated[ + str | None, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + +class PdfEncryptPayload(BaseModel): + """Adapt caller options into a pdfRest-ready encrypt-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + new_open_password: Annotated[ + str, + Field( + serialization_alias="new_open_password", + min_length=6, + max_length=128, + ), + ] + current_open_password: Annotated[ + str | None, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + current_permissions_password: Annotated[ + str | None, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + +class PdfDecryptPayload(BaseModel): + """Adapt caller options into a pdfRest-ready decrypt-PDF request payload.""" + + files: Annotated[ + list[PdfRestFile], + Field( + min_length=1, + max_length=1, + validation_alias=AliasChoices("file", "files"), + serialization_alias="id", + ), + BeforeValidator(_ensure_list), + AfterValidator( + _allowed_mime_types("application/pdf", error_msg="Must be a PDF file") + ), + PlainSerializer(_serialize_as_first_file_id), + ] + current_open_password: Annotated[ + str, + Field( + serialization_alias="current_open_password", + min_length=1, + max_length=128, + ), + ] + current_permissions_password: Annotated[ + str | None, + Field( + serialization_alias="current_permissions_password", + min_length=1, + max_length=128, + default=None, + ), + ] = None + output: Annotated[ + str | None, + Field(serialization_alias="output", min_length=1, default=None), + AfterValidator(_validate_output_prefix), + ] = None + + class BmpPdfRestPayload(BasePdfRestGraphicPayload[Literal["rgb", "gray"]]): """Adapt caller options into a pdfRest-ready BMP request payload.""" diff --git a/src/pdfrest/types/__init__.py b/src/pdfrest/types/__init__.py index 48f78b03..0d139641 100644 --- a/src/pdfrest/types/__init__.py +++ b/src/pdfrest/types/__init__.py @@ -3,6 +3,7 @@ from .public import ( ALL_OCR_LANGUAGES, ALL_PDF_INFO_QUERIES, + ALL_PDF_RESTRICTIONS, BmpColorModel, CompressionLevel, ExtractTextGranularity, @@ -19,6 +20,7 @@ PdfRedactionInstruction, PdfRedactionPreset, PdfRedactionType, + PdfRestriction, PdfRGBColor, PdfXType, PngColorModel, @@ -32,6 +34,7 @@ __all__ = [ "ALL_OCR_LANGUAGES", "ALL_PDF_INFO_QUERIES", + "ALL_PDF_RESTRICTIONS", "BmpColorModel", "CompressionLevel", "ExtractTextGranularity", @@ -49,6 +52,7 @@ "PdfRedactionInstruction", "PdfRedactionPreset", "PdfRedactionType", + "PdfRestriction", "PdfXType", "PngColorModel", "SummaryFormat", diff --git a/src/pdfrest/types/public.py b/src/pdfrest/types/public.py index 6472f2e7..020f012f 100644 --- a/src/pdfrest/types/public.py +++ b/src/pdfrest/types/public.py @@ -15,6 +15,7 @@ __all__ = ( "ALL_OCR_LANGUAGES", "ALL_PDF_INFO_QUERIES", + "ALL_PDF_RESTRICTIONS", "BmpColorModel", "CompressionLevel", "ExtractTextGranularity", @@ -32,6 +33,7 @@ "PdfRedactionInstruction", "PdfRedactionPreset", "PdfRedactionType", + "PdfRestriction", "PdfXType", "PngColorModel", "SummaryFormat", @@ -160,3 +162,18 @@ class PdfMergeSource(TypedDict, total=False): ALL_OCR_LANGUAGES: tuple[OcrLanguage, ...] = cast( tuple[OcrLanguage, ...], get_args(OcrLanguage) ) + +PdfRestriction = Literal[ + "print_low", + "print_high", + "edit_document_assembly", + "edit_fill_and_sign_form_fields", + "edit_annotations", + "edit_content", + "copy_content", + "accessibility_off", +] + +ALL_PDF_RESTRICTIONS: tuple[PdfRestriction, ...] = cast( + tuple[PdfRestriction, ...], get_args(PdfRestriction) +) diff --git a/tests/live/test_live_encrypt_pdf.py b/tests/live/test_live_encrypt_pdf.py new file mode 100644 index 00000000..a8b45526 --- /dev/null +++ b/tests/live/test_live_encrypt_pdf.py @@ -0,0 +1,259 @@ +from __future__ import annotations + +from uuid import uuid4 + +import pytest + +from pdfrest import AsyncPdfRestClient, PdfRestApiError, PdfRestClient +from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse + +from ..resources import get_test_resource_path + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, + *, + output_prefix: str, + input_file: PdfRestFile, +) -> None: + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith(output_prefix) + assert output_file.name.endswith(".pdf") + assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + +@pytest.fixture(scope="module") +def uploaded_pdf_for_encrypt( + pdfrest_api_key: str, + pdfrest_live_base_url: str, +) -> PdfRestFile: + resource = get_test_resource_path("report.pdf") + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + return client.files.create_from_paths([resource])[0] + + +def test_live_add_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + response = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=make_password("live-open"), + output="live-encrypted", + ) + + assert_pdf_file_response( + response, + output_prefix="live-encrypted", + input_file=uploaded_pdf_for_encrypt, + ) + + +@pytest.mark.asyncio +async def test_live_async_add_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + response = await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=make_password("async-live-open"), + output="async-live-encrypted", + ) + + assert_pdf_file_response( + response, + output_prefix="async-live-encrypted", + input_file=uploaded_pdf_for_encrypt, + ) + + +def test_live_change_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("live-open-current") + restricted = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="live-open-old", + ).output_file + response = client.change_open_password( + restricted, + current_open_password=current_password, + new_open_password=make_password("live-open-new"), + output="live-open-new", + ) + + assert_pdf_file_response( + response, + output_prefix="live-open-new", + input_file=restricted, + ) + + +@pytest.mark.asyncio +async def test_live_async_change_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-live-open-current") + restricted = ( + await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="async-live-open-old", + ) + ).output_file + response = await client.change_open_password( + restricted, + current_open_password=current_password, + new_open_password=make_password("async-live-open-new"), + output="async-live-open-new", + ) + + assert_pdf_file_response( + response, + output_prefix="async-live-open-new", + input_file=restricted, + ) + + +def test_live_remove_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("live-open-remove") + restricted = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="live-open-to-remove", + ).output_file + response = client.remove_open_password( + restricted, + current_open_password=current_password, + output="live-open-removed", + ) + + assert_pdf_file_response( + response, + output_prefix="live-open-removed", + input_file=restricted, + ) + + +@pytest.mark.asyncio +async def test_live_async_remove_open_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-live-open-remove") + restricted = ( + await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=current_password, + output="async-live-open-to-remove", + ) + ).output_file + response = await client.remove_open_password( + restricted, + current_open_password=current_password, + output="async-live-open-removed", + ) + + assert_pdf_file_response( + response, + output_prefix="async-live-open-removed", + input_file=restricted, + ) + + +def test_live_remove_open_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("live-open-correct") + wrong_password = make_password("live-open-wrong") + restricted = client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=correct_password, + output="live-open-invalid", + ).output_file + with pytest.raises(PdfRestApiError, match="password-protected"): + client.remove_open_password( + restricted, + current_open_password=wrong_password, + ) + + +@pytest.mark.asyncio +async def test_live_async_remove_open_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_encrypt: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("async-live-open-correct") + wrong_password = make_password("async-live-open-wrong") + restricted = ( + await client.add_open_password( + uploaded_pdf_for_encrypt, + new_open_password=correct_password, + output="async-live-open-invalid", + ) + ).output_file + with pytest.raises(PdfRestApiError, match="password-protected"): + await client.remove_open_password( + restricted, + current_open_password=wrong_password, + ) diff --git a/tests/live/test_live_permissions_password.py b/tests/live/test_live_permissions_password.py new file mode 100644 index 00000000..7a4ad178 --- /dev/null +++ b/tests/live/test_live_permissions_password.py @@ -0,0 +1,330 @@ +from __future__ import annotations + +from typing import cast, get_args +from uuid import uuid4 + +import pytest + +from pdfrest import AsyncPdfRestClient, PdfRestApiError, PdfRestClient +from pdfrest.models import PdfRestFile, PdfRestFileBasedResponse +from pdfrest.types import PdfRestriction + +from ..resources import get_test_resource_path + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, + *, + output_prefix: str, + input_file: PdfRestFile, +) -> None: + assert response.output_files + output_file = response.output_file + assert output_file.type == "application/pdf" + assert output_file.name.startswith(output_prefix) + assert output_file.name.endswith(".pdf") + assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + +PDF_RESTRICTIONS: tuple[PdfRestriction, ...] = cast( + tuple[PdfRestriction, ...], get_args(PdfRestriction) +) +RESTRICTION_PARAMS = [ + pytest.param(restriction, id=restriction) for restriction in PDF_RESTRICTIONS +] + + +@pytest.fixture(scope="module") +def uploaded_pdf_for_permissions( + pdfrest_api_key: str, + pdfrest_live_base_url: str, +) -> PdfRestFile: + resource = get_test_resource_path("report.pdf") + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + return client.files.create_from_paths([resource])[0] + + +@pytest.mark.parametrize("restriction", RESTRICTION_PARAMS) +def test_live_add_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, + restriction: PdfRestriction, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + new_password = make_password("live-perm") + response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=new_password, + restrictions=[restriction], + output="live-restrict", + ) + + assert_pdf_file_response( + response, + output_prefix="live-restrict", + input_file=uploaded_pdf_for_permissions, + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("restriction", RESTRICTION_PARAMS) +async def test_live_async_add_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, + restriction: PdfRestriction, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + new_password = make_password("async-live-perm") + response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=new_password, + restrictions=[restriction], + output="async-restrict", + ) + + assert_pdf_file_response( + response, + output_prefix="async-restrict", + input_file=uploaded_pdf_for_permissions, + ) + + +def test_live_change_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("old-live") + new_password = make_password("new-live") + restricted_response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["print_low"], + output="live-restrict-old", + ) + restricted_file = restricted_response.output_file + response = client.change_permissions_password( + restricted_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + restrictions=["copy_content"], + output="live-restrict-new", + ) + + assert_pdf_file_response( + response, + output_prefix="live-restrict-new", + input_file=restricted_file, + ) + + +@pytest.mark.asyncio +async def test_live_async_change_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-old") + new_password = make_password("async-new") + restricted_response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["edit_content"], + output="async-live-restrict-old", + ) + restricted_file = restricted_response.output_file + response = await client.change_permissions_password( + restricted_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + restrictions=["edit_annotations"], + output="async-live-restrict-new", + ) + + assert_pdf_file_response( + response, + output_prefix="async-live-restrict-new", + input_file=restricted_file, + ) + + +def test_live_remove_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("remove-live") + restricted_response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["print_high"], + output="live-to-remove", + ) + restricted_file = restricted_response.output_file + response = client.remove_permissions_password( + restricted_file, + current_permissions_password=current_password, + output="live-removed", + ) + + assert_pdf_file_response( + response, + output_prefix="live-removed", + input_file=restricted_file, + ) + + +@pytest.mark.asyncio +async def test_live_async_remove_permissions_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + current_password = make_password("async-remove") + restricted_response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=current_password, + restrictions=["accessibility_off"], + output="async-live-to-remove", + ) + restricted_file = restricted_response.output_file + response = await client.remove_permissions_password( + restricted_file, + current_permissions_password=current_password, + output="async-live-removed", + ) + + assert_pdf_file_response( + response, + output_prefix="async-live-removed", + input_file=restricted_file, + ) + + +def test_live_remove_permissions_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + with PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("live-wrong") + wrong_password = make_password("incorrect") + restricted_response = client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=correct_password, + restrictions=["edit_annotations"], + output="live-invalid-remove", + ) + restricted_file = restricted_response.output_file + with pytest.raises(PdfRestApiError, match="permissions password"): + client.remove_permissions_password( + restricted_file, + current_permissions_password=wrong_password, + ) + + +@pytest.mark.asyncio +async def test_live_async_remove_permissions_password_invalid_password( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + correct_password = make_password("async-live-wrong") + wrong_password = make_password("async-wrong") + restricted_response = await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=correct_password, + restrictions=["edit_content"], + output="async-live-invalid-remove", + ) + restricted_file = restricted_response.output_file + with pytest.raises(PdfRestApiError, match="permissions password"): + await client.remove_permissions_password( + restricted_file, + current_permissions_password=wrong_password, + ) + + +def test_live_add_permissions_password_invalid_restriction( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + invalid_restriction = "totally-invalid-restriction" + with ( + PdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client, + pytest.raises(PdfRestApiError, match=r"(?i)restriction|invalid"), + ): + client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=make_password("live-invalid-restriction"), + restrictions=["print_low"], + extra_body={"restrictions": [invalid_restriction]}, + ) + + +@pytest.mark.asyncio +async def test_live_async_add_permissions_password_invalid_restriction( + pdfrest_api_key: str, + pdfrest_live_base_url: str, + uploaded_pdf_for_permissions: PdfRestFile, +) -> None: + invalid_restriction = "totally-invalid-restriction" + async with AsyncPdfRestClient( + api_key=pdfrest_api_key, + base_url=pdfrest_live_base_url, + ) as client: + with pytest.raises(PdfRestApiError, match=r"(?i)restriction|invalid"): + await client.add_permissions_password( + uploaded_pdf_for_permissions, + new_permissions_password=make_password( + "async-live-invalid-restriction" + ), + restrictions=["print_low"], + extra_body={"restrictions": [invalid_restriction]}, + ) diff --git a/tests/test_encrypt_pdf.py b/tests/test_encrypt_pdf.py new file mode 100644 index 00000000..f4fb0bd0 --- /dev/null +++ b/tests/test_encrypt_pdf.py @@ -0,0 +1,862 @@ +from __future__ import annotations + +import json +from uuid import uuid4 + +import httpx +import pytest +from pydantic import ValidationError + +from pdfrest import AsyncPdfRestClient, PdfRestClient +from pdfrest.models import ( + PdfRestFile, + PdfRestFileBasedResponse, + PdfRestFileID, +) +from pdfrest.models._internal import PdfDecryptPayload, PdfEncryptPayload + +from .graphics_test_helpers import ( + ASYNC_API_KEY, + VALID_API_KEY, + build_file_info_payload, + make_pdf_file, +) + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +def make_non_pdf_file(file_id: str) -> PdfRestFile: + return PdfRestFile.model_validate( + build_file_info_payload( + file_id, + "example.png", + "image/png", + ) + ) + + +def build_encrypt_payload( + input_file: PdfRestFile, + *, + new_open_password: str, + current_open_password: str | None = None, + current_permissions_password: str | None = None, + output: str | None = None, +) -> dict[str, object]: + return PdfEncryptPayload.model_validate( + { + "files": [input_file], + "new_open_password": new_open_password, + "current_open_password": current_open_password, + "current_permissions_password": current_permissions_password, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + +def build_decrypt_payload( + input_file: PdfRestFile, + *, + current_open_password: str, + current_permissions_password: str | None = None, + output: str | None = None, +) -> dict[str, object]: + return PdfDecryptPayload.model_validate( + { + "files": [input_file], + "current_open_password": current_open_password, + "current_permissions_password": current_permissions_password, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, *, expected_name: str, input_file: PdfRestFile +) -> None: + assert isinstance(response, PdfRestFileBasedResponse) + output_file = response.output_file + assert output_file.name == expected_name + assert output_file.type == "application/pdf" + assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + +@pytest.mark.parametrize( + "permissions_password", + [ + pytest.param(None, id="no-permissions"), + pytest.param(make_password("perm"), id="with-permissions"), + ], +) +def test_add_open_password_success( + monkeypatch: pytest.MonkeyPatch, permissions_password: str | None +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + new_password = make_password("open") + payload_dump = build_encrypt_payload( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted", + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "encrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_open_password( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted", + ) + + assert seen == {"post": 1, "get": 1} + assert_pdf_file_response( + response, + expected_name="encrypted.pdf", + input_file=input_file, + ) + + +def test_add_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("open-custom") + permissions_password = make_password("perm-custom") + payload_dump = build_encrypt_payload( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "encrypted-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_open_password( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="encrypted-custom", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"diagnostics": "on"}, + timeout=0.71, + ) + + assert_pdf_file_response( + response, + expected_name="encrypted-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.71) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.71) + + +def test_change_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("old-open") + new_password = make_password("new-open") + permissions_password = make_password("perm") + payload_dump = build_encrypt_payload( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="rotated-open", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "rotated-open.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.change_open_password( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="rotated-open", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"diagnostics": "on"}, + timeout=0.77, + ) + + assert_pdf_file_response( + response, + expected_name="rotated-open.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.77) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.77) + + +def test_remove_open_password_success(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("open-current") + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + output="decrypted", + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "decrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_open_password( + input_file, + current_open_password=current_password, + output="decrypted", + ) + + assert seen == {"post": 1, "get": 1} + assert_pdf_file_response( + response, + expected_name="decrypted.pdf", + input_file=input_file, + ) + + +def test_remove_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("open-custom") + permissions_password = make_password("perm-custom") + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="decrypted-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "decrypted-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_open_password( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="decrypted-custom", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"audit": "yes"}, + timeout=0.59, + ) + + assert_pdf_file_response( + response, + expected_name="decrypted-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.59) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.59) + + +@pytest.mark.asyncio +async def test_async_add_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("async-open") + permissions_password = make_password("async-perm") + payload_dump = build_encrypt_payload( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-encrypted", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "keep_permissions": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-encrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.add_open_password( + input_file, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-encrypted", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"keep_permissions": "yes"}, + timeout=0.66, + ) + + assert_pdf_file_response( + response, + expected_name="async-encrypted.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.66) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.66) + + +@pytest.mark.asyncio +async def test_async_change_open_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("async-open-current") + new_password = make_password("async-open-next") + permissions_password = make_password("async-open-perm") + payload_dump = build_encrypt_payload( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-rotated-open.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.change_open_password( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open", + ) + + assert_pdf_file_response( + response, + expected_name="async-rotated-open.pdf", + input_file=input_file, + ) + + +@pytest.mark.asyncio +async def test_async_change_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("async-open-current-custom") + new_password = make_password("async-open-next-custom") + permissions_password = make_password("async-open-perm-custom") + payload_dump = build_encrypt_payload( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/encrypted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-rotated-open-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.change_open_password( + input_file, + current_open_password=current_password, + new_open_password=new_password, + current_permissions_password=permissions_password, + output="async-rotated-open-custom", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"audit": "yes"}, + timeout=0.74, + ) + + assert_pdf_file_response( + response, + expected_name="async-rotated-open-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.74) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.74) + + +@pytest.mark.asyncio +async def test_async_remove_open_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("async-open-current") + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-decrypted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_open_password( + input_file, + current_open_password=current_password, + ) + + assert_pdf_file_response( + response, + expected_name="async-decrypted.pdf", + input_file=input_file, + ) + + +@pytest.mark.asyncio +async def test_async_remove_open_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("async-open-custom") + permissions_password = make_password("async-perm-custom") + payload_dump = build_decrypt_payload( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="async-decrypted-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/decrypted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-decrypted-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_open_password( + input_file, + current_open_password=current_password, + current_permissions_password=permissions_password, + output="async-decrypted-custom", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"audit": "yes"}, + timeout=0.63, + ) + + assert_pdf_file_response( + response, + expected_name="async-decrypted-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.63) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.63) + + +def test_encrypt_decrypt_validation(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure") + short_password = "a" * 3 + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="Must be a PDF file"), + ): + client.add_open_password( + non_pdf_file, + new_open_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.add_open_password( + [pdf_file, other_pdf], + new_open_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises( + ValidationError, match="String should have at least 6 characters" + ), + ): + client.add_open_password( + pdf_file, + new_open_password=short_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="String should have at least 1 character"), + ): + client.change_open_password( + pdf_file, + current_open_password=empty_password, + new_open_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.remove_open_password( + [pdf_file, other_pdf], + current_open_password=secure_password, + ) + + +@pytest.mark.asyncio +async def test_async_encrypt_decrypt_validation( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure-async") + short_password = "a" * 3 + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + with pytest.raises(ValidationError, match="Must be a PDF file"): + await client.add_open_password( + non_pdf_file, + new_open_password=secure_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.add_open_password( + [pdf_file, other_pdf], + new_open_password=secure_password, + ) + + with pytest.raises( + ValidationError, match="String should have at least 6 characters" + ): + await client.add_open_password( + pdf_file, + new_open_password=short_password, + ) + + with pytest.raises( + ValidationError, match="String should have at least 1 character" + ): + await client.change_open_password( + pdf_file, + current_open_password=empty_password, + new_open_password=secure_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.remove_open_password( + [pdf_file, other_pdf], + current_open_password=secure_password, + ) diff --git a/tests/test_permissions_password.py b/tests/test_permissions_password.py new file mode 100644 index 00000000..704c7a9a --- /dev/null +++ b/tests/test_permissions_password.py @@ -0,0 +1,902 @@ +from __future__ import annotations + +import json +from uuid import uuid4 + +import httpx +import pytest +from pydantic import ValidationError + +from pdfrest import AsyncPdfRestClient, PdfRestClient +from pdfrest.models import ( + PdfRestFile, + PdfRestFileBasedResponse, + PdfRestFileID, +) +from pdfrest.models._internal import PdfRestrictPayload, PdfUnrestrictPayload +from pdfrest.types import PdfRestriction + +from .graphics_test_helpers import ( + ASYNC_API_KEY, + VALID_API_KEY, + build_file_info_payload, + make_pdf_file, +) + + +def make_password(label: str) -> str: + return f"{label}-{uuid4().hex}" + + +def make_non_pdf_file(file_id: str) -> PdfRestFile: + return PdfRestFile.model_validate( + build_file_info_payload( + file_id, + "example.png", + "image/png", + ) + ) + + +def build_restrict_payload( + input_file: PdfRestFile, + *, + new_permissions_password: str, + current_permissions_password: str | None = None, + current_open_password: str | None = None, + restrictions: list[PdfRestriction] | None = None, + output: str | None = None, +) -> dict[str, object]: + return PdfRestrictPayload.model_validate( + { + "files": [input_file], + "new_permissions_password": new_permissions_password, + "current_permissions_password": current_permissions_password, + "current_open_password": current_open_password, + "restrictions": restrictions, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + +def build_unrestrict_payload( + input_file: PdfRestFile, + *, + current_permissions_password: str, + current_open_password: str | None = None, + output: str | None = None, +) -> dict[str, object]: + return PdfUnrestrictPayload.model_validate( + { + "files": [input_file], + "current_permissions_password": current_permissions_password, + "current_open_password": current_open_password, + "output": output, + } + ).model_dump(mode="json", by_alias=True, exclude_none=True, exclude_unset=True) + + +def assert_pdf_file_response( + response: PdfRestFileBasedResponse, *, expected_name: str, input_file: PdfRestFile +) -> None: + assert isinstance(response, PdfRestFileBasedResponse) + output_file = response.output_file + assert output_file.name == expected_name + assert output_file.type == "application/pdf" + assert output_file.size > 0 + output_url = str(output_file.url) + assert f"/resource/{output_file.id}" in output_url + assert response.warning is None + assert str(response.input_id) == str(input_file.id) + + +@pytest.mark.parametrize( + "restrictions", + [ + pytest.param(None, id="none"), + pytest.param(["print_low"], id="single"), + pytest.param(["print_low", "copy_content"], id="multiple"), + ], +) +def test_add_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, restrictions: list[PdfRestriction] | None +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + new_password = make_password("secure") + open_password = make_password("open") + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + current_open_password=open_password, + restrictions=restrictions, + output="restricted", + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_permissions_password( + input_file, + new_permissions_password=new_password, + restrictions=restrictions, + current_open_password=open_password, + output="restricted", + ) + + assert seen == {"post": 1, "get": 1} + assert_pdf_file_response( + response, + expected_name="restricted.pdf", + input_file=input_file, + ) + + +def test_add_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("custom") + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + restrictions=["print_high"], + output="custom-restricted", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "keep_open": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "custom-restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.add_permissions_password( + input_file, + new_permissions_password=new_password, + restrictions=["print_high"], + output="custom-restricted", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"keep_open": "yes"}, + timeout=0.55, + ) + + assert_pdf_file_response( + response, + expected_name="custom-restricted.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.55) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.55) + + +def test_change_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("old") + new_password = make_password("new") + current_open_password = make_password("open") + payload_dump = build_restrict_payload( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["edit_content"], + output="rotated", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "rotated.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.change_permissions_password( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["edit_content"], + output="rotated", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"diagnostics": "on"}, + timeout=0.77, + ) + + assert_pdf_file_response( + response, + expected_name="rotated.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.77) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.77) + + +def test_remove_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("old") + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + output="clean", + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "clean.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + output="clean", + ) + + assert seen == {"post": 1, "get": 1} + assert_pdf_file_response( + response, + expected_name="clean.pdf", + input_file=input_file, + ) + + +def test_remove_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("remove-custom") + open_password = make_password("open-custom") + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="clean-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "enabled"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "sync" + assert request.headers["X-Debug"] == "sync" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "clean-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="clean-custom", + extra_query={"trace": "sync"}, + extra_headers={"X-Debug": "sync"}, + extra_body={"audit": "enabled"}, + timeout=0.69, + ) + + assert_pdf_file_response( + response, + expected_name="clean-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.69) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.69) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "restrictions", + [ + pytest.param(None, id="none"), + pytest.param(["print_low"], id="single"), + pytest.param(["print_low", "copy_content"], id="multiple"), + ], +) +async def test_async_add_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, restrictions: list[PdfRestriction] | None +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(1)) + output_id = str(PdfRestFileID.generate()) + new_password = make_password("secure") + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + restrictions=restrictions, + output="restricted", + ) + + seen: dict[str, int] = {"post": 0, "get": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + seen["post"] += 1 + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + seen["get"] += 1 + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.add_permissions_password( + input_file, + new_permissions_password=new_password, + restrictions=restrictions, + output="restricted", + ) + + assert seen == {"post": 1, "get": 1} + assert_pdf_file_response( + response, + expected_name="restricted.pdf", + input_file=input_file, + ) + + +@pytest.mark.asyncio +async def test_async_add_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + new_password = make_password("async") + current_open_password = make_password("async-open") + payload_dump = build_restrict_payload( + input_file, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["print_high"], + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "keep_open": "yes"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-restricted.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.add_permissions_password( + input_file, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["print_high"], + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"keep_open": "yes"}, + timeout=0.66, + ) + + assert_pdf_file_response( + response, + expected_name="async-restricted.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.66) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.66) + + +@pytest.mark.asyncio +async def test_async_change_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("old-async") + new_password = make_password("new-async") + current_open_password = make_password("open-async") + payload_dump = build_restrict_payload( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["edit_content"], + output="async-rotated", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/restricted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "diagnostics": "on"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-rotated.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.change_permissions_password( + input_file, + current_permissions_password=current_password, + new_permissions_password=new_password, + current_open_password=current_open_password, + restrictions=["edit_content"], + output="async-rotated", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"diagnostics": "on"}, + timeout=0.88, + ) + + assert_pdf_file_response( + response, + expected_name="async-rotated.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.88) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.88) + + +@pytest.mark.asyncio +async def test_async_remove_permissions_password_success( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + current_password = make_password("secret") + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + payload = json.loads(request.content.decode("utf-8")) + assert payload == payload_dump + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-clean.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + ) + + assert_pdf_file_response( + response, + expected_name="async-clean.pdf", + input_file=input_file, + ) + + +@pytest.mark.asyncio +async def test_async_remove_permissions_password_request_customization( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + input_file = make_pdf_file(PdfRestFileID.generate(2)) + output_id = str(PdfRestFileID.generate()) + captured_timeout: dict[str, float | dict[str, float] | None] = {} + current_password = make_password("async-remove-custom") + open_password = make_password("async-open-custom") + payload_dump = build_unrestrict_payload( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="async-clean-custom", + ) + + def handler(request: httpx.Request) -> httpx.Response: + if request.method == "POST" and request.url.path == "/unrestricted-pdf": + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + captured_timeout["value"] = request.extensions.get("timeout") + payload = json.loads(request.content.decode("utf-8")) + assert payload == {**payload_dump, "audit": "enabled"} + return httpx.Response( + 200, + json={ + "inputId": [input_file.id], + "outputId": [output_id], + }, + ) + if request.method == "GET" and request.url.path == f"/resource/{output_id}": + assert request.url.params["format"] == "info" + assert request.url.params["trace"] == "async" + assert request.headers["X-Debug"] == "async" + return httpx.Response( + 200, + json=build_file_info_payload( + output_id, + "async-clean-custom.pdf", + "application/pdf", + ), + ) + msg = f"Unexpected request {request.method} {request.url}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.remove_permissions_password( + input_file, + current_permissions_password=current_password, + current_open_password=open_password, + output="async-clean-custom", + extra_query={"trace": "async"}, + extra_headers={"X-Debug": "async"}, + extra_body={"audit": "enabled"}, + timeout=0.73, + ) + + assert_pdf_file_response( + response, + expected_name="async-clean-custom.pdf", + input_file=input_file, + ) + timeout_value = captured_timeout["value"] + assert timeout_value is not None + if isinstance(timeout_value, dict): + assert all(pytest.approx(0.73) == value for value in timeout_value.values()) + else: + assert timeout_value == pytest.approx(0.73) + + +@pytest.mark.asyncio +async def test_async_permissions_password_validation( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure") + another_password = make_password("another") + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + with pytest.raises(ValidationError, match="Must be a PDF file"): + await client.add_permissions_password( + non_pdf_file, + new_permissions_password=secure_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.add_permissions_password( + [pdf_file, other_pdf], + new_permissions_password=secure_password, + ) + + with pytest.raises( + ValidationError, match="String should have at least 6 characters" + ): + await client.add_permissions_password( + pdf_file, + new_permissions_password="a" * 5, + ) + + with pytest.raises( + ValidationError, match="List should have at least 1 item after validation" + ): + await client.add_permissions_password( + pdf_file, + new_permissions_password=secure_password, + restrictions=[], + ) + + with pytest.raises( + ValidationError, match="String should have at least 1 character" + ): + await client.remove_permissions_password( + pdf_file, + current_permissions_password=empty_password, + ) + + with pytest.raises(ValidationError, match="at most 1"): + await client.remove_permissions_password( + [pdf_file, other_pdf], + current_permissions_password=another_password, + ) + + +def test_permissions_password_validation(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PDFREST_API_KEY", raising=False) + pdf_file = make_pdf_file(PdfRestFileID.generate(1)) + non_pdf_file = make_non_pdf_file(str(PdfRestFileID.generate())) + other_pdf = make_pdf_file(PdfRestFileID.generate(2)) + secure_password = make_password("secure") + another_password = make_password("another") + empty_password = "".join([]) + transport = httpx.MockTransport(lambda request: (_ for _ in ()).throw(RuntimeError)) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="Must be a PDF file"), + ): + client.add_permissions_password( + non_pdf_file, + new_permissions_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.add_permissions_password( + [pdf_file, other_pdf], + new_permissions_password=secure_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises( + ValidationError, match="String should have at least 6 characters" + ), + ): + client.add_permissions_password( + pdf_file, + new_permissions_password="a" * 5, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises( + ValidationError, match="List should have at least 1 item after validation" + ), + ): + client.add_permissions_password( + pdf_file, + new_permissions_password=secure_password, + restrictions=[], + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="String should have at least 1 character"), + ): + client.remove_permissions_password( + pdf_file, + current_permissions_password=empty_password, + ) + + with ( + PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client, + pytest.raises(ValidationError, match="at most 1"), + ): + client.remove_permissions_password( + [pdf_file, other_pdf], + current_permissions_password=another_password, + )